Java实战 - 5.x实践

说明:本案例提供4种(String、HashMap、List、实体类)模板交互方式

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,spring-boot-starter-amqp 2.4.1(版本跟随 Spring Boot)

使用 Maven 依赖:

  • 该依赖包含 amqp-client 5.10.0 版本Jar包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

一. 自定义转换工具包:ConversionUtil

package com.example.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Helpers for converting message payloads to and from {@code byte[]}.
 *
 * <p>The JSON-based methods use fastjson and always encode/decode text as UTF-8,
 * so producer and consumer agree on the wire format regardless of each JVM's
 * platform default charset. The entity methods use Java native serialization.
 */
public class ConversionUtil {
    /**
     * Serializes a value (String, Map, List, ...) to pretty-printed JSON and returns
     * the UTF-8 bytes of that JSON text.
     *
     * <p>Note from the original author: do not use this method for a single entity
     * object that the consumer will decode with {@link #byteConversionClass(byte[])};
     * that method expects Java-serialized bytes, not JSON, and fails to parse.
     *
     * @param objList the value to serialize
     * @return the UTF-8 encoded JSON text
     */
    public static byte[] objectConversionByte(Object objList) {
        String json = JSON.toJSONString(objList, true);
        // Explicit charset: getBytes() without one uses the platform default.
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Serializes an entity object with Java native serialization.
     *
     * @param clazz the object to serialize (must be {@link Serializable})
     * @return the serialized bytes
     * @throws IOException if the object cannot be serialized
     */
    public static byte[] classConversionByte(Object clazz) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes any buffered data into bos.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(clazz);
        }
        return bos.toByteArray();
    }

    /**
     * Parses a UTF-8 JSON array into a list of {@code clazz} instances.
     *
     * @param body  UTF-8 encoded JSON array text
     * @param clazz element type
     * @return the parsed list, as returned by fastjson
     */
    public static <T> List<T> byteConversionList(byte[] body, Class<T> clazz) {
        return JSON.parseArray(new String(body, StandardCharsets.UTF_8), clazz);
    }

    /**
     * Parses a UTF-8 JSON object into a {@link HashMap}.
     *
     * @param body UTF-8 encoded JSON object text
     * @return the parsed map, or {@code null} if fastjson returns {@code null}
     */
    public static <T, K> HashMap<K, T> byteConversionMap(byte[] body) {
        Map<K, T> parsed = JSON.parseObject(
                new String(body, StandardCharsets.UTF_8), new TypeReference<Map<K, T>>() {});
        if (parsed == null) {
            return null;
        }
        // Avoid a ClassCastException should the parser hand back another Map implementation.
        if (parsed instanceof HashMap) {
            return (HashMap<K, T>) parsed;
        }
        return new HashMap<>(parsed);
    }


    /**
     * Restores an object from bytes produced by {@link #classConversionByte(Object)}.
     *
     * <p>SECURITY NOTE(review): Java native deserialization of data from an untrusted
     * source is dangerous. Only use this on queues whose producers are trusted, or
     * switch to JSON / install an {@code ObjectInputFilter}.
     *
     * @param body Java-serialized bytes
     * @return the deserialized object
     * @throws IOException if the stream is corrupt, or if the serialized class is not
     *                     on the classpath (the {@link ClassNotFoundException} is
     *                     attached as the cause instead of being swallowed)
     */
    public static Object byteConversionClass(byte[] body) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Cannot deserialize message body: class not found", e);
        }
    }
}

二. 自定义生产者

/**
 * Demo producer: declares a durable direct exchange and publishes one persistent
 * message to it. Four payload variants are shown; variant 1 (String) is active and
 * the others are left commented out as alternatives.
 *
 * @param args unused
 * @throws Exception if connecting, declaring, publishing or closing fails
 */
