Java实战 - 路由模式

说明:该案例为5.x之前的写法,5.x之后监听方法中某些方法改变了,不过不耽误使用4.x练习

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,Jar版本 4.10.0

一. 路由模式:示例图

  • 生产者:发送消息到交换机并且要指定路由key(密码)
  • 消费者:将队列绑定到交换机,并且指定路由key(密码)获取消息
  • 它是一种完全匹配,只有匹配到的消费者才能消费消息
  • 消息中的路由键( routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“ dog”,则只转发 routing key 标记为“ dog”的消息,不会转发“ dog.puppy”,也不会转发“ dog.guard”等等。它是完全匹配、单播的模式。

二:准备工作与简单模式一致

三. JAVA示例

1. 创建 - 消息生产者

/**
 * @Author zhangbocong
 * @Description 路由模式 - 生产者
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeDirect";
    // 路由健名称
    String routingKey = "dog";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明交换机 direct:交换机类型 主要有fanout,direct,topic三种
    channel.exchangeDeclare(exchangeName,"direct");
    /* 消息作坊 */
    String message = "Hello World!";
    channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
    System.out.println(message);
    /* 关闭连接和通道 */
    channel.close();
    connection.close();
}

2. 创建 - 消息消费者1

/**
 * @Author zhangbocong
 * @Description 路由模式 - 消费者1
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeDirect";
    //消息队列名称
    String queueName = "testDirectQueueOne";
    // 路由健名称
    String routingKey = "dog";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //绑定队列到交换机上
    channel.queueBind(queueName,exchangeName,routingKey);
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,true,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(message);
    }
}

3. 创建 - 消息消费者2

/**
 * @Author zhangbocong
 * @Description 路由模式 - 消费者2
 * @Date 2020/12/29
 * @Param [args]
 * @Return void
 */
public static void main(String[] args) throws Exception {
    // 交换机名称
    String exchangeName = "testExchangeDirect";
    //消息队列名称
    String queueName = "testDirectQueueTwo";
    // 路由健名称
    String routingKey = "cat";
    /* 获取连接和通道 */
    Connection connection = RabbitMQUtils.getConnection();
    Channel channel = connection.createChannel();
    //声明通道
    channel.queueDeclare(queueName,false,false,false,null);
    //绑定队列到交换机上
    channel.queueBind(queueName,exchangeName,routingKey);
    //同一时刻服务器只发送一条消息给消费端
    channel.basicQos(1);
    //定义消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(queueName,true,consumer);
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(message);
    }
}

4. 输出示例

  • 消息生产者输出:

  • 消息消费者1 输出:

  • 消息消费者2 输出: