获取具有属性的 RabbitMQ 主体

我正在像这样从 RabbitMQ 读取:


connection = factory.newConnection();

ch = connection.createChannel() ;

String queueName = managerProps.getProperty("rmq.queue.name");

ch.queueDeclare(queueName ,true,false,false, null) ;


while (true) {

    GetResponse chResponse = ch.basicGet(queueName, false);

    logger.info("----" + new String(chResponse.getBody(), "UTF-8") + " ---\n");

}

这是我在日志中看到的:


[Thread-5] INFO com.mycompany.RmqReader - ----?? ♣wx .com.rabbitmq.jms.client.message.RMQTextMessage $b1213c86-10f4-4113-bd2f-45aaabce083f   ♠ ←rmq.jms.meamqpQueueNameq ~ ☺L ♫amqpRoutingKeyq ~ ☺L ☼destinationNameq ~ ☺xp ☺ t ↕jms.durable.queuest !MY.Queue.Name ~ ♦q ~ ♦z  ☻O ↔rmq.jms.message.delivery.mode♦   ☻ ↓rmq.jms.message.timestamp♣  ☺j?∟ ↑rmq.jms.message.priority♦   ♦ →rmq.jms.message.expiration♣         ↕rmq.jms.message.i 'ID:b1213c86-10f4-4113-bd2f-45aaabce083f   ♂ ◄objectTransaction☺☺ ►templateEndpoin -jtemplate://JSONDeliveryTemplateParallel.java ►deliveryLocatio +jms:queue:My.Queue.Name ►destinationIndex♦   ☺ ♫subsCutOffTime♦     ♀breadcrumbI ♀1149808347.0 ◄globalDeliveryUID♦ ?]? ►subscriptionNam §option_session_pubsub ◄originalMessageI ♀1149808347.0 ↨subscriptionDeliveryUID♦ ??8 ¶transactionTimestamp♣  ☺j??(z  ♥R   ♥M[

{"OptSession": {.... the actual body is here....}}

] ---

为什么我在这里看到标题?我如何实际提取身体?

http://img4.mukewang.com/639ad2c2000152f509340847.jpg

红颜莎娜
浏览 65回答 1
1回答

当年话下

看起来您已经使用 RMQConnectionFactory 发布了符合 jms 的数据,现在您正在使用非 jms ConnectionFactory请查看使用 RMQConnectionFactory 的示例消费者https://github.com/kunhaj/samples/blob/master/rabbitmq/src/main/java/RabbitMqConsumer.javaimport com.rabbitmq.jms.admin.RMQConnectionFactory;import javax.jms.*;/** *  docker run -d --hostname my-rabbit --name  *   some-rabbit  -p 5672:5672 -p 15672:15672 rabbitmq:3-management */public class RabbitMqConsumer {    public static void main(String[] args) throws Exception {        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();        connectionFactory.setUsername("guest");        connectionFactory.setPassword("guest");        connectionFactory.setVirtualHost("/");        connectionFactory.setHost("localhost");        connectionFactory.setPort(5672);        connectionFactory.setDeclareReplyToDestination(false);        Connection connection = connectionFactory.createConnection();        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue("TEST-QUEUE");        drainQueue(session, queue);    }    protected static void drainQueue(Session session, Queue queue) throws Exception {        MessageConsumer receiver = session.createConsumer(queue);        Message msg = receiver.receiveNoWait();        while (msg != null) {            String msgBody = ((TextMessage) msg).getText();            System.out.println("recieved" + msgBody);            msg = receiver.receiveNoWait();        }    }}另请参阅 JMS 和 AMQP 0-9-1 目标互操作性https://www.rabbitmq.com/jms-client.html#destination-interoperability
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java