手记

RabbitMQ客户端程序实践

下面记录客户端编写时应注意的几个方面。

持久化消息

持久化消息必须达到以下3个要求:

  • 交换器持久化

  • 目标队列持久化

  • 消息的投递模式持久化

满足这些条件后broker(消息服务器)保存交换器元数据,队列元数据,绑定元数据,vhost元数据和消息到磁盘,这些数据将broker异常时恢复所有配置信息及消息。下面是客户端发送消息的代码段:

public void sendMsg(){
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUsername("remoteuser1");
        factory.setPassword("remoteuser1");
        factory.setVirtualHost("/");
        factory.setHost("192.168.10.1");
        factory.setPort(5672);
        Connection conn=factory.newConnection();
        Channel channel=conn.createChannel();
        channel.exchangeDeclare("exchangeName1","direct",true);//声明可持久化交换器
        channel.queueDeclare("queue1", true, false, false, null);//声明可持久化队列
        channel.queueBind("queue1", "exchangeName1",   "routingkey1");//绑定队列到交换器并指定路由键

        channel.queueDeclare("queue2", false, false, false, null);//声明非持久化队列
        channel.queueBind("queue2", "exchangeName1", "routingkey1");        /*
        此处放置发送消息的代码,暂时不写
        */
        channel.close();
        conn.close();
}

执行代码,并使用如下命令在RabbitMQ节点查看队列当前的状态:

# rabbitmqctl list_queues name durable state messages_ready messages_ready_ram messages_ready_persistent

声明队列.png


从上图可以看到队列queue1设置为持久化的,queue2设置为非持久化。现在添加如下发送消息的代码段:

public void sendMsg(){/*
建立rabbitmq server连接
*/byte[] messageBody="hello world".getBytes();
channel.basicPublish("exchangeName1", "routingkey1", null, messageBody);//使用默认的发送模式/*
关闭连接
*/}

再次执行上面查看队列状态的命令,结果如下图:


队列消息状态.png


可以看到两个队列在内存中都有1条消息,但是持久化消息没有。使用如下命令手动重启RabbitMQ服务器,再次查看消息队列信息,会得到下图:

# systemctl restart rabbitmq-server.service# rabbitmqctl list_queues name durable state messages_ready messages_ready_ram messages_ready_persistent

消息丢失并且queue2丢失.png

  • 消息丢失是因为channel.basicPublish方法没有设置发送模式,默认使用非持久化模式。

  • queue2消息是因为channel.queueDeclare方法把队列设置为非持久化模式。

现在修改发送消息的模式,实现消息持久化:

public void sendMsg(){/*
建立rabbitmq server连接
*/byte[] messageBody="hello world".getBytes();
channel.basicPublish("exchangeName1", "routingkey1", 
                new AMQP.BasicProperties.Builder()
                .deliveryMode(2).build(), 
                messageBody);//设置发送模式为持久化/*
关闭连接
*/}

再次执行下面的命令查看队列消息当前状态,如图所示:

# rabbitmqctl list_queues name durable state messages_ready messages_ready_ram messages_ready_persistent

持久化消息.png


再次执行重启RabbitMQ服务并查看队列消息状态,如下图所示:


消费恢复.png


<code>queue1</code>的消息可以正常恢复。

Persistence Layer

RabbitMQ内部持久化层由两部分组成:message store和queue index,如下图所示。

持久化.png

  • message-store,用于存储消息体,消息头,相关属性。所有队列共享。

  • queue-index,每个队列对应一个,用于存储队列中消息在message-store中的位置,是否已发送,是否已消费确认。默认消息小于4kb时直接存储到queue_index,否则存储到message store。
    详情请参考http://www.rabbitmq.com/persistence-conf.html

Publisher Confirm

publisher confirm机制用于解决生产者与rabbitmq服务器之间消息可靠传输。它在消息服务器持久化消息后通知消息生产者发送成功。添加如下代码片段:

channel.confirmSelect();
channel.basicPublish(...);boolean isSuccess=channel.waitForConfirmsOrDie();

