Java实战 - 消息持久化

警告:为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要同时设置Queue,Exchange和Message的持久化,否则持久化毫无意义。

一. Queue 持久化

// 4.x 和 5.x 版本是一样的
// 关键的是第二个参数设置为true,即durable=true.
channel.basicConsume(queueName,true,QueueingConsumer);

二. Message 持久化

  • 如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新获取之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
  • 消息的持久化设置方法一:
// 这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
// 实际上是设置BasicProperties:deliveryMode=2,代表消息被持久化;deliveryMode=1,代表消息不持久化;
channel.basicPublish(exchangeName,routingKey,
                         MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 消息的持久化设置方法二:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName,routingKey,properties, message.getBytes());

三. Exchange 持久化

  • 在不设置exchange的持久化对消息的可靠性来说没有什么影响,但是如果exchange不设置持久化,那么当服务重启之后,exchange将被清除,那么消息生产者就无法正常发送消息。所以还是建议设置exchange的持久化。
  • 消息的持久化设置:
    // 在这里的关键是第三个参数,设置为true,默认情况下为false
    channel.exchangeDeclare(exchangeName,"direct",true);
    

番外:如果想要设置消息请求后不再被清除,则需要配置以下信息

  • 监听队列的autoAck设置为false

    channel.basicConsume(queueName,false,consumer);
    
  • 不执行消息消费反馈

    //注意:是不执行以下代码,有这段需要被注释掉
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    
  • 参考文案:

    https://blog.csdn.net/u013256816/article/details/60875666/