【RabbitMQ】Java编写简单的生产者和消费者

作者:Leopold    访问量:51

1 准备工作

环境说明:

  • 框架:Spring-Boot 2.3.3
  • 编译器:JDK 1.8
  • 必要包版本:com.rabbitmq包 5.9.0
  • RabbitMQ版本 : 3.8.8
  • Erlang版本: 23.0.4

  • 添加Maven依赖,如有重复请自行去重,com.rabbitmq必须使用
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/javax.websocket/javax.websocket-api -->
        <dependency>
            <groupId>javax.websocket</groupId>
            <artifactId>javax.websocket-api</artifactId>
            <version>1.1</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
  • 添加RabbitMQ用户,不使用Guest用户
    详情见文章 -》【中间件】RabbitMQ在Linux上的安装

2 生产者

场景:客户端1点击按钮后,发送一条消息到RabbitMQ
源代码:

package com.sunreal.newframe.util;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * @author leopold
 * @Description Rabbitmq 生产者
 * @date 2020年9月17日
 */
public class RabbitProducer {
	private static final String EXCHANGE_NAME = "exchange_demo";
	private static final String ROUTING_KEY = "routingkey_demo";
	private static final String QUEUE_NAME = "queue_demo";
	private static final String IP_ADDRESS = "192.168.100.56";
	private static final int PORT = 5672; // 默认端口号为5672
	private static final String USER_NAME = "leopold";
	private static final String PASSWORD = "leopold";

	private static ConnectionFactory factory;

	static {
		factory = new ConnectionFactory();
		factory.setHost(IP_ADDRESS);
		factory.setPort(PORT);
		factory.setUsername(USER_NAME);
		factory.setPassword(PASSWORD);
	}

	public static Connection getConn() throws IOException, TimeoutException {
		return factory.newConnection(); // 建立连接
	}

	/**
	 * 获取信道
	 * 
	 * @return
	 * @throws IOException
	 * @throws TimeoutException
	 */
	public static Channel getChannel(Connection connection) throws IOException, TimeoutException {
		Channel channel = connection.createChannel(); // 创建信道
		channel = connection.createChannel(); // 创建信道
		// 创建一个type="direct"、持久化的、非自动删除的交换器
		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
		// 创建一个持久化、非排他的、非自动删除的队列
		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
		// 将交换器和队列通过路由绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
		return channel;
	}

	/**
	 * 发送消息
	 * 
	 * @param msg
	 * @param channel
	 * @throws IOException
	 */
	public static void sendMsg(String msg, Channel channel) throws IOException {
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
	}

	/**
	 * 关闭连接
	 * 
	 * @param channel
	 * @param connection
	 * @throws IOException
	 * @throws TimeoutException
	 */
	public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
		if (channel != null) {
			channel.close();
		}

		if (connection != null) {
			connection.close();
		}
	}

	public static void main(String[] args) throws IOException, TimeoutException {
		// 测试发送第一条消息
		Connection conn = null;
		Channel channel = null;
		try {
			conn = RabbitProducer.getConn();
			channel = RabbitProducer.getChannel(conn);
			String message = "Hello,World!";
			RabbitProducer.sendMsg(message, channel);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			RabbitProducer.close(channel, conn);
		}

		// 测试发送第二条消息
		try {
			conn = RabbitProducer.getConn();
			channel = RabbitProducer.getChannel(conn);
			String message = "Hello,World2!";
			RabbitProducer.sendMsg(message, channel);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			RabbitProducer.close(channel, conn);
		}
	}
}

3 消费者

场景:页面与客户端2开启Socket,实时消费队列,并将消费结果通过socket发送给前台页面。

3.1 application.yml

spring:
  rabbitmq:
    host: 【RabbitMQ的IP】
    #账号密码 默认用户是Guest,但可能需要开启远程访问(暂不清楚)。
    username: 【RabbitMQ的用户名,默认Guest】
    password: 【RabbitMQ的密码,默认Guest】
    #rbbitmq虚拟主机路径
    virtual-host: /
    #rabbitmq的端口号 也是默认的
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual #手动接受数据
        #max-concurrency: 10 #最大并发
        #prefetch: 1 #限流

3.2 开启RabbitMQ

P.S. 此处为Spring-Boot启动器,请根据实际情况自行粘贴,尤其注意【启动器.class】,别全都粘贴过去了。。

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;

@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
@EnableRabbit
/**开启rabbitmq*/
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }


    /**消息的转换器
     * 设置成json 并放入到Spring中
     * */
    @Bean
    public MessageConverter messageConverter(){

        return new Jackson2JsonMessageConverter();

    }
}