在此channel上发送的消息都将执行confirm机制。<code>waitForConfirmsOrDie()</code>将会阻塞进程,降低生产者的吞吐量。confirm机制的异步实现通过订阅<code>ConfirmListener</code>来实现,消息服务器不保证发送消息的顺序与confirm的顺序相同。添加如下代码片段处理confirm回调,判断消息是否发送成功:

channel.addConfirmListener(new ConfirmListener(){       public void handleAck(long deliveryTag,boolean multiple){      //发送成功
        System.out.println(String.format("message ack %s",deliveryTag));
       }            
       public void handleNack(long deliveryTag,boolean multiple){      //发送失败
        System.out.println(String.format("message nack %s",deliveryTag));
        }
  });

生产者也可以订阅returnListener用于判断消息状态,它会在confirmListener之前执行,代码片段如下:

channel.addReturnListener(new ReturnListener(){            public void handleReturn(int replyCode,
                    String replyText,
                    String exchange,
                    String routingKey,
                    AMQP.BasicProperties properties,                    byte[] body)
                throws IOException{
                System.out.println(replyText);
                System.out.println(routingKey);
            }
        });

对于不同的消息类型,confirm机制有所不同:

  • 非持久化消息,消息服务器接收到消息,立即返回confirm。

  • 持久化消息,消息服务器接收到消息,并且持久化到磁盘后返回confirm。

通过publisher confirm机制保证生产者明确知道消息是否发送成功,但是并不能保证消息是否已路由到队列。rabbitmq没有消息追溯的功能,因此仍需要其它解决方案来追溯消息。

Consumer acknowledge

consumer acknowledge机制用于保证rabbitmq服务器与消费者之间消息可靠传输。代码片段如下:

//第2个参数表示取消自动确认channel.basicConsume("queue1", false, new DefaultConsumer(channel){            @Override
            public void handleDelivery(String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,                    byte[] body) throws IOException {
                String routingkey=envelope.getRoutingKey();                long deliveryTag=envelope.getDeliveryTag();
                String value=new String(body);
                System.out.println(value);                        //通知broker消息被确认
                channel.basicAck(deliveryTag, false);
            }
        });

<code>channel.basicAck</code>通知broker删除<code>deliveryTag</code>对应的消息。假如,消息队列中已经存在若干条消息,如下图:

当前队列状态@2x.png

可以看到<code>queue1</code>中已有两条消息,已经持久化到磁盘,并且没有消费者。现在启动消费者,但是消费消息后没有调用<code>basicAck</code>方法。运行后出现下图:

消费者.png


未ack.png

消费者已经消费消息,但是<code>messages_unacknowledged</code>显示2条消息没有确认,<code>queue1</code>中仍有两条消息。现在重启broker用于模拟服务器出现异常又恢复的场景,会出现如下图所示:


消费者重连并且消费消息.png

消息依旧在队列.png

没有<code>ack</code>的消息又被重新发送,而且<code>ack</code>的消息一直存在内存中。下面修改消费者应用,消费消息后调用<code>ack</code>,消息会被删除。如下图:


消息删除.png


如果有多个队列订阅同一个消息,队列之间不会相互影响。如下图:


queue2队列消息仍然存在.png


<code>queue1</code>和<code>queue2</code>订阅了同一个消息,<code>queue1</code>正常消费消息并且ack,<code>queue2</code>消费消息但是没有ack。
参考:[http://www.rabbitmq.com/confirms.html#when


Consumer prefetch

consumer prefetch机制使用<code>channel</code>上未被消费确认的消息个数控制broker发送给消费者消息的速率,先看下图:


consumer.png


具体客户端实现是通过<code>channel.basicQos</code>方法设置。如果不设置此值,broker会尽最大可能将队列中的消息全部发送到consumer,消息将会堆积到consumer内存中,broker中的队列将为空。如果此时再添加一个consumer,它将不会获取到任何消息。假如设置如下:

channel.basicQos(1);

它声明当前<code>channel</code>上只能存在一个未确认的消息,从而导致大量消息堆积到队列中,consumer不能得到充分运行。在实际场景中,通过估算consumer处理消息的时间,broker与consumer之间通信时间,来确定<code>qos</code>的值。



作者:华子闲话
链接:https://www.jianshu.com/p/1ddeda30c3af


0人推荐
随时随地看视频
慕课网APP