[TOC]
一、引言
模块之间的耦合度过高,一旦一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。
RabbitMQ引言 |
|
二、RabbitMQ介绍
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
-
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多门语言,RabbitMQ支持多种语言。
-
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
-
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
-
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal,由Erlang语言开发(并发的编程语言)
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。
Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。
AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。
优点
1、解耦:降低系统模块的耦合度
2、提高系统响应时间
3、异步消息
4、过载保护,流量削峰
1.应用解耦
场景:双11购物,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
- 当库存系统出现故障时,订单就会失败.
- 订单系统和库存系统高耦合.
引入消息队列
-
订单系统: 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
-
库存系统: 订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失
2.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行的方式 2. 并行的方式
串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍
3.流量削峰
流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
这样我们就可以采用队列的机制来处理,如同我们在超市结算一样,并不会一窝蜂一样涌入收银台,而是排队结算,一个接着一个的处理,不能插队,因为同时结算就只能达到这么多。
三、RabbitMQ安装
1 2 3 4 5 6 7 8 9 10 11
| version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: - ./data:/var/lib/rabbitmq
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| [root@192 ~]# cd /opt [root@192 opt]# mkdir docker_rabbitmq [root@192 opt]# cd docker_rabbitmq/ [root@192 docker_rabbitmq]# vim docker-compose.yml [root@192 docker_rabbitmq]# docker-compose up -d Creating network "docker_rabbitmq_default" with the default driver Pulling rabbitmq (daocloud.io/library/rabbitmq:management)... management: Pulling from library/rabbitmq 01bf7da0a88c: Pull complete f3b4a5f15c7a: Pull complete 57ffbe87baa1: Pull complete 5ef3ef76b1b5: Pull complete 82a3ce07c0eb: Pull complete 1da219d9bd70: Pull complete 446554ac749d: Pull complete 8e4c09e200e7: Pull complete 7a8620611ebf: Pull complete c70a2924b273: Pull complete 3b0b9e36b4e9: Pull complete 7619a9a42512: Pull complete 965a8e1f1b1c: Pull complete Digest: sha256:4cc2267788b21e0f34523b4f2d9b32ee1c2867bf2de75d572158d6115349658c Status: Downloaded newer image for daocloud.io/library/rabbitmq:management Creating rabbitmq ... done
|
浏览器访问:http://ip:15672 (注:ip指当前云服务器的地址,云服务器记得开放 15672 和 5672 端口)
用户名和密码默认都是:guest
四、RabbitMQ架构【重点
】
4.1 官方的简单架构图
-
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
-
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
-
Exchange - 交换机:和生产者建立连接并接收生产者的消息
-
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
-
Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单架构图 |
|
4.2 RabbitMQ的完整架构图
完整架构图
完整架构图 |
|
4.3 RabbitMQ 通讯方式
https://www.rabbitmq.com/getstarted.html
4.4 Hello-World案例演示
- 导入依赖
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.15.0</version> </dependency>
|
- 创建生产者 Publisher
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.couture.rabbitmq;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Publisher { public static void main(String[] args) throws Exception{ System.out.println("Publisher..."); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.25.132"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("helloworldQueue",false,false,false,null); channel.basicPublish("","helloworldQueue",null,"helloworld".getBytes()); channel.close(); connection.close(); } }
|
- 创建消费者 Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.couture.rabbitmq;
import com.rabbitmq.client.*; import java.io.IOException;
public class Consumer { public static void main(String[] args)throws Exception { System.out.println("Consumer..."); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.25.132"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("来自生产者的消息:"+new String(body)); } };
channel.basicConsume("helloworldQueue",true,defaultConsumer); channel.close(); connection.close(); } }
|
分别启动生产者和消费者进行测试(生产一次才能消费一次)
4.5 基本原理
RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。
就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来操作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。
接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel。
再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的
但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程
至此,我们就把所有的名词贯通咯,接下来做个概要描述:
- Broker:提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
- ConnectionFactory:与RabbitMQ服务器连接的管理器
- Connection:与RabbitMQ服务器的TCP连接
- Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。
- Queue:消息的载体,每个消息都会被投到一个或多个队列。
- Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
- Message Queue:消息队列,用于存储还未被消费者消费的消息。
- Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
- RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受
- BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中
- Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
- Server: 接受客户端连接,实现AMQP消息队列和路由功能的进程。
- Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
五、SpringBoot整合RabbitMQ的使用【重点
】
5.1 导入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
5.2 在application.properties中增加配置
1 2 3 4 5
| spring.rabbitmq.host=192.168.153.136 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
|
5.3 Hello-World 简单队列
一个生产者,一个默认的交换机,一个队列,一个消费者
结构图 |
|
1)创建配置类,用于创建队列对象
1 2 3 4 5 6 7 8 9 10 11 12 13
| package com.couture.simple; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class SimpleQueueConfig {
@Bean public Queue simple(){ return new Queue("simpleQueue"); } }
|
2)创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.couture.simple; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class SimpleQueueProducer {
@Autowired private RabbitTemplate rabbitTemplate;
public void send(){ System.out.println("SimpleQueueProducer"); rabbitTemplate.convertAndSend("simpleQueue","简单模式"); } }
|
3)创建消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.simple; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SimpleQueueCustomer { @RabbitListener(queues="simpleQueue") public void receive(String content){ System.out.println("SimpleQueueCustomer"); System.out.println("来SimpleQueueProducer的信息:"+content); } }
|
4)在src\test\java\com\qf\Rabbitmq01ApplicationTests.java进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.couture; import com.couture.simple.SimpleQueueProducer; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest class Rabbitmq01ApplicationTests { @Test void contextLoads() { } @Autowired private SimpleQueueProducer simpleQueueProducer;
@Test public void testSimpleQueueProducer(){ simpleQueueProducer.send(); } }
|
如果传递的是 JavaBean 对象,该实体类需要实现序列化接口,具体流程如下:
- 导入lombok依赖,创建User类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.couture.pojo;
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data @AllArgsConstructor @NoArgsConstructor public class User implements Serializable { private String username; private String password; }
|
- 修改生产者中的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.couture.simple;
import com.couture.pojo.User; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class SimplePublisher {
@Autowired private RabbitTemplate rabbitTemplate;
public void send(){ System.out.println("SimplePublisher..."); rabbitTemplate.convertAndSend("","simpleQueue",new User("张三","123")); } }
|
- 修改消费者中的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.couture.simple;
import com.couture.pojo.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SimpleConsumer {
@RabbitListener(queues = "simpleQueue") public void receive(User user){ System.out.println("SimpleConsumer..."); System.out.println("来自SimplePublisher的消息:"+user); } }
|
- 运行测试类即可!
5.4 Work 工作队列
一个生产者,一个默认的交换机,一个队列,两个消费者
结构图 |
|
1)创建配置类,用于创建队列对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.work;
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class WorkQueueConfig { @Bean public Queue work(){ return new Queue("workQueue"); } }
|
2)创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.couture.work;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class WorkQueueProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(){ System.out.println("WorkQueueProducer"); rabbitTemplate.convertAndSend("workQueue","工作队列模式"); } }
|
3)创建消费者,本案例创建两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.work;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class WorkQueueCustomer_01 { @RabbitListener(queues="workQueue") public void receive(String content){ System.out.println("WorkQueueCustomer_01:"+content); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.work;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class WorkQueueCustomer_02 {
@RabbitListener(queues="workQueue") public void receive(String content){ System.out.println("WorkQueueCustomer_02:"+content); } }
|
4)在测试类中添加对象和方法进行测试
1 2 3 4 5 6 7 8 9 10
| @Autowired private WorkQueueProducer workQueueProducer;
@Test public void testWorkQueueProducer(){
for (int i = 0; i<100; i++){ workQueueProducer.send(); } }
|
5.5 Publish/Subscribe 发布订阅模式
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
|
使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列.
有四种交换机:direct/topic/headers/fanout,默认交换机是direct,发布与订阅的实现使用第四个交换器类型fanout
使用交换机时,每个消费者有自己的队列,生产者将消息发送到交换机(X),每个队列都要绑定到交换机
本例中:
创建2个消息队列
创建一个fanout交换机对象
Bind交换机和队列
1)创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.couture.fanout;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutConfig {
@Bean public Queue fanoutQueue1(){ return new Queue("fanoutQueue1"); }
@Bean public Queue fanoutQueue2(){ return new Queue("fanoutQueue2"); }
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); }
@Bean public Binding bindingFanoutQueue1(){ return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); }
@Bean public Binding bindingFanoutQueue2(){ return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } }
|
2)创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.couture.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class FanoutProducer {
@Autowired private RabbitTemplate rabbitTemplate;
public void send(){ System.out.println("FanoutProducer"); rabbitTemplate.convertAndSend("fanoutExchange","","发布/订阅"); } }
|
3)创建消费者,本案例创建两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class FanoutCustomer_01 {
@RabbitListener(queues = "fanoutQueue1") public void receive(String content){ System.out.println("FanoutCustomer_01:"+content); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class FanoutCustomer_02 {
@RabbitListener(queues = "fanoutQueue2") public void receive(String content){ System.out.println("FanoutCustomer_02:"+content); } }
|
4)在测试类中添加对象和方法进行测试
1 2 3 4 5 6 7
| @Autowired private FanoutProducer fanoutProducer;
@Test public void testFanoutProducer(){ fanoutProducer.send(); }
|
5.6 Routing 路由模式
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
|
生产者将消息发送到direct交换机(路由模式需要借助直连交换机实现),在绑定队列和交换机的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。也就是让消费者有选择性的接收消息。
本例中:
创建2个消息队列
创建一个direct交换机对象
Bind交换机和队列
1)创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.couture.direct;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class DirectConfig {
@Bean public Queue directQueue1(){ return new Queue("directQueue1"); }
@Bean public Queue directQueue2(){ return new Queue("directQueue2"); }
@Bean public DirectExchange directExchange(){ return new DirectExchange("directExchange"); }
@Bean public Binding bingDirectQueue1(){ return BindingBuilder.bind(directQueue1()).to(directExchange()).with("zhangsan"); }
@Bean public Binding bingDirectQueue2(){ return BindingBuilder.bind(directQueue2()).to(directExchange()).with("lisi"); }
}
|
2)创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.couture.direct;
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class DirectProducer {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private DirectExchange directExchange;
public void send(){ System.out.println("DirectProducer"); rabbitTemplate.convertAndSend(directExchange.getName(),"zhangsan","zhangsanContent"); rabbitTemplate.convertAndSend(directExchange.getName(),"lisi","lisiContent"); } }
|
3)创建两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class DirectCustomer_01 {
@RabbitListener(queues = "directQueue1") public void receive(String content){ System.out.println("DirectCustomer_01:"+content); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class DirectCustomer_02 {
@RabbitListener(queues = "directQueue2") public void receive(String content){ System.out.println("DirectCustomer_02:"+content); } }
|
4)在测试类中添加对象和方法进行测试
1 2 3 4 5 6 7
| @Autowired private DirectProducer directProducer;
@Test public void testDirectProducer(){ directProducer.send(); }
|
5.7 Topic 主题模式
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
|
又称通配符模式(可以理解为模糊匹配,路由模式相当于精确匹配)
使用直连交换机可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由。
在消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源。这时候可以使用topic交换机。
使用主题交换机时不能采用任意写法的路由键,路由键的形式应该是由点分割的有意义的单词。例如"goods.stock.info"等。路由key最多255字节。
*号代表一个单词
#号代表0个或多个单词
1)创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.couture.topic;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class TopicConfig {
@Bean public Queue topicQueue1(){ return new Queue("topicQueue1"); }
@Bean public Queue topicQueue2(){ return new Queue("topicQueue2"); }
@Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); }
@Bean public Binding bingTopicQueue1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("wangwu.*"); }
@Bean public Binding bingTopicQueue2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("zhaoliu.#"); }
}
|
2)创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.couture.topic;
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class TopicProducer {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private TopicExchange topicExchange;
public void send(){ System.out.println("TopicProducer"); rabbitTemplate.convertAndSend(topicExchange.getName(),"wangwu.abc","wangwuContent"); rabbitTemplate.convertAndSend(topicExchange.getName(),"zhaoliu.xyz.qwer","zhaoliuContent"); } }
|
3)创建两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class TopicCustomer_01 {
@RabbitListener(queues = "topicQueue1") public void receive(String content){ System.out.println("TopicCustomer_01:"+content); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package com.couture.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class TopicCustomer_02 {
@RabbitListener(queues = "topicQueue2") public void receive(String content){ System.out.println("TopicCustomer_02:"+content); } }
|
4)在测试类中添加对象和方法进行测试
1 2 3 4 5 6 7
| @Autowired private TopicProducer topicProducer;
@Test public void testTopicProducer(){ topicProducer.send(); }
|
5.8手动Ack
RabbitMQ中的Ack: 主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除,spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除队列中的消息,这样会存在一些问题:如果消费者处理消息需要较长时间,或者在消费消息的时候出现异常,都会出现问题,手动Ack可以避免消息重复消费。
5.8.1 原生方式测试
1.以简单模式为例,只需要修改消费者即可,启动生产者进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.couture.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消费者启动...");
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("192.168.25.134"); factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println(new String(body, "UTF-8")); int i = 1/0; channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(), false, false); e.printStackTrace(); connection.close(); } } };
channel.basicConsume("helloworld",false,defaultConsumer);
} }
|
5.8.2 SpringBoot中测试
1.在 application.properties 中添加配置
1 2
| spring.rabbitmq.listener.simple.acknowledge-mode=manual
|
2.在之前测试的任意模式中添加 AckCustomer 演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.couture.simple;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @RabbitListener(queues = "simpleQueue") public class AckCustomer {
@RabbitHandler public void receive(String message,Channel channel,Message msg){ System.out.println("AckCustomer..."); if(message!=null && message.length()>0){ try { System.out.println("获取消息:"+message); int i = 1/0; long deliveryTag = msg.getMessageProperties().getDeliveryTag(); System.out.println("deliveryTag:"+deliveryTag); channel.basicAck(deliveryTag,false); } catch (Exception e) { System.out.println("消息处理..."); try { channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false); e.printStackTrace(); channel.getConnection().close(); } catch (Exception ex) { ex.printStackTrace(); } }
}else{ System.out.println("没有消息"); } } }
|
注释或打开异常进行测试即可!
六、事务与confirm机制
6.1 消息的可靠性
思考?
1.如果消息已经到达RabbitMQ,RabbitMQ宕机了,消息是不是就丢失了?
可以使用Queue的持久化机制
2.消费者在消费消息的时候,程序执行到一半,消费者宕机了怎么办?
可以手动Ack
3.生产者发送消息时,由于网络问题,导致消息没有发送到RabbitMQ怎么办?
RabbitMQ提供了事务操作和Confirm以及Return机制
保证消息的传递可以使用RabbitMQ中的事务,事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息,但是事务操作效率太低。
RabbitMQ中除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
6.2 RabbitMQ事务
RabbitMQ的事务是对AMQP协议的实现,通过设置Channel
的模式来完成,具体操作如下:
1 2 3 4
| channel.txSelect();
channel.txCommit(); channel.txRollback();
|
特别说明:RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能。
6.3 Confirm机制
由于RabbitMQ的事务性能的问题,于是就又推出了发送方确认模式。
6.3.1 创建工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package com.couture.utils;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static { connectionFactory = new ConnectionFactory(); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setHost("192.168.25.134"); }
public static Connection getConnection() { Connection connection = null; try { connection = connectionFactory.newConnection(); } catch (IOException ioException) { ioException.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; }
public static void close(Channel channel, Connection connection) { try { if(null != channel) { channel.close(); } if(null != connection){ connection.close(); } }catch (Exception ex) { ex.printStackTrace(); } } }
|
6.3.2 单条消息确认
1
| channel.confirmSelect();
|
1.在RabbitMq控制台页面,创建一个direct类型的交换机,再创建一个队列并绑定
channel.waitForConfirms(); //对于单条消息的确认,true表示成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.couture.confirm;
import com.couture.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class ComfirmTest1 { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); channel.basicPublish("myExchange","my",null,"消息内容".getBytes());
if(channel.waitForConfirms()){ System.out.println("消息已到达交换机"); }
RabbitMQUtils.close(channel,connection); } }
|
6.3.2 批量消息确认
channel.waitForConfirmsOrDie(); //批量消息确认,如果有一条消息没有发送成功,会抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.couture.confirm;
import com.couture.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class ComfirmTest2 { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); for (int i = 0; i < 10; i++) { if(i == 5){ channel.basicPublish("myExchange2", "my", null, ("消息内容"+ i).getBytes()); continue; } channel.basicPublish("myExchange", "my", null, ("消息内容"+ i).getBytes()); }
channel.waitForConfirmsOrDie();
RabbitMQUtils.close(channel,connection); } }
|
6.3.3 回调方式确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.couture.confirm;
import com.couture.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection;
import java.io.IOException;
public class ComfirmTest3 { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.confirmSelect();
channel.basicPublish("myExchange","my",null,"消息内容".getBytes());
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("成功达到交换机"); }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("没有到达交换机"); } });
RabbitMQUtils.close(channel,connection); } }
|
6.4 Return机制
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue
而且exchange是不能持久化消息的,queue是可以持久化消息
采用Return机制来监听消息是否从exchange送到了指定的queue中
开启Return机制,在发送消息时,需要指定mandatory为true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.couture.confirm;
import com.couture.utils.RabbitMQUtils; import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnTest { public static void main(String[] args) throws Exception { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.confirmSelect();
channel.basicPublish("myExchange","my2",true,null,"消息内容".getBytes());
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("成功达到交换机"); }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("没有到达交换机"); } });
channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("没有到达队列"); } });
RabbitMQUtils.close(channel,connection); } }
|
6.5 SpringBoot实现
1.在 application.properties 中添加配置
spring.rabbitmq.publisher-confirm-type 对应值的说明
- NONE :禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:两种效果
- 和CORRELATED值一样会触发回调方法
- 在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
1 2 3
| spring.rabbitmq.publisher-confirm-type: simple spring.rabbitmq.publisher-returns: true
|
2.创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.couture.direct;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void initMethod(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if(ack){ System.out.println("到达交换机"); }else{ System.out.println("没有到达交换机"); } }
@Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("没有到达队列"); } }
|
3.修改生产者发送消息方法中的routingKey,然后启动测试类测试即可!
七. 死信队列
死信队列并不是一个特殊的队列,只是一个普通的队列,只是我们把他们取名叫做死信队列。
死信队列的设计是在某个队列的头信息中设定x-dead-letter-exchange
(死信交换机)和x-dead-letter-routing-key
(死信路由键)即可。关联到一个绑定到某个死信交换机的队列上。然后给该队列指定过期时间或者指定的消息的过期时间,那么该消息到期后会自动到达死信队列中。
7.1 场景
场景一:未支付订单在规定的时间取消。实现的方式为,将订单消息放入到一个队列中,并指定其过期时间。当过期时间到了之后,就进入到了死信队列,那么可以直接在死信队列的消费端取出对应的消息即可。
场景二:某条消息在消费端曾多次尝试消费,但是均未消费成功,那么就进入死信队列,让人工干预。
7.2 测试
提前创建RabbitMQUtils工具类并测试即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.couture.dead;
import com.couture.utils.RabbitMQUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.util.HashMap; import java.util.Map;
public class DeadQueueTest {
private static final String dead_letter_exchange = "dead_letter_exchange"; private static final String dead_letter_routing_key = "dead_letter_routing_key"; private static final String dead_letter_queue = "dead_letter_queue";
private static final String people_exchange = "people_exchange"; private static final String people_routing_key = "people_routing_key"; private static final String people_queue = "people_queue";
public static void main(String[] args) throws Exception{ Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(dead_letter_exchange, "direct"); channel.queueDeclare(dead_letter_queue, true, false, false, null); channel.queueBind(dead_letter_queue, dead_letter_exchange, dead_letter_routing_key);
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", dead_letter_exchange); arguments.put("x-dead-letter-routing-key", dead_letter_routing_key);
channel.exchangeDeclare(people_exchange, "direct"); channel.queueDeclare(people_queue, true, false, false, arguments); channel.queueBind(people_queue, people_exchange, people_routing_key);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("15000").build();
channel.basicPublish(people_exchange, people_routing_key, properties, "dead_message".getBytes());
RabbitMQUtils.close(channel, connection); } }
|
八. 避免消息重复消费
8.1 幂等性
所有的消息中间件都会存在这样一个问题,那就是消息的重复消费问题,所以我们必须做幂等性设计,所谓幂等性设计就是,一条消息无论消费多少次所产生的结果都是相同的。
重复消费消息,是对非幂等性操作造成问题,重复消费消息的原因是因为消费者没有给RabbitMQ一个Ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
8.2 解决方案
方案一:为每条消息生成全局唯一ID,将ID和业务数据放在同一个事务中,每次消费消息之后都将ID在表中插入一条数据,每次消费之前先查询ID是否存在,如果不存在就执行对应的逻辑;如果存在则直接确认。
方案二(推荐):利用redis+数据库的方案来实现幂等性的设计,实现的思路与redis的缓存击穿方案类似;当插入数据的时候,将唯一ID同时插入数据库,然后放入到redis中,设置过期时间,每次从redis中判断。
8.3 在springboot中测试
8.3.1 导入依赖
1 2 3 4 5 6 7 8 9
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
|
8.3.2 配置application.properties
1 2 3 4 5 6 7 8 9 10
| spring.rabbitmq.host=192.168.25.140 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.redis.host=127.0.0.1 spring.redis.port=6379
|
8.3.3 这里以简单模式演示
1.编写配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.couture.simple;
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class SimpleQueueConfig {
@Bean public Queue simpleQueue(){ return new Queue("simpleQueue"); }
@Bean public MessagePostProcessor correlationIdProcessor() { MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message, Correlation correlation) { MessageProperties messageProperties = message.getMessageProperties(); if (correlation instanceof CorrelationData) { String correlationId = ((CorrelationData) correlation).getId(); messageProperties.setCorrelationId(correlationId); } return message; }
@Override public Message postProcessMessage(Message message) throws AmqpException { return message; } }; return messagePostProcessor; } }
|
2.编写生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.couture.simple;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.UUID;
@Component public class SimpleProducer {
@Autowired private MessagePostProcessor messagePostProcessor;
@Autowired private RabbitTemplate rabbitTemplate;
public void send(){ CorrelationData correlationData = new CorrelationData(); System.out.println(correlationData.getId()); rabbitTemplate.convertAndSend("","simpleQueue","简单队列",messagePostProcessor,correlationData); }
}
|
3.编写消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.couture.simple;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;
import java.io.IOException; import java.util.concurrent.TimeUnit;
@Component
@RabbitListener(queues = "simpleQueue") public class AckCustomer {
@Autowired private StringRedisTemplate redisTemplate;
@RabbitHandler public void getMessage(String msg, Channel channel, Message message) throws IOException { String messageId = message.getMessageProperties().getCorrelationId(); System.out.println(messageId); Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(messageId, "0", 600, TimeUnit.SECONDS); if(aBoolean) { System.out.println("正在处理消息...."); return; }
if("0".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){ System.out.println("接收到消息:" + msg); redisTemplate.opsForValue().set(messageId,"1",600, TimeUnit.SECONDS); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }else { if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
} }
|
4.测试类测试即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.couture;
import com.couture.simple.SimpleProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest class Springboot14RabbitmqApplicationTests {
@Test void contextLoads() { }
@Autowired private SimpleProducer simpleProducer;
@Test public void test(){ simpleProducer.send(); }
}
|
测试成功后,到redis中查看即可!