RocketMQ部署

igxiaoshan Lv5

RocketMQ部署

RabbitMQRocketMQ是属于不同的消息队列系统,虽然都是MQ,但是它们的配置和部署方式不同

RabbitMQ部署回顾

RabbitMQRocketMQ对比

类型不同

  • RabbitMQ是一个广泛使用的AMQP协议的消息队列系统.
  • RocketMQ是由阿里巴巴开源的,支持高吞吐量,低延迟的分布式消息中间件.

组件不同

  • RabbitMQ是一个单一的服务,通常只需要一个容器即可运行.
  • RocketMQ需要多个组件(NameServer,Broker,Console)协同工作,通常需要多个容器.

端口配置

  • RabbitMQ通常使用5672(AMQP协议)和15672(管理控制台).
  • RocketMQNameServer使用9876端口,Borker使用109091091110912等端口,Console使用8080端口.

镜像和数据卷

  • RabbitMQ的镜像包含管理插件,并使用一个数据卷来持久化数据.
  • RocketMQ使用多个镜像和数据卷来分离不同组件的功能.

总结

尽管两者都是消息队列服务,但它们的架构和部署方式有很大的不同

RabbitMQ更适合单机或者简单集群部署,

RocketMQ更适合分布式,高并发的场景,因此需要更多的容器和配置满足其功能需求.

RocketMQ部署

部署RocketMQ NameServer

NameServerRocketMQ的一个核心组件,主要负责路由信息的管理

1
2
3
4
5
6
7
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /path/to/namesrv/logs:/opt/logs \
-v /path/to/namesrv/store:/opt/store \
rocketmqinc/rocketmq:4.9.4 \
sh mqnamesrv
  • --name rmqnamesrv:容器名
  • -p 9876:9876: 映射NameServer的默认端口
  • -v /path/to/namesrv/logs:/opt/logs: 挂载日志目录
  • -v /path/to/namesrv/store:/opt/store: 挂载存储目录
  • rocketmqinc/rocketmq:4.9.4: 使用的RocketMQ镜像版本
  • sh mqnamesrv: 启动NamerServer服务

部署RocketMQ Broker

BorkerRocketMQ的核心服务,负责消息的存储和传递

1
2
3
4
5
6
7
8
9
10
11
docker run -d \
--name rmqbroker \
-p 10911:10911 \
-p 10912:10912 \
-p 10909:10909 \
-v /path/to/broker/logs:/opt/logs \
-v /path/to/broker/store:/opt/store \
--link rmqnamesrv:namesrv \
-e "NAMESRV_ADDR=namesrv:9876" \
rocketmqinc/rocketmq:4.9.4 \
sh mqbroker -c /opt/rocketmq-4.9.4/conf/broker.conf
  • --name rmqbroker: 容器名
  • -p 10911:10911: 映射Broker的通信端口
  • -p 10912:10912: 映射Broker的HA端口
  • -p 10909:10909: 映射Broker的服务端口
  • -v /path/to/broker/logs:/opt/logs: 挂载日志目录
  • -v /path/to/broker/store:/opt/store: 挂载存储目录
  • -- link rmqnamesrv:namesrv: 将NameServer连接到Broker容器中
  • -e "NAMESRV_ADDR=namesrv:9876": 设置NameServer的地址
  • rocketmqinc/rocketmq:4.9.4: 使用的RocketMQ镜像版本
  • sh mqbroker -c /opt/rocketmq-4.9.4/conf/broker.conf: 启动Broker服务并指定配置文件

部署RocketMQ Console

ConsoleRocketMQ的管理界面,方便我们监控和管理RocketMQ集群

1
2
3
4
5
docker run -d \
--name rmqconsole \
-p 8080:8080 \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
styletang/rocketmq-console-ng
  • --name rmqconsole: 容器名
  • -p 8080:8080: 映射Console的Web管理界面端口
  • -e "JAVA_OPTS=-Dockermq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false": 设置NameServer地址和禁用VIP通道
  • styletang/rocketmq-console-ng: 使用的RocketMQ Console镜像

