Java实战 - RabbitTemplate

说明一:学习此教程前,一定要先学习前几篇教程,了解RabbitMQ所有模式的规则,谨记!!!

说明二:此教程仅为“通配符”模式,其他模式与此教程大同小异,只是配置交换机方式不同。

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,Jar版本 2.4.1(跟随boot版本)

一. 添加 Maven 依赖

  • 该依赖包含 amqp-client 5.10.0 版本Jar包
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • 注意:添加后,会因新(5.x)老(4.x)Jar包的冲突问题,启动项目报错
  • 解决方案1:把之前的练习时添加的“amqp-client”包删除,改用5.x的写法
  • 解决方案2:把原来的4.x版本的Jar包和代码都注释掉,直接用RabbitTemplate的方法
  • 解决方案3:网上说用idea的Maven Helper工具的exclusions(排除)方式解决Jar包冲突的问题,不过没成功

二. 添加 Application.properties 配置

# Rabbit 地址
spring.rabbitmq.host=127.0.0.1
# Rabbit 端口号
spring.rabbitmq.port=5672
# Rabbit 虚拟地址(注意:后方不要存在空格,否则会导致启动后监听报错)
spring.rabbitmq.virtual-host=/
# Rabbit 账户
spring.rabbitmq.username=guest
# Rabbit 密码
spring.rabbitmq.password=guest

三. 添加 RabbitConfig 配置

package com.example.rabbitmq_demo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    /**
     * 项目消息队列名称
     */
    public static final String QUEUE_NAME_PROJECT = "qbb.project";
    /**
     * 财务消息队列名称
     */
    public static final String QUEUE_NAME_FINANCE = "qbb.finance";
    /**
     * 库存消息队列名称
     */
    public static final String QUEUE_NAME_WAREHOUSE = "qbb.warehouse";
    /**
     * 交换机名称
     */
    public static final String TOPIC_EXCHANGE_NAME = "qbb.exchange";
    /**
     * 当前队列消费者最大并发线程数量,最小为1,最大不建议超过40
     */
    public static final int CONCURRENT_COUNT = 40;

    //创建项目消息队列
    @Bean(QUEUE_NAME_PROJECT)
    public Queue projectQueue() { return new Queue(QUEUE_NAME_PROJECT); }

    //创建财务消息队列
    @Bean(QUEUE_NAME_FINANCE)
    public Queue financeQueue() { return new Queue(QUEUE_NAME_FINANCE); }

    //创建库房消息队列
    @Bean(QUEUE_NAME_WAREHOUSE)
    public Queue warehouseQueue() { return new Queue(QUEUE_NAME_WAREHOUSE); }

    //创建“通配符”模式交换机
    @Bean
    public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE_NAME); }

    //绑定“项目”消息队列到交换机并标识routing key
    @Bean("project")
    Binding bindingExchangeProjectMessage() {
        return BindingBuilder.bind(projectQueue()).to(topicExchange()).with(QUEUE_NAME_PROJECT);
    }

    //绑定“财务”消息队列到交换机并标识routing key
    @Bean("finance")
    Binding bindingExchangeFinanceMessage() {
        return BindingBuilder.bind(projectQueue()).to(topicExchange()).with(QUEUE_NAME_FINANCE);
    }

    //绑定“库存”消息队列到交换机并标识routing key
    @Bean("warehouse")
    Binding bindingExchangeWarehouseMessage() {
        return BindingBuilder.bind(projectQueue()).to(topicExchange()).with(QUEUE_NAME_WAREHOUSE);
    }

    /**
     * 监听容器配置
     */
    @Bean("customContainerStockFactory")
    public SimpleRabbitListenerContainerFactory containerFactory
                           (SimpleRabbitListenerContainerFactoryConfigurer configurer,
                               ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(CONCURRENT_COUNT);
        factory.setMaxConcurrentConsumers(CONCURRENT_COUNT);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

四. 创建“生产者”和“消费者”

  • 生产者规则:
    // 消息体可以是多种类型,例如:String、Map、List、实体类等
    rabbitTemplate.convertAndSend(“交换机名称”, “消息队列名称”, 消息体);
    
  • 消费者规则一:监听一个队列
    @RabbitHandler
    @RabbitListener(queues = “消息队列名称”, containerFactory = "配置链接工厂的Bean名称")
    // 注意:方法接值类型要与生产者消息体类型一致
    
  • 消费者规则二:监听多个队列
    @RabbitHandler
    @RabbitListener(queues = {“队列名称1”,...}, containerFactory = "配置链接工厂的Bean名称")
    // 注意:方法接值类型要与生产者消息体类型一致
    
  • 完整案例:
package com.example.rabbitmq_demo.test;

import com.example.rabbitmq_demo.config.RabbitConfig;
import com.example.rabbitmq_demo.entity.UserTestEntity;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("rabbitMQTest")
public class RabbitMQTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 生产者
     */
    @RequestMapping("hello")
    public void hello() {
        //对象被默认序列化之后发送出去
        UserTestEntity user = new UserTestEntity(1,"小明");
        rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE_NAME,
                                             RabbitConfig.QUEUE_NAME_PROJECT, user);
        System.out.println("发送完毕!!!");
    }

    /**
     * 消费者
     * @Param [user 类型要与发送时的消息类型一致]
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME_PROJECT,
                         containerFactory = "customContainerStockFactory")
    public void receive(UserTestEntity user){
        System.out.println("message:"+user);
    }
}
  • 控制台输出:

番外:RabbitTemplate还有很多配置及配置方法,不仅限于此种,例如:Application.properties的地址配置至少就有三种配置方法