📦 Rocket版本 4.9.6

🏢 官方文档: https://rocketmq.apache.org/zh/docs/4.x/

💻 使用的系统是 ubuntu22.04

RocketMQ消息秒杀系统

秒杀在短时间内需要处理大量的高并发请求

    • 并行性是指两个或多个事件在同一时刻发生→宏观并行,微观并行
    • 并发性是指两个或多个事件在同一时间间隔内发生→宏观并行,微观串行

提高并发考虑的层面:
1.硬件

2.软件,尽量把代码写好,考虑到高并发内容

QPS:每秒钟处理请求的数量

TPS: 每秒交易(或事务)处理量

秒杀/大型促销/抢购要尽量减少接口的处理时间

1.能异步就异步
2.减少IO(统一查统一写)
3.提前校验,能尽早return就return
4.加锁粒度要小
5.事务控制粒度小

前端界面在这种高并发场景下可以使用图片校验/滑块校验等等方法进行分流,也顺带实现了人机校验

秒杀架构图

img

首先架构是通过两个服务,一个是生产者接收端口信息添加到消息队列,如何另一个服务为消费者,从消息队列中拿去服务对数据库进行操作

生产者端

1️⃣ 请求进入 Controller,查看此用户是不是抢过此商品(通过 setnx去重https://blog.tanc.fun:9999/archives/7964eb1c-22a5-4056-8fc2-8058f004aa74#---Redis-%E6%93%8D%E4%BD%9C)

为什么使用setnx命令而不是直接使用set集合,通义千问:

SETNX(Set if Not eXists)命令在Redis中提供了一种原子性的设置键值对的方法,只有当键不存在时才会设置,这使得它非常适合用于实现分布式锁或者避免重复处理某些操作,比如防止重复消息处理、重复任务执行等

SET集合用于去重

SET集合在Redis中是一种存储唯一元素的数据结构。当你使用 SADD命令向集合中添加元素时,Redis会自动确保集合中没有重复的元素。这是因为 SET集合的定义就是其所有元素都是唯一的。因此,SET集合非常适合用于在单个Redis实例内部去重,比如收集一系列不重复的用户ID、URL或其他标识符。

SETNX命令用于去重

SETNX命令则是在键级别的原子性操作,它尝试仅在键不存在时设置键值对。在分布式系统中,SETNX常被用来实现分布式锁,或者在多个客户端之间协调去重。例如,当多个客户端尝试处理同一任务时,可以使用 SETNX来确保只有一个客户端获得处理权,从而避免重复处理。

为什么在某些场景下使用 SETNX而不是 SET集合

尽管 SET集合提供了很好的去重功能,但在分布式系统中,你需要处理的去重问题可能超越了单个数据结构的范围,而是涉及多个客户端或服务实例之间的同步。在这种情况下,SETNX的原子性特性可以保证即使在高并发环境下,也只有第一个执行 SETNX的客户端能够成功设置键,其他客户端将因键已存在而失败,从而避免了重复。

此外,SETNX通常与锁的实现相关联,例如在分布式环境中实现一个任务恰好被一个工作者处理一次,而 SET集合则主要用于数据层面的去重,比如存储一系列唯一ID。

总之,SET集合和 SETNX都有各自的使用场景。SET集合适用于在单个Redis实例内部保持数据的唯一性,而 SETNX命令更适合用于跨客户端或服务实例的原子性操作,如分布式锁或协调去重。在设计系统时,你应该根据具体的需求来选择最适合的工具。

2️⃣ 提前通过 redis和数据库之间进行数据同步,将要秒杀的商品信息、库存数量添加到 redis中,这样可以减少 io,就不用进入数据库判断了

3️⃣ 通过异步的方式将消息传递给消息队列,主要传递商品的信息

消费者端

1️⃣ 从消息队列中拿取数据

以下步骤需要开启事务:

2️⃣ 将数据库中的商品-1

3️⃣ 将抢购信息添加到抢购关联表中(用户和商品),在这里由于高并发很多个请求都在同时读写数据会造成数据问题,这里就可以通过分部署锁来进行操作,有3种方法:

  1. 直接将事务在go语言层面上锁,可以但是如果是多个集群之间的锁不共享,只能用于单节点

  2. 使用Mysql的行锁(innodb)比如有如下 sql语句

    update goods set total_stocks = total_stocks - 1 where goods_id = goodsId and total_stocks - 1 >= 0
    

    关键在于者 total_stocks = total_stocks - 1total_stocks - 1 >= 0,但是这样就把压力给到了数据库,也不适合并发量特别大的场景

    或者使用唯一约束也是可以但是情况和上面一致

    1. 使用 redis,通过 setnx来设置锁https://blog.tanc.fun:9999/archives/7964eb1c-22a5-4056-8fc2-8058f004aa74#SET-GET-%E8%AE%BE%E7%BD%AE-%E8%AF%BB%E5%8F%96%E6%95%B0%E6%8D%AE

但是一般在 Controller层前面还需要有一些其他代理去顶住压力,比如使用 Nginx的负载均衡

image-20240805120757500

代码演示

前置条件: 安装 redisrocketmq

使用Go语言代码来演示

由于使用的是gin框架,踩了很多坑,顺带也学会了go是如何使用rocketmq

🌟 生产端

使用

Gin框架来搭建生产者端,这里我直接使用的我自己的 GoWebCreate一个脚手架项目地址: https://github.com/txbxxx/GinProjectCreate

image-20240807202139472

1️⃣ 首先需要编写go代码来连接 rocketMQ,一般工具类代码都存放在 utils包中,这里需要编写一个 RocketMQProducer代码,开始这里坑就很多,在官网看的代码和使用的不一致,找了好久在 rocket-go中找到了,地址:https://github.com/apache/rocketmq-client-go/tree/master/examples,

可以看到这里我配置文件填写都是用的 os.Getenv,因为这里使用了一个 env环境变量配置,可以在项目中的 .env文件中添加配置或者修改,可以自己修改一下

image-20240807203518371

import (
	rocketmq "github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	producerSvc "github.com/apache/rocketmq-client-go/v2/producer"
	"github.com/apache/rocketmq-client-go/v2/rlog"
	"os"
)
func QueueUtil(ProducerGroup string) (rocketmq.Producer, error) {
	// 设置日志级别为 Error 级别(关闭大多数日志)
	rlog.SetLogLevel("error")
	return rocketmq.NewProducer(
		//设置组名
		producerSvc.WithGroupName(ProducerGroup),
		//设置nameserver地址
		producerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
		//设置了连接验证需要配置,如果没有设置则忽略
		producerSvc.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("RMQ_AKEY"),
			SecretKey: os.Getenv("RMQ_SKEY"),
		}),
	)
}

