Java实战 - 简单模式

说明:该案例为5.x之前的写法,5.x之后监听方法中某些方法改变了,不过不耽误使用4.x练习

环境:Spring Boot 2.4.1,RabbitMQ 3.8.3,Erlang 22.3,Jar版本 4.10.0

一. 简单模式:示例图

  • 一个生产者,一个消费者

二. 准备工作

1. 设置虚拟主机信息,系统初始主机名称 “/“,也可自由设置,方法如下:

2. 添加Maven依赖

<!-- RabbitMQ包start -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.10.0</version>
</dependency>
<!-- RabbitMQ包end -->

3. 定义 RabbitMQ 地址及端口号

/**
 * @Author zhangbocong
 * @Description ElasticsearchSwitch 地址
 * @Date 2020/12/29
 */
public static final String RABBITMQ_URL = "127.0.0.1";

/**
 * @Author zhangbocong
 * @Description ElasticsearchSwitch 端口号
 * @Date 2020/12/29
 */
public static final Integer RABBITMQ_PORT = 5672;

4. 创建RabbitMQ链接

package com.example.spring_boot_demo.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2020年12月29日 17:34
 * @description :RabbitMQ 工具类
 */
public class RabbitMQUtils {
    /**
     * @Author zhangbocong
     * @Description 创建RabbitMQ链接
     * @Date 2020/12/29
     * @Param []
     * @Return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器地址
        factory.setHost(ApiCountUtils.RABBITMQ_URL);
        //设置服务器端口号
        factory.setPort(ApiCountUtils.RABBITMQ_PORT);
        //设置vhost虚拟主机名称
        factory.setVirtualHost("zhangbocong");
        //设置vhost虚拟账户
        factory.setUsername("guest");
        //设置vhost虚拟密码
        factory.setPassword("guest");
        //通过工厂获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

三. JAVA示例

1. 创建消息生产者

package com.example.spring_boot_demo.controller;

import com.example.spring_boot_demo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2020年12月29日 17:56
 * @description :Rabbit 消息生产者
 */
public class RabbitSendOut {
    public static void main(String[] args) throws Exception {
        //消息队列名称
        String queueName = "test_direct_queue";
        //获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明创建队列
        channel.queueDeclare(queueName,false,false,false,null);
        //消息内容
        String message = "Hello World!-a";
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("发送消息:"+message);
        /* 关闭连接和通道 */
        channel.close();
        connection.close();
    }
}

2. 创建消息消费者

package com.example.spring_boot_demo.controller;

import com.example.spring_boot_demo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @author :zhangbocong
 * @version :V1.0
 * @program :spring_boot_demo
 * @date :Created in 2020年12月29日 17:57
 * @description :Rabbit 消息消费者
 */
public class RabbitConsumption {
    //消费者消费消息
    public static void main(String[] args) throws Exception {
        //消息队列名称
        String queueName = "test_direct_queue";
        /* 获取连接和通道 */
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明通道
        channel.queueDeclare(queueName,false,false,false,null);
        //定义消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //监听队列
        channel.basicConsume(queueName,true,consumer);

        while(true){
            //这个方法会阻塞住,直到获取到消息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收到消息:"+message);
        }
    }
}

3. 执行效果