3.3 注册并开启WebSockekt

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 * @author leopold
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

3.4 配置WebScoket

注意事项:
1)@Queue@Exchange等注解请自行根据生产者设定的值输入,此处请看【2 生产者】,类中静态变量的值。
2)receiveUserMessage()方法中的形参 String msg,可替换为 @Payload UserTest userTest,其中 UserTest为接收参数的类(防止接收参数过多),切记添加@Payload注解。

P.S. 请自行将system.out.println换成log日志输出(随便。。)

package com.sunreal.demo.socket;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint(value = "/websocket")
@Component
public class WebSocketServer {

    //静态变量 用于记录当前在线连接数 应该把它设计成线程安全
    private static int onlineCount=0;

    /**Concurrent包下的 写时复制Set 用它作于存储客户端对应的MyWebSocket对象*/
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>();


    /**与某个客户端的链接会话,需要通过它来给客户端发送数据*/

    private Session session;
    /**
     * 参数1:Message 可以获得消息的内容字节 还可以获得消息的其他属性
     * 参数2:可以写确定接受的参数类型比如User
     * 参数3:Channel 通道
     *      com.rabbitmq.client.Channel必须是这个包下
     *      通过这个参数可以拒绝消息
     *      让rabbitmq再发给别的消费者
     *
     * 使用@RabbitListener 可以绑定交换机  路由键 管道
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_demo",durable = "true"),
            exchange = @Exchange(name = "exchange_demo",durable = "true",type = "direct"),
            key = "routingkey_demo"
    )
    )
    @RabbitHandler//注解意思:如果有消息过来 需要消费的时候才会调用该方法
    /**如果已知传递的参数是 UserTest对象可以通过该注解
     * 消息头需要用map接受
     * 既然是手动接受消息 就需要设置channel
     * */
    public void receiveUserMessage(String msg, @Headers Map<String,Object> headers, Channel channel) throws IOException {
        //sendMessage(message.toString());
        System.out.println(msg);
        onMessage(msg);//调用消息方法将数据传给他

        Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手动接受并告诉rabbitmq消息已经接受了  deliverTag记录接受消息 false不批量接受
        channel.basicAck(deliveryTag,false);

        /**
         * basicReject()
         * 参数1: 消息标签
         * 参数2: true 将消息从新放入队列  false 接受到并将消息抛弃
         *
         *
         try {
         channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
         System.out.println(message);
         } catch (IOException e) {
         e.printStackTrace();
         }
         */

    }

    /**服务器端推送消息*/
    public void sendMessage(String message){
        try {
            System.out.println("session可否显示出来"+session);
            this.session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接建立成功调用的方法
     * */
    @OnOpen
    public void onOpen(Session session){
        this.session=session;
        webSocketSet.add(this);
        System.out.println("有新的连接加入!当前在线人数为"+getOnlineCount());
        System.out.println(session);
    }

    /**
     * 连接关闭调用的方法
     * */
    @OnClose
    public void onClose(){
        /**从安全Set中 移除当前连接对象*/
        webSocketSet.remove(this);
        subOnlineCount();
        System.out.println("有一连接关闭!当前在线人数为"+getOnlineCount());
    }



    @OnMessage
    public void onMessage(String message){

        System.out.println("来自客户端的消息:"+message);

        for (WebSocketServer webSocketServer:webSocketSet){
            webSocketServer.sendMessage(message);
        }

    }


    public static int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

3.5 index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>

<script>
    var socket;
    console.log(typeof socket)
    if (typeof (WebSocket) == "undefined") {
        alert("您的浏览器不支持WebSocket");
    } else {
        alert("您的浏览器支持WebSocket");

        socket = new WebSocket("ws://localhost:8090/websocket");

        socket.onopen = function () {
            alert("开启Socket 监听")
        };

        //获得消息事件
        socket.onmessage = function (msg) {
            $("#show").append(msg.data + "<br/>");
            //console.log("收到消息:" + msg.data);
            //发现消息进入    调后台获取
            //getCallingList();
        };

        //关闭事件
        socket.onclose = function () {
            console.log("Socket已关闭");
        };
        //发生了错误事件
        socket.onerror = function () {
            alert("Socket发生了错误");
        };
        /**
         $(window).unload(function(){
                socket.close();
            });
         */
    }
</script>
<body>
<div >

</div>
</body>

</html>


如果您觉得此文章对您有帮助,欢迎评论转载!

您的每一次评论与转载,都是对作者极大的鼓励!

注意:除非注明,否则均为[Leopold's Blog]原创文章,转载必须以链接形式标明本文链接。

本文链接:https://www.yusong.site/leopold/785.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注