由于 main.go代码文件是 conf.Init开始,也就说 gin服务启动之前的配置提前加载好,所以首先需要配置 conf/Init函数,在里面需要配置你 RedisProducer(需要先修改 .env中的连接配置)

然后这里需要配置一个全局的producer,这里就和 springboot不一样因为在 springboot配置好 rocketmq就会去连接然后可以直接注入,而 go需要自己配置全局,如果不配置全局在本次秒杀中会有两个问题:


1.Producer启动问题: 在前面官方代码正常流程是 创建 Producer对象-->启动 Producer-->发送消息-->关闭消息,如果是将 producer对象创建放入一个请求中创建,那么每来一个请求就需要创建一个 producer,这会出现一个问题,就是创建 producer时需要生产者组,那么你每次创建就都会去创建一个生产者组,这里我在做压力测试的时候就发生了如下报错,提示我生产者组已经被创建,

为什么在一个一个手动请求就不会出现这种情况,因为在你手动发送请求之后的下一个请求之前这个生产者就已经 shutdown了,压力测试时一秒钟 1000个或者 10000个请求

time="2024-08-07T19:39:38+08:00" level=error msg="启动Producer失败producer group has been created" func="go-RocketMQProducer/service/seckillSvc.(*SecKillService).ASyncSendProducer" file="G:/learn_code/RocketMQ_Golang/Producer/service/seckillSvc/SecKillSVC.go:99"
2.发送消息只会发送到

