Java实战 - 订阅模式

说明:该案例为5.x之前的写法,5.x之后监听方法中某些方法改变了,不过不耽误使用4.x练习

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,Jar版本 4.10.0

一. 订阅模式:示例图

  • 一个生产者发送的消息会被多个消费者获取。
  • 生产者:可以将消息发送到队列或者是交换机。
  • 消费者:只能从队列中获取消息。
  • 如果消息发送到没有队列绑定的交换机上,那么消息将丢失。
  • 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。 fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的 。

二:准备工作与简单模式一致

三. JAVA示例

1. 创建 - 消息生产者

/**
 * @Author zhangbocong
 * @Description 订阅模式 - 生产者,发送消息到交换机
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeFanout";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明交换机 fanout:交换机类型 主要有fanout,direct,topic三种
    channel.exchangeDeclare(exchangeName,"fanout");
    /* 消息作坊 */
    String message = "Hello World!-B";
    channel.basicPublish(exchangeName,"",null,message.getBytes());
    System.out.println(message);
    /* 关闭连接和通道 */
    channel.close();
    connection.close();
}

2. 创建 - 消息消费者1

/**
 * @Author zhangbocong
 * @Description 订阅模式 - 消费者1
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeFanout";
    //消息队列名称
    String queueName = "testDirectQueueOne";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //绑定队列到交换机上
    channel.queueBind(queueName,exchangeName,"");
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,true,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(message);
    }
}

3. 创建 - 消息消费者2

/**
 * @Author zhangbocong
 * @Description 订阅模式 - 消费者2
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeFanout";
    //消息队列名称
    String queueName = "testDirectQueueTwo";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //绑定队列到交换机上
    channel.queueBind(queueName,exchangeName,"");
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,true,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(message);
    }
}

4. 输出示例

  • 消息生产者输出:

  • 消息消费者1 输出:

  • 消息消费者2 输出: