Java实战 - 动态配置

背景

  • 目前有一个店铺线上平台,需要实时与用户进行沟通,而且用户数量是不断增加的,这时我们该如何设计解决方案?通常我们配置 Exchange 和 Queue 是固定的,那么在情况下,该何如动态新增 Exchange 和 Queue ?如何动态监听 Queue 的信息实时反馈给店铺和商家呢?

设计方案

  • 每个商家与用户之间有一个消息队列(实际上消息队列 = 用户);

  • 在用户注册时,新建用户与商家之间的消息队列;

  • 监听所有用户消息(消息队列);

解决方案

0. 添加 Maven 依赖

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>3.6.0</version>
</dependency>

1. 创建消息消费者处理程序(实施监听):MessageConsumerHandler

package com.example.rabbitMQ;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2021年1月21日
 * @description :RabbitMQ 消息消费者处理程序(实施监听)
 */
@Component
public class MessageConsumerHandler implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("新消息:"+new String(message.getBody())+" ,Queue来源----->"
                                   + message.getMessageProperties().getConsumerQueue());
    }
}

2. 获取所有”配置路径”下的 Queue 名称:QueueService

package com.example.rabbitMQ;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2021年1月22日 17:19
 * @description :获取链接下所有消息队列名称
 */
@Service
public class QueueService {
    private static final Logger logger = LoggerFactory.getLogger(QueueService.class);

    private String url = "http://localhost:15672/api/queues"; // Rabbit提供的API
    private String username = "guest"; // 登录名
    private String password = "guest"; // 密码

    /**
     * @Author zhangbocong
     * @Description 根据API获得相关的RabbitMQ信息
     * @Date 2021/1/22
     * @Param []
     * @Return java.util.List<java.lang.String> QueueName 集合
     */
    public List<String> getMQJSONArray() {
        HttpGet httpPost = new HttpGet(url);
        CloseableHttpClient pClient = getBasicHttpClient();
        CloseableHttpResponse response = null;
        JSONArray jsonArray = null;
        List queueNameList = new ArrayList();
        try {
            response = pClient.execute(httpPost);
            StatusLine status = response.getStatusLine();
            int state = status.getStatusCode();
            if (state == HttpStatus.SC_OK) {
                String string = EntityUtils.toString(response.getEntity());
                jsonArray = (JSONArray) JSONObject.parse(string);
                if (null != jsonArray) {
                    for (int i = 0; i < jsonArray.size(); i++) {
                        String name = (String) jsonArray.getJSONObject(i).get("name");
                        queueNameList.add(name);
                    }
                    System.out.println("监听消息队列:" + queueNameList);
                    return queueNameList;
                }
            } else {
                System.out.println("请求错误,返回:" + state + "(" + url + ")");
            }
        } catch (Exception e) {
            logger.error("地址url:" + url + ",异常:" + e.toString());
        } finally {
            closeAll(response, pClient);
        }
        return queueNameList;
    }

    /**
     * @Author zhangbocong
     * @Description 创建Http链接
     * @Date 2021/1/22
     * @Param []
     * @Return org.apache.http.impl.client.CloseableHttpClient
     */
    private CloseableHttpClient getBasicHttpClient() {
        // 创建HttpClientBuilder
        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
        // 设置BasicAuth
        CredentialsProvider provider = new BasicCredentialsProvider();
        AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM);
        UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(username, password);
        provider.setCredentials(scope, credentials);
        httpClientBuilder.setDefaultCredentialsProvider(provider);
        return httpClientBuilder.build();
    }

    /**
     * @Author zhangbocong
     * @Description 关闭链接
     * @Date 2021/1/22
     * @Param [response, pClient]
     * @Return void
     */
    public void closeAll(CloseableHttpResponse response, CloseableHttpClient pClient) {
        if (response != null) {
            try {
                response.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            pClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. 创建 RabbitMQ 动态配置:RabbitConfig

package com.example.config;

import com.example.rabbitMQ.MessageConsumerHandler;
import com.example.rabbitMQ.QueueService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

import java.io.IOException;
import java.util.List;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2021年1月21日
 * @description :RabbitMQ 动态配置
 */
@Configuration
public class RabbitConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    MessageConsumerHandler messageConsumerHandler;
    @Autowired
    QueueService queueService;

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public void start() {
        try {
            mqMessageContainer().start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Author zhangbocong
     * @Description 监听容器配置
     * @Date 2021/1/22
     */
    @Bean
    @Order(value = 2)
    public SimpleMessageListenerContainer mqMessageContainer() throws AmqpException, IOException {
        SimpleMessageListenerContainer container
                             = new SimpleMessageListenerContainer(connectionFactory);
        List<String> list = queueService.getMQJSONArray();
        container.setQueueNames(list.toArray(new String[list.size()]));
        container.setExposeListenerChannel(true);
        container.setPrefetchCount(1000);//设置每个消费者获取的最大的消息数量
        container.setConcurrentConsumers(1);//消费者个数
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式为手动确认,AUTO为自动确认
        container.setMessageListener(messageConsumerHandler);//监听处理类
        return container;
    }
}

4. 创建 OKHttpClientUtil 工具类

package com.example.utils;

import okhttp3.*;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class OKHttpClientUtil {
    private static Long connTimeOut = 5L;
    private static Long readTimeOut = 20L;
    private static Long writeTimeOut = 10L;
    public static OkHttpClient client = null;

    static {
        client = new OkHttpClient.Builder()
            .connectTimeout(connTimeOut, TimeUnit.SECONDS)
            .readTimeout(readTimeOut, TimeUnit.SECONDS)
            .writeTimeout(writeTimeOut, TimeUnit.SECONDS)
            .retryOnConnectionFailure(true)
            .build();
    }

    public OKHttpClientUtil() {}

    public static String doGet(String url, Map<String, String> headers,
                                                 Map<String, String> param) throws Exception {
        StringBuffer urlS = new StringBuffer(url);
        if(param != null) {
            urlS.append("?");
            Iterator iterator = param.entrySet().iterator();
            while(iterator.hasNext()) {
                Map.Entry<String, String> e = (Map.Entry)iterator.next();
                urlS.append((String)e.getKey()).append("=").append((String)e.getValue() + "&");
            }
            urlS = new StringBuffer(urlS.substring(0,urlS.length()-1));
        }
        Request.Builder requestBuilder = new Request.Builder();
        if(headers != null && headers.size() > 0) {
            Iterator iterator = headers.keySet().iterator();
            while(iterator.hasNext()) {
                String key = (String)iterator.next();
                requestBuilder.addHeader(key, (String)headers.get(key));
            }
        }
        Request request = (requestBuilder).url(urlS.toString()).build();
        Response response = client.newCall(request).execute();
        String responseStr = response.body() == null?"":response.body().string();
        return responseStr;
    }
}

5. 创建 ApplicationContext 工具类:ApplicationContextUtil

package com.example.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2021年1月22日 18:06
 * @description :applicationContext 工具类
 */
@Component
public class ApplicationContextUtil implements ApplicationContextAware {
    private static org.springframework.context.ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws BeansException {
        if(ApplicationContextUtil.applicationContext == null) {
            ApplicationContextUtil.applicationContext = applicationContext;
        }
        System.out.println("========ApplicationContext配置成功========");
    }

    //获取 applicationContext
    public static org.springframework.context.ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    //通过 name 获取 Bean
    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }

    //通过 class 获取 Bean
    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }

    //通过 name,以及 Clazz 返回指定的Bean
    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }
}

6. 创建消息生产中心(生产者):MessageProducerController

package com.example.rabbitMQ;

import com.example.utils.ApplicationContextUtil;
import com.example.utils.OKHttpClientUtil;
import com.example.vo.ResponseResult;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2021年1月22日 17:39
 * @description :消息生产中心(生产者)
 */
@RestController
@RequestMapping("messageProducer")
public class MessageProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RabbitAdmin rabbitAdmin;
    @Resource
    private ApplicationContextUtil applicationContextUtil;
    @Autowired
    private QueueService queueService;

    //服务链接地址(本地就是127.0.0.1)
    @Value("${server.address}")
    private String host;
    //服务端口号
    @Value("${server.port}")
    private Integer port;

    /**
     * @Author zhangbocong
     * @Description 发送消息
     * @Date 2021/1/22
     * @Param [queueName 消息队列名称, message 消息体]
     * @Return java.lang.String
     */
    @RequestMapping("messageProducer")
    public ResponseResult messageProducer(@RequestParam String queueName
                                             ,@RequestParam String message){
        // 验证消息队列是否存在
        containsQueue(queueName);
        // 发送消息
        rabbitTemplate.convertAndSend(queueName,message);
        return ResponseResult.success();
    }

    /**
     * @Author zhangbocong
     * @Description 消息队列认证 - 如果不存在则进行创建
     * @Date 2021/1/22
     * @Param [queueName 消息队列名称]
     * @Return void
     */
    public void containsQueue(String queueName){
        //判断队列是否存在
        Properties properties = rabbitAdmin.getQueueProperties(queueName);
        if(properties == null){
            // 声明一个持久化 Queue
            rabbitAdmin.declareQueue(new Queue(queueName, true, false, false, null));
            //新启动一个线程,通知消费者新增listener
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String res = callAddNewListener(queueName);
                    if(!StringUtils.isEmpty(res)){
                        System.out.println("-->>调用创建新的 listener feign 失败");
                    }
                }
            }).start();
        }
    }

    /**
     * @Author zhangbocong
     * @Description 创建链接,通知消费者更新Queue列表
     * @Date 2021/1/22
     * @Param [queueName 消息队列名称]
     * @Return java.lang.String
     */
    private String callAddNewListener(String queueName){
        String url = "http://"+host+":"+port+"/messageProducer/addNewListener";
        Map<String,String> param = new HashMap<String,String>();
        param.put("queueName",queueName);
        try {
            OKHttpClientUtil.doGet(url,null,param);
        }catch (Exception e){
            e.printStackTrace();
            return "添加listener失败!!!";
        }
        return null;
    }

    /**
     * @Author zhangbocong
     * @Description 添加新的监听参数
     * @Date 2021/1/22
     * @Param [queueName 消息队列名称]
     * @Return java.lang.String
     */
    @RequestMapping("addNewListener")
    public String addNewListener(String queueName){
        SimpleMessageListenerContainer container =
                           applicationContextUtil.getBean(SimpleMessageListenerContainer.class);
        List<String> queueNameList = queueService.getMQJSONArray();
        int count = 0;
        while(!queueNameList.contains(queueName)){
            queueNameList = queueService.getMQJSONArray();
            count++;
            try {
                Thread.sleep(100);
            }catch (Exception e){
                e.printStackTrace();
            }
            if(count >=3){
                return "添加监听失败!!!";
            }
        }
        container.addQueueNames(queueName);
        long consumerCount = container.getActiveConsumerCount();
        return "修改成功:现有队列监听者["+consumerCount+"]个";
    }
}

参考地址

https://blog.csdn.net/tianjiliuhen/article/details/103287307