topic中的一个队列中: 因为在你 producer创建发送请求的时候它会遵循轮询,但是你每一次请求之前都会关闭掉这个 producer,又会重新开始轮询

image-20240807205426348

/**
 * @Author tanchang
 * @Description //TODO
 * @Date 2024/7/11 16:14
 * @File:  config
 * @Software: GoLand
 **/

package conf

import (
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/go-redis/redis/v8"
	"github.com/joho/godotenv"
	"github.com/sirupsen/logrus"
	"go-RocketMQProducer/utils"
	"os"
	"strconv"
)
//添加两个全局变量一个是Redis,一个是Procuder的
var RDB *redis.Client
var PD rocketmq.Producer

func Init() {
	err := godotenv.Load()
	if err != nil {
		fmt.Println("读取配置文件环境失败" + err.Error())
	}

	//连接数据库
	//utils.DBUntil(os.Getenv("DB_USER"), os.Getenv("DB_PWD"), os.Getenv("DB_ADDR"), os.Getenv("DB_NAME"), os.Getenv("TABLE_NAME"))

	//连接redis
	RDB = utils.RedisUtils(os.Getenv("RDB_ADDR"), os.Getenv("RDB_PWD"), os.Getenv("RDB_DEFAULT_DB"))

	//logrus配置
	logLevel, _ := strconv.Atoi(os.Getenv("LOG_LEVEL"))
	logrus.SetLevel(logrus.Level(logLevel))
	logrus.SetReportCaller(true)

	//初始化producer
	PD, err = utils.QueueUtil("SeckillProducerGroup")
	if err != nil {
		logrus.Error("连接RocketMQ失败")
		return
	}

}

main函数中关闭这个 producer,表示进程结束后才会关闭

func main() {
	//初始化配置
	conf.Init()
	gin.SetMode(os.Getenv("GIN_MODE"))
	r := router.Router()
	//启动http服务
	err := r.Run(os.Getenv("GIN_PORT"))
	if err != nil {
		return
	}
	//进程结束关闭Producer
	defer func(PD rocketmq.Producer) {
		err := PD.Shutdown()
		if err != nil {

		}
	}(conf.PD)
}

2️⃣ 然后需要先写一个 route代码,在 route目录

func Router() *gin.Engine {
	httpServer := gin.Default()
	//跨域
	httpServer.Use(middleware.Cors())

	user := httpServer.Group("/user")
	{
		user.POST("/login", control.Login)
		user.POST("/register", control.Register)
	}

	httpServer.GET("/doseckill", control.DoSecKill) //前面代码可以忽略主要看这段
	return httpServer
}

control包内编写一个 DoSecKill处理函数

package control

import (
	"github.com/gin-gonic/gin"
	"go-RocketMQProducer/service/seckillSvc"
)

func DoSecKill(c *gin.Context) {
	var svc seckillSvc.SecKillService //这里使用了一个自定义验证器
	err := c.ShouldBind(&svc)
	if err != nil {
		c.JSON(200, gin.H{"err": err})
	}
	c.JSON(200, svc.Seckill())
}

4️⃣ 在 service包中创建 SeckillService包存放秒杀处理代码,这里的代码坑机很多

1.首先是使用 atomic.AddInt64,来模拟用户(在生产环境中不需要,因为独立用户),通过接收前端指定的秒杀商品id,来组成一个唯一key值,在使用 SETNX将这个uk来写入 rediskey中如果插入成功则表示第一次抢这个商品

