📦 Rocket版本 4.9.6

🏢 官方文档: https://rocketmq.apache.org/zh/docs/4.x/

💻 使用的系统是 ubuntu22.04

🏆 处理消息堆积和消息丢失

⭐️ 消息堆积

1️⃣ 什么是消息堆积?

当因为某些原因很多消息并没有被消费,还在在

MQ中,慢慢的 MQ中的消息变多就形成了消息堆积,一般在生产太快消费者来不及消费或消费者出现故障时发生

2️⃣ 如何解决消息堆积?

提高消费并行度

绝大部 行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效,消费者<=队列)。可以通过加机器,或者在已有机器启动多个进程的方式。
  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现,默认是20。IO密集型通常设置2n(n表示cpu线程数)如果是CPU密集型,那就是n+1

生产者限制生产

动态扩容队列数量,从而增加消费者数量

如果是消费者问题则排查消费者

⭐️ 消息丢失

使用数据库来保存

  1. 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库里面写 msgId status(0) time

  2. 消费者消费以后 修改数据这条消息的状态 = 1

  3. 写一个定时任务 间隔两天去查询数据 如果有status = 0 and time < day-2 就拿出来在发一遍

  4. 将mq的刷盘机制设置为同步刷盘

    1. 同步刷盘: 生产者发送消息到mq,mq回应的时候会先将数据存储到磁盘中
    2. 异步刷盘: 将数据存储到内存,然后同一存储到磁盘
  5. 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上

可以开启mq的trace机制,消息跟踪机制

1.在broker.conf中开启消息追踪

traceTopicEnable=true

2.重启broker即可

3.生产者配置文件开启消息轨迹

enable-msg-trace: true
  1. 消费者开启消息轨迹功能,可以给单独的某一个消费者开启
enableMsgTrace = true

在rocketmq的面板中可以查看消息轨迹

默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面

⭐️ RocketMQ安全配置

🏢 官方配置文件: https://rocketmq.apache.org/zh/docs/4.x/bestPractice/04access

权限控制(ACL)主要为 RocketMQ提供 Topic资源级别的用户访问控制。用户在使用 RocketMQ权限控制时,可以在 Client客户端通过 RPCHook注入 AccessKeySecretKey签名;同时,将对应的权限控制属性(包括 Topic访问权限、IP白名单和 AccessKeySecretKey签名等)设置在 distribution/conf/plain_acl.yml的配置文件中。Broker端对 AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码

1️⃣ 配置RocketMQ Borker的ACL认证,修改配置文件

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=172.22.242.163
autoCreateTopicEnable=true
## 配置开启acl
aclEnable=true 

2️⃣ 查看 plain_acl.yml文件,开启重启后会毒读取此文件配置的用户名和密码

[rocketmq@b43180cfe44c bin]$ cat ../conf/plain_acl.yml
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

globalWhiteRemoteAddresses:
  - 10.10.103.*
  - 192.168.0.*

accounts:
  - accessKey: RocketMQ
    secretKey: 12345678
    whiteRemoteAddress:
    admin: false ##表示是不是管理员用户
    defaultTopicPerm: DENY
    defaultGroupPerm: SUB
    topicPerms:
      - topicA=DENY
      - topicB=PUB|SUB
      - topicC=SUB
    groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB

  - accessKey: rocketmq2
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true

2️⃣ 编写 users.properties文件,这个是dashboard配置自己的登录面板时读取的用户名和密码

# This file supports hot change, any change will be auto-reloaded without Console restarting.
# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin)

# Define Admin
# =============用户名和密码规则「用户名=密码,权限」,这里的权限为1表示管理员,为0表示普通用户=============
# 例如:admin=admin123,1
admin=admin,1
user=user,0


# Define Users
# =============屏蔽下边两个账户=============
#user1=user1
#user2=user2

3️⃣ 修改 docker-compose文件,使得 dashboard链接上 rocketmq,然后在打开 Dashboard本身自带的登录面板

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:4.9.6
    container_name: rmqnamesrv
    restart: always
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:4.9.6
    container_name: rmqbroker
    restart: always
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    ## 开启配置登录后会读取users.properties文件,而docker镜像工作目录为如下,所以将此文件挂载至此
    volumes:
      - ./users.properties:/tmp/rocketmq-console/data/users.properties
    container_name: rmqdashabord
    restart: always
    ports:
      - 9090:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Drocketmq.config.loginRequired=true -Drocketmq.config.accessKey=rocketmq2 -Drocketmq.config.secretKey=12345678
    depends_on:
      - namesrv
      - broker
    networks:
      - rocketmq


networks:
  rocketmq:
    driver: bridge

Drocketmq.config.loginRequired=true: 启动 dashboard登录面板

Drocketmq.config.accessKey=rocketmq2: 连接 rocketmq的用户名

Drocketmq.config.accessKey=rocketmq2:连接 rocketmq的密码

4️⃣ 启动查看

image-20240803210137380

查看是否可以链接到 RocketMQ

image-20240803210205665