SpringBoot整合RabbitMQ
官网地址 : https://www.rabbitmq.com/
rabbitmq安装
docker 安装
- https://hub.docker.com/_/rabbitmq
- docker pull rabbitmq:management
docker 启动命令
1 | ## 启动rabbitMQ |
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
官方协议文档: https://www.rabbitmq.com/networking.html
http访问: http://192.168.56.101:15672
springBoot整合rabbitmq
引入 spring-boot-starter-amqp
1 | <!-- 引入 RabbitMQ 依赖--> |
引入 rabbitmq 场景启动器, RabbitAutoConfiguration 配置类会自动生效
它给容器中配置了一系列的组件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59/**
* 连接工厂
*/
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory(
getRabbitConnectionFactoryBean(properties).getObject());
map.from(properties::determineAddresses).to(factory::setAddresses);
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
map.from(properties::getPublisherConfirmType).whenNonNull().to(factory::setPublisherConfirmType);
RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis)
.to(factory::setChannelCheckoutTimeout);
RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
return factory;
}
/**
* RabbitTemplate 消息发送组件
*/
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
return template;
}
/**
* amqpAdmin 高级消息队列管理组件
* 使用 amqpAdmin 创建 exchanges, queue, binding
*/
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
/**
* RabbitMessagingTemplate
*/
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate);
}
.....
开启 RabbitMQ : 如果只是创建组件,发送消息可以不开启;但是如果需要接收消息,必须开启
1 | /** |
配置 配置属性
1 | spring: |
Demo 测试使用 mq 创建queue , exchanges , binding
1 | public class Test { |
Demo 测试使用 mq 发送消息
1 | // 可以直接发送一条字符串作为消息 |
Mq 接收消息
@RabbitListener(queues = {"hello-java-queue"}) // 标注在 类 + 方法上(监听多个队列)1
2
3
4
- ```java
@RabbitHandler
// 标注在 方法上(重载区分不同的消息)@RabbitListener(queues = {"hello-java-queue"}) public class RabbitService { @RabbitHandler public void message1(MessageEntity1 entity){ log.info("-----------接收到 MessageEntity1 类型的消息 ----------- {}",entity); } @RabbitHandler public void message2(MessageEntity2 entity){ log.info("===========接收到 MessageEntity2 类型的消息 =========== {}",entity); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
## rabbitMQ消息确认机制--可靠抵达
- 保证消息不丢失,可靠抵达,可以使用事务消息,但是性能下降250倍,为此引入确认机制
- **publisher** confirmCallback 消息确认模式
- **publisher** returnCallback 未投递到 queue 退回模式
- **consumer** ack模式
> 可靠抵达配置开启
```properties
## 开始发送端确认
spring.rabbitmq.publisher-confirms=true ## 可以不使用,只搭配使用下面两个配置
## 开启发送端消息未抵达队列确认
spring.rabbitmq.publisher-returns=true
## 只要抵达队列,以异步发送优先回调 return-confirm
spring.rabbitmq.template.mandatory=true
demo实例
1 |
|
可靠抵达-Ack消息确认机制
消费者获取到消息,成功处理,可以回复ack给Broker.此时才可以删除这条消息
1
2## 配置签收规则(默认 auto 自动签收)
spring.rabbitmq.listener.simple.acknowledge-mode=manual ## 手动签收- 系统默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
- 消费者设置为手动确认模式,只要我们没有明确告诉MQ签收消息,就不会ack,这样消息一直都是unacked状态,即使consumer宕机,消息也不会丢失,会重新变为ready,下一次有新的consumer连接进来就会发给他
-basic.ack 用于肯定确认;broker将移除消息
-basic.nack 用于否定确认;可以指定broker是否丢弃此消息,支持批量操作
-basic.reject 用于否定确认;同上,但是不能批量
消费者收到消息,默认会自动ack,但是无法确定此消息是否被处理完成,因此最好开启手动ack模式
- 消息处理成功,ack(),接收下一个消息,此消息就会被broker移除掉
- 消息处理失败,nack()/reject(),重新会重新发给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack()/nack();broker会认为消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,但是会投递给别人
demo演示
1 |
|
RabbitMQ延时队列
消息的TTL (time to live)
消息的TTL 就是消息的存活时间
RabbitMQ 可以对队列和消息分别设置 TTL
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独设置,超过了这个时间,我们就认为这个消息死了,称之为
死信 - 如果队列设置了 TTL,消息也设置了,那么会
取小的.所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置).这里单讲单个消息的TTL,因为他才是实现延时任务的关键.可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间两者都是一样的效果
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独设置,超过了这个时间,我们就认为这个消息死了,称之为
死信 Dead Letter Exchanges (DLX)
- 一个消息在满足如下条件下,会进
死信路由,这里是路由而不是队列,一个路由可以对应很多队列- 一个消息被consumer拒收了,并且reject方法的参数里requeue是false,也就是说不会再次放进队列里,被其他消费者使用.(basic.reject / basic.nack) requeue = false
- 上面的消息的TTL 到期了,消息过期了
- 队列的长度限制满了,排在前面的消息会被丢弃或者扔到死信路由上
- Dead Letter Exchange 其实就是一种普通exchange,和创建其他exchange没有两样.只是在某一个设置 dead letter exchange队列中有消息过期了,会自动触发消息的转发,发送到dead letter exchange中去
- 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
延时队列的实现-1
设置队列过期时间实现延时队列
- 生产者 — > (routing - key deal message) –> 交换机 (exchange) – >发送到队列
- 队列设置了死信路由交换机,设置了过期时间,路由的键 (x-dead-letter-exchange delay exchange ,x-dead-letter-routing-key delay message , x-message-ttl 60000) —> 到期的消息被转发到死信交换机—->(delay exchange) —>再根据路由键发送到死信队列 —( routing -key delay message) —> test queue —> consumer 消费
延时队列的实现-2
设置消息过期时间实现延时队列 —->不太推荐,由于消息队列是懒加载的.不太适合,推荐方式1
延时队列的实现 demo
创建队列
1 | // 项目启动队列创建好了之后,即使属性发生了变化,queue也不会改变,只有重新删除在重新启动就可以 |
消费者监听队列,接收消息
1 |
|
测试发送消息
1 |
|
消息的积压,重复,丢失等解决方案
消息丢失
消息发送出去,由于网络问题,没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定时扫描失败消息重新发送
- 做好日志记录,每个消息的状态是否被服务器收到都要做好记录
- 做好定期重发,如果消息没有发送成功,定期去数据库查询发送失败的消息进行重发
1
2
3
4
5
6
7
8
9
10
11
12
13
14DROP TABLE IF EXISTS `mq_message`;
CREATE TABLE `mq_message` (
`message_id` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息id',
`content` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '消息的内容',
`to_exchange` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '交换机',
`routing_key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '路由键',
`class_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '数据类型',
`message_status` int(2) NOT NULL COMMENT '0-新建/1-已发送/2-错误抵达/3-已抵达',
`create_time` datetime(0) NULL DEFAULT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`message_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功,此时Broker尚未完成持久化,宕机
- publisher也必须加入确认回调机制,确认消息成功,修改数据库状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyRabbirConfig{
/**
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。
* <p>
* 通常我们会是在Spring框架中使用到@PostConstruct注解 该注解的方法在整个Bean初始化中的执行顺序:
* <p>
* Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
* <p>
* 应用:在静态方法中调用依赖注入的Bean中的方法。
*/
public void initRabbitTemplate() {
/*
消息发送确认回调
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 当前消息的唯一关联数据,每一个消息发送的时候可以给一个唯一id
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO 服务器收到了,记录到数据库,保存好状态
System.out.println("--------消息发送成功------- correlationData=>[" + correlationData + "]==>ack=>[" + ack + "]==>cause=>[" + cause + "]");
}
});
/*
消息抵达回调
*/
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 哪个消息投递失败了,消息的具体内容
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 前这个消息发给哪个交换机
* @param routingKey 当前这个消息发送的时候,指定的哪个路由键
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// TODO 报错误了,修改数据库当前消息的状态-->错误.在定期扫描失败消息,重新发送
System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
}
});
}
}自动ACK的状态下,消费者收到消息,还没来得及消费消息就宕机
- 一定要开启手动ACK,消费成功才移除,失败或者没来得及处理的消息就noACK,并重新入队
1
2
3
4
5spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual ## 手动确认消息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class StockClosureListener {
public void receiveMessage(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
// 手动消费消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 消息重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
消息重复
- 消息消费成功,事务已经提交,ack时候,机器宕机,导致没有ack成功,broker消息重新由unACK变成ready,并发送给其他消费者
- 消息消费失败,由于重试机制,自动又将消息发送出去
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
解决方法:
- 消费者的业务消费接口应该设计成幂等性的.比如扣减库存有工作单的状态标志
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标志,处理过的就不用处理
- rabbitMQ的每一个消息都有redelivered字段,可以获取是否是重新投递过来的,而不是第一次过来的消息,做一下处理
消息积压
- 消费者宕机消息积压
- 消费者消费能力不足积压
- 发送者发送流量太大积压
解决方法:
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录到数据库,在慢慢进行处理