2.随后使用 DECR自减这里的数据需要通过 consumer从数据库中拿去库存信息同步到 redis中,然后这里先验证库存数量是否充足

3.发送异步消息到消息队列,也就是发送uk,发送和官网代码一致

总体流程和架构图一致

package seckillSvc

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/gin-gonic/gin"
	"github.com/sirupsen/logrus"
	"go-RocketMQProducer/conf"
	"strconv"
	"sync"
	"sync/atomic"
)

var counter int64

type SecKillService struct {
	GoodsID string `form:"goods_id" json:"goods_id"`
}

// MQ报错返回
var mqErr = gin.H{
	"code": -1,
	"msg":  "抢购人数太火爆啦,请稍后再试",
}

func (service *SecKillService) Seckill() gin.H {
	// 初始化计数器
	atomic.AddInt64(&counter, 1)

	//生成uk
	ukey := service.GoodsID + "-" + strconv.FormatInt(atomic.LoadInt64(&counter), 10)
	//如果是第一抢这个商品,就生成的uk加入到redis里面,使用setnx
	nx := conf.RDB.SetNX(context.Background(), "uk:"+ukey, "", 0)
	if !nx.Val() {
		return gin.H{
			"code": -1,
			"msg":  "您已经来过了,去看看的商品吧",
		}
	}

	// 将redis内的库存-1,如果为0就表示商品已经卖完了
	decr := conf.RDB.Decr(context.Background(), "goodsID:"+service.GoodsID)
	if decr.Val() <= 0 {
		return gin.H{
			"code": -1,
			"msg":  "该商品已经被抢购完毕",
		}
	}

	err := service.ASyncSendProducer(ukey)
	if err != nil {
		return mqErr
	}


	// 返回gin.H
	return gin.H{
		"code": 200,
		"msg":  "抢购成功,请稍后去订单详情页查看",
	}
}

func (service *SecKillService) ASyncSendProducer(ukey string) gin.H {
	err := conf.PD.Start()
	if err != nil {
		logrus.Error("启动Producer失败", err)
		return mqErr
	}
	var wg sync.WaitGroup
	wg.Add(1)
	err = conf.PD.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
		if err != nil {
			logrus.Error("发送失败 UserID:"+strconv.FormatInt(atomic.LoadInt64(&counter), 10)+" GoodsID: "+service.GoodsID, " err:", err, " result:", result)
		} else {
			fmt.Println("发送成功")
		}
		wg.Done()
	}, primitive.NewMessage("SecKillTopic", []byte(ukey)))
	if err != nil {
		logrus.Error("发送至RocketMQ失败: ", err.Error(), "msg: ", ukey)
	}
	wg.Wait()

	return nil
}

🌟 消费端

同样也是用 GoWebCrate自己的脚手架,也是使用 Gin

由于消费者端是需要进行数据库操作的操作的所以需要使用 Grom,在 utils包中的 DB可以看到数据库连接,

1️⃣ 开始也是先连接 RocketMQ代码

import (
	rocketmq "github.com/apache/rocketmq-client-go/v2"
	consumerSvc "github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	producerSvc "github.com/apache/rocketmq-client-go/v2/producer"
	"github.com/apache/rocketmq-client-go/v2/rlog"
	"os"
)

func Producer(ProducerGroup string) (rocketmq.Producer, error) {
	// 设置日志级别为 Error 级别(关闭大多数日志)
	rlog.SetLogLevel("error")
	return rocketmq.NewProducer(
		producerSvc.WithGroupName(ProducerGroup),
		producerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
		producerSvc.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("RMQ_AKEY"),
			SecretKey: os.Getenv("RMQ_SKEY"),
		}),
	)
}

