📦 Rocket
版本 4.9.6
🏢 官方文档: https://rocketmq.apache.org/zh/docs/4.x/
💻 使用的系统是 ubuntu22.04
RocketMQ消息秒杀系统
秒杀在短时间内需要处理大量的高并发请求
-
- 并行性是指两个或多个事件在同一时刻发生→宏观并行,微观并行
- 并发性是指两个或多个事件在同一时间间隔内发生→宏观并行,微观串行
提高并发考虑的层面:
1.硬件
2.软件,尽量把代码写好,考虑到高并发内容
QPS
:每秒钟处理请求的数量
TPS
: 每秒交易(或事务)处理量
秒杀/大型促销/抢购要尽量减少接口的处理时间
1.能异步就异步
2.减少IO(统一查统一写)
3.提前校验,能尽早return就return
4.加锁粒度要小
5.事务控制粒度小
前端界面在这种高并发场景下可以使用图片校验/滑块校验等等方法进行分流,也顺带实现了人机校验
秒杀架构图
首先架构是通过两个服务,一个是生产者接收端口信息添加到消息队列,如何另一个服务为消费者,从消息队列中拿去服务对数据库进行操作
生产者端
1️⃣ 请求进入 Controller
,查看此用户是不是抢过此商品(通过 setnx
去重https://blog.tanc.fun:9999/archives/7964eb1c-22a5-4056-8fc2-8058f004aa74#---Redis-%E6%93%8D%E4%BD%9C)
为什么使用setnx命令而不是直接使用set集合,通义千问:
SETNX
(Set if Not eXists)命令在Redis中提供了一种原子性的设置键值对的方法,只有当键不存在时才会设置,这使得它非常适合用于实现分布式锁或者避免重复处理某些操作,比如防止重复消息处理、重复任务执行等SET集合用于去重
SET
集合在Redis中是一种存储唯一元素的数据结构。当你使用SADD
命令向集合中添加元素时,Redis会自动确保集合中没有重复的元素。这是因为SET
集合的定义就是其所有元素都是唯一的。因此,SET
集合非常适合用于在单个Redis实例内部去重,比如收集一系列不重复的用户ID、URL或其他标识符。SETNX命令用于去重
SETNX
命令则是在键级别的原子性操作,它尝试仅在键不存在时设置键值对。在分布式系统中,SETNX
常被用来实现分布式锁,或者在多个客户端之间协调去重。例如,当多个客户端尝试处理同一任务时,可以使用SETNX
来确保只有一个客户端获得处理权,从而避免重复处理。为什么在某些场景下使用
SETNX
而不是SET
集合尽管
SET
集合提供了很好的去重功能,但在分布式系统中,你需要处理的去重问题可能超越了单个数据结构的范围,而是涉及多个客户端或服务实例之间的同步。在这种情况下,SETNX
的原子性特性可以保证即使在高并发环境下,也只有第一个执行SETNX
的客户端能够成功设置键,其他客户端将因键已存在而失败,从而避免了重复。此外,
SETNX
通常与锁的实现相关联,例如在分布式环境中实现一个任务恰好被一个工作者处理一次,而SET
集合则主要用于数据层面的去重,比如存储一系列唯一ID。总之,
SET
集合和SETNX
都有各自的使用场景。SET
集合适用于在单个Redis实例内部保持数据的唯一性,而SETNX
命令更适合用于跨客户端或服务实例的原子性操作,如分布式锁或协调去重。在设计系统时,你应该根据具体的需求来选择最适合的工具。
2️⃣ 提前通过 redis
和数据库之间进行数据同步,将要秒杀的商品信息、库存数量添加到 redis
中,这样可以减少 io
,就不用进入数据库判断了
3️⃣ 通过异步的方式将消息传递给消息队列,主要传递商品的信息
消费者端
1️⃣ 从消息队列中拿取数据
以下步骤需要开启事务:
2️⃣ 将数据库中的商品-1
3️⃣ 将抢购信息添加到抢购关联表中(用户和商品),在这里由于高并发很多个请求都在同时读写数据会造成数据问题,这里就可以通过分部署锁来进行操作,有3种方法:
-
直接将事务在go语言层面上锁,可以但是如果是多个集群之间的锁不共享,只能用于单节点
-
使用Mysql的行锁(innodb)比如有如下
sql
语句update goods set total_stocks = total_stocks - 1 where goods_id = goodsId and total_stocks - 1 >= 0
关键在于者
total_stocks = total_stocks - 1
和total_stocks - 1 >= 0
,但是这样就把压力给到了数据库,也不适合并发量特别大的场景或者使用唯一约束也是可以但是情况和上面一致
- 使用
redis
,通过setnx
来设置锁https://blog.tanc.fun:9999/archives/7964eb1c-22a5-4056-8fc2-8058f004aa74#SET-GET-%E8%AE%BE%E7%BD%AE-%E8%AF%BB%E5%8F%96%E6%95%B0%E6%8D%AE
- 使用
但是一般在 Controller
层前面还需要有一些其他代理去顶住压力,比如使用 Nginx
的负载均衡
代码演示
前置条件: 安装 redis
和 rocketmq
使用Go语言代码来演示
由于使用的是gin框架,踩了很多坑,顺带也学会了go是如何使用rocketmq
🌟 生产端
使用
Gin
框架来搭建生产者端,这里我直接使用的我自己的 GoWebCreate
一个脚手架项目地址: https://github.com/txbxxx/GinProjectCreate
1️⃣ 首先需要编写go代码来连接 rocketMQ
,一般工具类代码都存放在 utils
包中,这里需要编写一个 RocketMQ
的 Producer
代码,开始这里坑就很多,在官网看的代码和使用的不一致,找了好久在 rocket-go
中找到了,地址:https://github.com/apache/rocketmq-client-go/tree/master/examples,
可以看到这里我配置文件填写都是用的 os.Getenv
,因为这里使用了一个 env
环境变量配置,可以在项目中的 .env
文件中添加配置或者修改,可以自己修改一下
import (
rocketmq "github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
producerSvc "github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
)
func QueueUtil(ProducerGroup string) (rocketmq.Producer, error) {
// 设置日志级别为 Error 级别(关闭大多数日志)
rlog.SetLogLevel("error")
return rocketmq.NewProducer(
//设置组名
producerSvc.WithGroupName(ProducerGroup),
//设置nameserver地址
producerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
//设置了连接验证需要配置,如果没有设置则忽略
producerSvc.WithCredentials(primitive.Credentials{
AccessKey: os.Getenv("RMQ_AKEY"),
SecretKey: os.Getenv("RMQ_SKEY"),
}),
)
}
由于 main.go
代码文件是 conf.Init
开始,也就说 gin
服务启动之前的配置提前加载好,所以首先需要配置 conf/Init
函数,在里面需要配置你 Redis
和 Producer
(需要先修改 .env
中的连接配置)
然后这里需要配置一个全局的producer,这里就和 springboot
不一样因为在 springboot
配置好 rocketmq
就会去连接然后可以直接注入,而 go
需要自己配置全局,如果不配置全局在本次秒杀中会有两个问题:
1.Producer
启动问题: 在前面官方代码正常流程是 创建 Producer
对象-->启动 Producer
-->发送消息-->关闭消息,如果是将 producer
对象创建放入一个请求中创建,那么每来一个请求就需要创建一个 producer
,这会出现一个问题,就是创建 producer
时需要生产者组,那么你每次创建就都会去创建一个生产者组,这里我在做压力测试的时候就发生了如下报错,提示我生产者组已经被创建,
为什么在一个一个手动请求就不会出现这种情况,因为在你手动发送请求之后的下一个请求之前这个生产者就已经
shutdown
了,压力测试时一秒钟1000
个或者10000
个请求
time="2024-08-07T19:39:38+08:00" level=error msg="启动Producer失败producer group has been created" func="go-RocketMQProducer/service/seckillSvc.(*SecKillService).ASyncSendProducer" file="G:/learn_code/RocketMQ_Golang/Producer/service/seckillSvc/SecKillSVC.go:99"
2.发送消息只会发送到
topic
中的一个队列中: 因为在你 producer
创建发送请求的时候它会遵循轮询,但是你每一次请求之前都会关闭掉这个 producer
,又会重新开始轮询
/**
* @Author tanchang
* @Description //TODO
* @Date 2024/7/11 16:14
* @File: config
* @Software: GoLand
**/
package conf
import (
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/go-redis/redis/v8"
"github.com/joho/godotenv"
"github.com/sirupsen/logrus"
"go-RocketMQProducer/utils"
"os"
"strconv"
)
//添加两个全局变量一个是Redis,一个是Procuder的
var RDB *redis.Client
var PD rocketmq.Producer
func Init() {
err := godotenv.Load()
if err != nil {
fmt.Println("读取配置文件环境失败" + err.Error())
}
//连接数据库
//utils.DBUntil(os.Getenv("DB_USER"), os.Getenv("DB_PWD"), os.Getenv("DB_ADDR"), os.Getenv("DB_NAME"), os.Getenv("TABLE_NAME"))
//连接redis
RDB = utils.RedisUtils(os.Getenv("RDB_ADDR"), os.Getenv("RDB_PWD"), os.Getenv("RDB_DEFAULT_DB"))
//logrus配置
logLevel, _ := strconv.Atoi(os.Getenv("LOG_LEVEL"))
logrus.SetLevel(logrus.Level(logLevel))
logrus.SetReportCaller(true)
//初始化producer
PD, err = utils.QueueUtil("SeckillProducerGroup")
if err != nil {
logrus.Error("连接RocketMQ失败")
return
}
}
main
函数中关闭这个 producer
,表示进程结束后才会关闭
func main() {
//初始化配置
conf.Init()
gin.SetMode(os.Getenv("GIN_MODE"))
r := router.Router()
//启动http服务
err := r.Run(os.Getenv("GIN_PORT"))
if err != nil {
return
}
//进程结束关闭Producer
defer func(PD rocketmq.Producer) {
err := PD.Shutdown()
if err != nil {
}
}(conf.PD)
}
2️⃣ 然后需要先写一个 route
代码,在 route
目录
func Router() *gin.Engine {
httpServer := gin.Default()
//跨域
httpServer.Use(middleware.Cors())
user := httpServer.Group("/user")
{
user.POST("/login", control.Login)
user.POST("/register", control.Register)
}
httpServer.GET("/doseckill", control.DoSecKill) //前面代码可以忽略主要看这段
return httpServer
}
在 control
包内编写一个 DoSecKill
处理函数
package control
import (
"github.com/gin-gonic/gin"
"go-RocketMQProducer/service/seckillSvc"
)
func DoSecKill(c *gin.Context) {
var svc seckillSvc.SecKillService //这里使用了一个自定义验证器
err := c.ShouldBind(&svc)
if err != nil {
c.JSON(200, gin.H{"err": err})
}
c.JSON(200, svc.Seckill())
}
4️⃣ 在 service
包中创建 SeckillService
包存放秒杀处理代码,这里的代码坑机很多
1.首先是使用 atomic.AddInt64
,来模拟用户(在生产环境中不需要,因为独立用户),通过接收前端指定的秒杀商品id,来组成一个唯一key值,在使用 SETNX
将这个uk来写入 redis
的 key
中如果插入成功则表示第一次抢这个商品
2.随后使用 DECR
自减这里的数据需要通过 consumer
从数据库中拿去库存信息同步到 redis
中,然后这里先验证库存数量是否充足
3.发送异步消息到消息队列,也就是发送uk,发送和官网代码一致
总体流程和架构图一致
package seckillSvc
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"go-RocketMQProducer/conf"
"strconv"
"sync"
"sync/atomic"
)
var counter int64
type SecKillService struct {
GoodsID string `form:"goods_id" json:"goods_id"`
}
// MQ报错返回
var mqErr = gin.H{
"code": -1,
"msg": "抢购人数太火爆啦,请稍后再试",
}
func (service *SecKillService) Seckill() gin.H {
// 初始化计数器
atomic.AddInt64(&counter, 1)
//生成uk
ukey := service.GoodsID + "-" + strconv.FormatInt(atomic.LoadInt64(&counter), 10)
//如果是第一抢这个商品,就生成的uk加入到redis里面,使用setnx
nx := conf.RDB.SetNX(context.Background(), "uk:"+ukey, "", 0)
if !nx.Val() {
return gin.H{
"code": -1,
"msg": "您已经来过了,去看看的商品吧",
}
}
// 将redis内的库存-1,如果为0就表示商品已经卖完了
decr := conf.RDB.Decr(context.Background(), "goodsID:"+service.GoodsID)
if decr.Val() <= 0 {
return gin.H{
"code": -1,
"msg": "该商品已经被抢购完毕",
}
}
err := service.ASyncSendProducer(ukey)
if err != nil {
return mqErr
}
// 返回gin.H
return gin.H{
"code": 200,
"msg": "抢购成功,请稍后去订单详情页查看",
}
}
func (service *SecKillService) ASyncSendProducer(ukey string) gin.H {
err := conf.PD.Start()
if err != nil {
logrus.Error("启动Producer失败", err)
return mqErr
}
var wg sync.WaitGroup
wg.Add(1)
err = conf.PD.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
if err != nil {
logrus.Error("发送失败 UserID:"+strconv.FormatInt(atomic.LoadInt64(&counter), 10)+" GoodsID: "+service.GoodsID, " err:", err, " result:", result)
} else {
fmt.Println("发送成功")
}
wg.Done()
}, primitive.NewMessage("SecKillTopic", []byte(ukey)))
if err != nil {
logrus.Error("发送至RocketMQ失败: ", err.Error(), "msg: ", ukey)
}
wg.Wait()
return nil
}
🌟 消费端
同样也是用 GoWebCrate
自己的脚手架,也是使用 Gin
由于消费者端是需要进行数据库操作的操作的所以需要使用 Grom
,在 utils
包中的 DB
可以看到数据库连接,
1️⃣ 开始也是先连接 RocketMQ
代码
import (
rocketmq "github.com/apache/rocketmq-client-go/v2"
consumerSvc "github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
producerSvc "github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
"os"
)
func Producer(ProducerGroup string) (rocketmq.Producer, error) {
// 设置日志级别为 Error 级别(关闭大多数日志)
rlog.SetLogLevel("error")
return rocketmq.NewProducer(
producerSvc.WithGroupName(ProducerGroup),
producerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
producerSvc.WithCredentials(primitive.Credentials{
AccessKey: os.Getenv("RMQ_AKEY"),
SecretKey: os.Getenv("RMQ_SKEY"),
}),
)
}
func Consumer(ConsumerGroup string) (rocketmq.PushConsumer, error) {
// 设置日志级别为 Error 级别(关闭大多数日志)
rlog.SetLogLevel("error")
return rocketmq.NewPushConsumer(
consumerSvc.WithGroupName(ConsumerGroup),
consumerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
consumerSvc.WithCredentials(primitive.Credentials{
AccessKey: os.Getenv("RMQ_AKEY"),
SecretKey: os.Getenv("RMQ_SKEY"),
}),
)
}
2️⃣ 创建两个模型并且通过 gorm
创建
3️⃣ 新建一个 consumer
目录。里面存放 consumer
代码
1.先启动 consumer
,这里不想生产者那么特殊需要拿前端请求,这里只需要从 rocketmq
拿取消息即可,操作代码和官网一致
2.主Z
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
consumerSvc "github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/sirupsen/logrus"
"go-RocketMQConsumer/conf"
"go-RocketMQConsumer/model"
"go-RocketMQConsumer/utils"
"gorm.io/gorm"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
// Redis_time 次数时间
var Redis_time = 20
func ReceptionMsg() {
sig := make(chan os.Signal)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
//创建Consumer
consumer, err := utils.Consumer("SeckillConumserGoup")
if err != nil {
logrus.Error("创建消费者错误: ", err.Error())
return
}
if err != nil {
logrus.Error("创建主题失败")
}
//订阅Topic开始消费
err = Sub(err, consumer)
if err != nil {
logrus.Error("订阅主题错误", err)
return
}
//启动消费者
err = consumer.Start()
if err != nil {
logrus.Error("启动消费者错误", err.Error())
return
}
//挂起消费者
<-sig
err = consumer.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
return
}
}
// Sub 订阅主题开始消费工作
func Sub(err error, consumer rocketmq.PushConsumer) error {
// 订阅Topic开始消费
err = consumer.Subscribe("SecKillTopic", consumerSvc.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumerSvc.ConsumeResult, error) {
//开启事物
tx := utils.DB.Begin()
//return后判断是回滚还是提交
defer func() {
if r := recover(); r != nil {
logrus.Errorf("事物Panic: %v", r)
_ = tx.Rollback()
} else if err := tx.Error; err != nil {
logrus.Errorf("事物错误: %v", err)
_ = tx.Rollback()
} else {
_ = tx.Commit()
}
}()
for i := range msgs {
// 分割队列中的消息,0为GoodsID 1为UserID
fmt.Println("收到消息:", string(msgs[i].Body))
split := strings.Split(string(msgs[i].Body), "-")
err := processMsg(split, tx)
if err != nil {
logrus.Error("处理消息:", string(msgs[i].Body), " 错误", err)
return consumerSvc.ConsumeRetryLater, err
}
}
return consumerSvc.ConsumeSuccess, nil
},
)
return err
}
func processMsg(split []string, tx *gorm.DB) error {
for current := 0; current < Redis_time; current++ {
//开启Redis锁使用SETNX
if flag := conf.RDB.SetNX(context.Background(), "goods_lock:"+split[0], "", time.Second*5).Val(); flag {
//减库存
//如果事物报错则回滚
if txerr := tx.Model(&model.Goods{}).Where("goods_id = ?", split[0]).Update("stock", gorm.Expr("stock - ?", 1)).Error; txerr != nil {
return txerr
}
// 将这两项插入数据库中
if txerr := tx.Create(&model.Seckill{GoodsID: split[0], UserID: split[1]}).Error; txerr != nil {
return txerr
}
//操作完成没有报错就删除锁
conf.RDB.Del(context.Background(), "goods_lock:"+split[0])
return nil
} else {
//如果没拿到锁就等待2秒
time.Sleep(time.Second * 2)
}
}
return fmt.Errorf("未能获取锁 goods_lock:%s", split[0])
}