public static void main(String[] args) throws Exception {
    // Exchange name
    String exchangeName = "declareExchangeDirect";
    // Routing key
    String routingKey = "dog";
    /* Obtain connection and channel; try-with-resources closes the channel and then
       the connection even when declaring or publishing throws. */
    try (Connection connection = RabbitMQUtils.getConnection();
         Channel channel = connection.createChannel()) {
        // Declare a durable exchange of type "direct" (main types: fanout, direct, topic).
        // Note: an exchange that has already been declared does not need to be declared again.
        channel.exchangeDeclare(exchangeName, "direct", true);
        /* Message construction */
        /* Variant 1: String (JSON-encoded by objectConversionByte) */
        String message = "Hello World!:" + System.currentTimeMillis();
        channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
                ConversionUtil.objectConversionByte(message));
        System.out.println(message);

        /* Variant 2: entity (Java-serialized) */
//        UserTestEntity user = new UserTestEntity(1,"小黑");
//        channel.basicPublish(exchangeName,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,
//                                                      ConversionUtil.classConversionByte(user));

        /* Variant 3: HashMap */
//        Map<String,Object> map = new HashMap<>();
//        map.put("user","小明");
//        map.put("date","2021-1-20"); // quoted: the bare expression 2021-1-20 is integer arithmetic (2000)
//        map.put("message","哈哈哈");
//        channel.basicPublish(exchangeName,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,
//                                                      ConversionUtil.objectConversionByte(map));
//        System.out.println(map);

        /* Variant 4: List */
//        List<UserTestEntity> userList = new ArrayList<>();
//        userList.add(new UserTestEntity(1,"小黑"));
//        userList.add(new UserTestEntity(2,"小红"));
//        channel.basicPublish(exchangeName,routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,
//                                                    ConversionUtil.objectConversionByte(userList));
//        System.out.println(userList);
    }
}

三. 自定义消费者

package com.example.rabbitMQ;

import com.example.entity.UserTestEntity;
import com.example.utils.ConversionUtil;
import com.example.utils.RabbitMQUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class RabbitConsumption extends DefaultConsumer {
    public RabbitConsumption(Channel channel) {
        super(channel);
    }

    public void handleDelivery(String consumerTag, Envelope envelope,
                              AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope);
        System.out.println("properties: " + properties);
        /* 方法1:String类型 */
        System.out.println("body: " + new String(body));

        /* 方法2:实体类型 */
//        ObjectMapper objectMapper = new ObjectMapper();
//        UserTestEntity user = objectMapper.convertValue(ConversionUtil.byteConversionClass(body), UserTestEntity.class);
//        System.out.println(user);

//        /* 方法3:HashMap类型 */
//        Map<String, String> map = ConversionUtil.byteConversionMap(body);
//        System.out.println(map);

//        /* 方法4:List类型 */
//        List<UserTestEntity> userList
//                              = ConversionUtil.byteConversionList(body,UserTestEntity.class);
//        System.out.println(userList);
    }

    public static void main(String[] args) throws Exception {
        // 交换机名称
        String exchangeName = "declareExchangeDirect";
        //消息队列名称
        String queueName = "declareQueueOne";
        // 路由健名称
        String routingKey = "dog";
        /* 获取连接和通道 */
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明通道(注意:如果消息队列已经被创建/声明过,则不需要再次创建/声明)
        channel.queueDeclare(queueName,true,false,false,null);
        //绑定队列到交换机上,并指定路由键routingKey - dog(注意:如果消息队列已经绑定过交换机,则不需要再次绑定)
        channel.queueBind(queueName,exchangeName,routingKey);
        channel.basicConsume(queueName, true, new RabbitConsumption(channel));
    }
}

概念解释:

  • consumerTag:消费者标签;未显式指定时由 RabbitMQ 服务端自动生成,因此重启项目工程后,标签会变化
  • envelope:消息信封(投递信息的封装),其中包含了 deliveryTag、exchangeName、routingKey 等信息
  • properties:消息属性,其中 delivery-mode=2 时,表示该消息为持久化消息;当队列同样为持久化(durable)队列时,未被消费的消息在 RabbitMQ 重启后也不会被清除
  • body:消息体