func Consumer(ConsumerGroup string) (rocketmq.PushConsumer, error) {
	// 设置日志级别为 Error 级别(关闭大多数日志)
	rlog.SetLogLevel("error")
	return rocketmq.NewPushConsumer(
		consumerSvc.WithGroupName(ConsumerGroup),
		consumerSvc.WithNameServer([]string{os.Getenv("NameSvcAddr")}),
		consumerSvc.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("RMQ_AKEY"),
			SecretKey: os.Getenv("RMQ_SKEY"),
		}),
	)

}

2️⃣ 创建两个模型并且通过 gorm创建


3️⃣ 新建一个 consumer目录。里面存放 consumer代码

1.先启动 consumer,这里不想生产者那么特殊需要拿前端请求,这里只需要从 rocketmq拿取消息即可,操作代码和官网一致

2.主Z

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	consumerSvc "github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/sirupsen/logrus"
	"go-RocketMQConsumer/conf"
	"go-RocketMQConsumer/model"
	"go-RocketMQConsumer/utils"
	"gorm.io/gorm"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"
)

// Redis_time 次数时间
var Redis_time = 20

func ReceptionMsg() {
	sig := make(chan os.Signal)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	//创建Consumer
	consumer, err := utils.Consumer("SeckillConumserGoup")
	if err != nil {
		logrus.Error("创建消费者错误: ", err.Error())
		return
	}
	if err != nil {
		logrus.Error("创建主题失败")
	}

	//订阅Topic开始消费
	err = Sub(err, consumer)
	if err != nil {
		logrus.Error("订阅主题错误", err)
		return
	}
	//启动消费者
	err = consumer.Start()
	if err != nil {
		logrus.Error("启动消费者错误", err.Error())
		return
	}
	//挂起消费者
	<-sig
	err = consumer.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
		return
	}
}

// Sub 订阅主题开始消费工作
func Sub(err error, consumer rocketmq.PushConsumer) error {
	// 订阅Topic开始消费
	err = consumer.Subscribe("SecKillTopic", consumerSvc.MessageSelector{},
		func(ctx context.Context, msgs ...*primitive.MessageExt) (consumerSvc.ConsumeResult, error) {
			//开启事物
			tx := utils.DB.Begin()
			//return后判断是回滚还是提交
			defer func() {
				if r := recover(); r != nil {
					logrus.Errorf("事物Panic: %v", r)
					_ = tx.Rollback()
				} else if err := tx.Error; err != nil {
					logrus.Errorf("事物错误: %v", err)
					_ = tx.Rollback()
				} else {
					_ = tx.Commit()
				}
			}()
			for i := range msgs {
				// 分割队列中的消息,0为GoodsID 1为UserID
				fmt.Println("收到消息:", string(msgs[i].Body))
				split := strings.Split(string(msgs[i].Body), "-")
				err := processMsg(split, tx)
				if err != nil {
					logrus.Error("处理消息:", string(msgs[i].Body), " 错误", err)
					return consumerSvc.ConsumeRetryLater, err
				}
			}
			return consumerSvc.ConsumeSuccess, nil
		},
	)
	return err
}

func processMsg(split []string, tx *gorm.DB) error {

	for current := 0; current < Redis_time; current++ {
		//开启Redis锁使用SETNX
		if flag := conf.RDB.SetNX(context.Background(), "goods_lock:"+split[0], "", time.Second*5).Val(); flag {
			//减库存
			//如果事物报错则回滚
			if txerr := tx.Model(&model.Goods{}).Where("goods_id = ?", split[0]).Update("stock", gorm.Expr("stock - ?", 1)).Error; txerr != nil {
				return txerr
			}
			// 将这两项插入数据库中
			if txerr := tx.Create(&model.Seckill{GoodsID: split[0], UserID: split[1]}).Error; txerr != nil {
				return txerr
			}
			//操作完成没有报错就删除锁
			conf.RDB.Del(context.Background(), "goods_lock:"+split[0])
			return nil
		} else {
			//如果没拿到锁就等待2秒
			time.Sleep(time.Second * 2)
		}
	}
	return fmt.Errorf("未能获取锁 goods_lock:%s", split[0])
}