使用Demo

下面是一个Spring Boot项目整合RocketMQ的简单示例

添加依赖

pom.xml文件中添加RocketMQ的依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>

配置文件

application.properties中添加RocketMQ的相关配置

1
2
rocketmq.name-server=localhost:9876
rocketmq.producer.group=springboot-consumer-group

生产者代码

创建一个消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/send")
public String send() {
rocketMQTemplate.convertAndSend("test-topic", "Hello, RocketMQ!");
return "Message sent!";
}
}

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "springboot-consumer-group")
public class ConsumerService implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
}
}

启动应用

运行Spring Boot应用,访问http://localhost:8080/send发送消息,并在控制台查看消费者接收到的消息。

访问RocketMQ Console

可以通过浏览器访问http://localhost:8080,进入RocketMQ Console的Web管理界面。

在RocketMQ Console中,你可以进行以下操作:

  • 查看集群状态
  • 查看Topic信息
  • 查看消费组信息
  • 查看消息轨迹
  • 发送和消费测试消息
  • 管理Broker

通过RocketMQ Console,管理员可以方便地监控和管理RocketMQ集群,提高运维效率和故障排查能力。

什么是AMQP消息队列

AMQP(Advanced Message Queuing Protocol) 是一种开放的应用层协议标准,用于消息导向的中间件.AMQP消息队列系统使用该协议来确保消息在发送方接受方之间可靠,安全地传递.它的主要目标是促进不同系统之间的消息传递,实现互操作性和灵活性.

AMQP消息队列系统的关键特性

  1. 消息的可靠性
  • 确保消息从生产者到消费者的传递是可靠,支持消息的确认机制和持久化
  1. 灵活的路由机制
  • 支持多种消息路由方式,如点对点,发布/订阅等,可以根据业务需求配置消息的传递路径
  1. 安全性
  • 提供基于SSL/TLS的传输加密和用户身份验证机制,保证消息传递的安全性
  1. 事务支持
  • 支持事务操作,可以确保一组消息要么全部成功处理,要么全部失败回滚

AMQP消息队列系统的核心组件

  1. Exchange(交换机)
    • 负责接受生产者发送的消息,并根据一定的路由规则将消息投递到一个或多个队列中
    • 常见的交换机类型包括
      • Direct: 根据消息的路由键精确匹配队列
      • Topic: 根据模式匹配的路由键将消息路由到一个或多个队列
      • Fanout: 将消息广播到所有绑定到交换机的队列
      • Headers: 根据消息头属性进行匹配
  2. Queue(队列)
    • 存储消息的容器,消费者从队列中获取并处理消息
    • 队列可以配置为持久化,临时或者自动删除等特性
  3. Binding(绑定)
    • 将交换机和队列关联起来,根据绑定规则决定消息的路由方式
  4. Message(消息)
    • 传递的数据单元,包括消息头和消息体,消息头包含路由键等元数据,消息体包括实际的业务数据

常见的AMQP实现

  1. RabbitMQ
    • 最流行的AMQP消息队列实现之一,支持多种协议,提供丰富的功能特性和插件扩展
    • 高度可配置,易于集成和扩展
  2. Apache Qpid
    • Apache基金会的开源项目,提供AMQP消息中间件实现
  3. OpenAMQ
    • 开源的AMQP消息队列实现,专注于高性能和低延迟

AMQP消息队列的典型应用场景

  1. 分布式系统的消息传递
    • 在微服务架构中,不同服务之间需要传递消息,AMQP消息队列提供了可靠的消息传递机制
  2. 异步处理
    • 将耗时的操作放入队列,有后台服务异步处理,提升系统的响应速度和用户体验
  3. 事件驱动架构
    • 系统内部或系统之间的事件通知和处理,通过消息队列实现事件驱动架构
  4. 日志和监控
    • 收集和传递日志,监控数据,通过消息队列实现集中处理和分析