Java实战 - work模式

说明:该案例为5.x之前的写法,5.x之后监听方法中某些方法改变了,不过不耽误使用4.x练习

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,Jar版本 4.10.0

一. work模式:示例图

  • 一个生产者,多个消费者,每个消费者获取到的消息唯一

二:准备工作与简单模式一致

三. JAVA示例

1. 创建 - 消息生产者

/**
 * @Author zhangbocong
 * @Description work模式 - 生产者
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    //消息队列名称
    String queueName = "test_direct_queue";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明队列
    channel.queueDeclare(queueName,false,false,false,null);
    /* 消息作坊 */
    String message = "";
    for(int i = 0; i<10; i++){
        message = "" + i;
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("发送消息:"+message);
        Thread.sleep(i);
    }
    /* 关闭连接和通道 */
    channel.close();
    connection.close();
}

2. 创建 - 消费者1 - 手动模式

  • 消费者从消息队列获取消息后,服务端并没有标记为成功消费,消费者成功消费后需要将状态返回到服务端
/**
 * @Author zhangbocong
 * @Description work模式 - 消费者1 - 手动模式
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    //消息队列名称
    String queueName = "test_direct_queue";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,false,consumer);

    while(true){
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println("recive1:"+message);
        Thread.sleep(100);
        //消息消费完给服务器返回确认状态,表示该消息已被消费
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
}

3. 创建 - 消费者2 - 自动模式

  • 消费者从消息队列获取消息后,服务端就认为该消息已经成功消费
/**
 * @Author zhangbocong
 * @Description work模式 - 消费者2 - 自动模式
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    //消息队列名称
    String queueName = "test_direct_queue";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,true,consumer);

    while(true){
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println("recive1:"+message);
        Thread.sleep(10);
    }
}

4. 自动模式 & 手动模式的区别

  • 自动模式:

    channel.basicConsume(QUEUE_NAME,true,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println("recive1:"+message);
      Thread.sleep(10);
      //无需反馈
      //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }
    
  • 手动模式:

    channel.basicConsume(QUEUE_NAME,false,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println("recive1:"+message);
      Thread.sleep(10);
      //消息消费完给服务器返回确认状态,表示该消息已被消费(RPC)
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    }