RabbitMQ整合Spring Boot项目
添加依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
application-dev.yml
1 2 3 4 5 6 7
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
|
交换机\路由Key\队列的配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package cn.tedu.csmall.stock.webapi.quartz;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig { public static final String STOCK_EX="stock_ex"; public static final String STOCK_ROUT="stock_rout"; public static final String STOCK_QUEUE="stock_queue";
@Bean public DirectExchange stockDirectExchange(){ return new DirectExchange(STOCK_EX); } @Bean public Queue stockQueue(){ return new Queue(STOCK_QUEUE); } @Bean public Binding stockBinding(){ return BindingBuilder.bind(stockQueue()).to(stockDirectExchange()).with(STOCK_ROUT); } }
|
计划任务–先前的计划任务代码先注释
1 2 3 4 5 6 7 8 9 10
| @Autowired private RabbitTemplate rabbitTemplate; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { rabbitTemplate.convertAndSend( RabbitMQConfig.STOCK_EX,RabbitMQConfig.STOCK_ROUT,"消息:执行减少库存的操作"); } }
|
修改cron表达式
1
| CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
|
RabbitMQ的消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package cn.tedu.csmall.stock.webapi.quartz;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE}) public class RabbitMQConsumer { @RabbitHandler public void process(String str){ System.out.println("消息的接受者收到消息:"+str); } }
|
启动Nacos\RabbitMQ\Seata
启动stock-webapi
根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行
测试成功表示一切正常
开发酷鲨秒杀执行流程
准备流控和降级的处理类
秒杀业务肯定是一个高并发的处理,并发数超过程序设计的限制时,就需要对请求的数量进行限流
Sentinel是阿里提供的SpringCloud组件,主要用于外界访问当前服务器的控制器方法的限流操作
先编写限流异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package cn.tedu.mall.seckill.exception;
import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import com.alibaba.csp.sentinel.slots.block.BlockException; import lombok.extern.slf4j.Slf4j;
@Slf4j public class SeckillBlockHandler { public static JsonResult seckillBlock(String randCode, SeckillOrderAddDTO seckillOrderAddDTO, BlockException e){ log.error("一个请求被限流了"); return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,"服务器繁忙!"); } }
|
再创建降级类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package cn.tedu.mall.seckill.exception;
import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import lombok.extern.slf4j.Slf4j;
@Slf4j public class SeckillFallback { public static JsonResult seckillFall(String randCode, SeckillOrderAddDTO seckillOrderAddDTO, Throwable throwable){ log.error("一个请求被降级了!"); return JsonResult.failed(ResponseCode.INTERNAL_SERVER_ERROR,throwable.getMessage()); } }
|
提交秒杀订单
开发业务层
我们之前完成了秒杀的预热,预热中完成了秒杀商品sku库存数,spu随机码保存在redis中的操作
也完成了查询秒杀商品列表,和显示秒杀商品详情的方法
下面要开始进行秒杀商品生成订单的操作
如果用户选择商品规格(sku)提交订单,那么就要按照提交秒杀订单的业务流程处理
秒杀提交订单和普通订单的区别
1.要判断当前用户是否为重复购买
2.从Redis中判断是否有库存
3.秒杀订单转换成普通订单,需要使用dubbo在order模块完成
4.用消息队列(RabbitMQ)的方式将秒杀成功信息保存在success表中
创建一个SeckillServiceImpl业务逻辑层实现类,完成上面的业务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| package cn.tedu.mall.seckill.service.impl;
import cn.tedu.mall.common.exception.CoolSharkServiceException; import cn.tedu.mall.common.pojo.domain.CsmallAuthenticationInfo; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.order.service.IOmsOrderService; import cn.tedu.mall.pojo.order.dto.OrderAddDTO; import cn.tedu.mall.pojo.order.dto.OrderItemAddDTO; import cn.tedu.mall.pojo.order.vo.OrderAddVO; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import cn.tedu.mall.pojo.seckill.model.Success; import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO; import cn.tedu.mall.seckill.config.RabbitMqComponentConfiguration; import cn.tedu.mall.seckill.service.ISeckillService; import cn.tedu.mall.seckill.utils.SeckillCacheUtils; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboReference; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.security.authentication.UsernamePasswordAuthenticationToken; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.List;
@Service @Slf4j public class SeckillServiceImpl implements ISeckillService { @Autowired private StringRedisTemplate stringRedisTemplate; @DubboReference private IOmsOrderService dubboOrderService; @Autowired private RabbitTemplate rabbitTemplate;
@Override public SeckillCommitVO commitSeckill(SeckillOrderAddDTO seckillOrderAddDTO) { Long skuId = seckillOrderAddDTO.getSeckillOrderItemAddDTO().getSkuId(); Long userId = getUserId(); String reseckillCheckKey = SeckillCacheUtils.getReseckillCheckKey(skuId, userId); Long seckillCounts = stringRedisTemplate.boundValueOps(reseckillCheckKey).increment(); if(seckillCounts>1) throw new CoolSharkServiceException(ResponseCode.FORBIDDEN,"您已经购买过该商品了"); String stockKey = SeckillCacheUtils.getStockKey(skuId); Long seckillStocks = stringRedisTemplate.boundValueOps(stockKey).decrement(); if(seckillStocks<0){ stringRedisTemplate.boundValueOps(reseckillCheckKey).decrement(); throw new CoolSharkServiceException(ResponseCode.BAD_REQUEST,"对不起,您购买的商品已经无货了"); } OrderAddDTO orderAddDTO = converSeckillOrderTOOrder(seckillOrderAddDTO); orderAddDTO.setUserId(userId); OrderAddVO orderAddVO = dubboOrderService.addOrder(orderAddDTO); Success success = new Success(); BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),success); success.setUserId(userId); success.setOrderSn(orderAddVO.getSn()); rabbitTemplate.convertAndSend(RabbitMqComponentConfiguration.SECKILL_EX ,RabbitMqComponentConfiguration.SECKILL_RK,success); SeckillCommitVO seckillCommitVO = new SeckillCommitVO(); BeanUtils.copyProperties(orderAddVO,seckillCommitVO); return seckillCommitVO; } private OrderAddDTO converSeckillOrderTOOrder(SeckillOrderAddDTO seckillOrderAddDTO) { OrderAddDTO orderAddDTO = new OrderAddDTO(); BeanUtils.copyProperties(seckillOrderAddDTO,orderAddDTO); OrderItemAddDTO orderItemAddDTO = new OrderItemAddDTO(); BeanUtils.copyProperties(seckillOrderAddDTO.getSeckillOrderItemAddDTO(),orderItemAddDTO); List<OrderItemAddDTO> orderItemAddDTOS = new ArrayList<>(); orderItemAddDTOS.add(orderItemAddDTO); orderAddDTO.setOrderItems(orderItemAddDTOS); return orderAddDTO; }
public CsmallAuthenticationInfo getUserInfo(){ UsernamePasswordAuthenticationToken authenticationToken = (UsernamePasswordAuthenticationToken) SecurityContextHolder.getContext().getAuthentication(); if(authenticationToken==null) throw new CoolSharkServiceException(ResponseCode.UNAUTHORIZED,"没有登录信息"); CsmallAuthenticationInfo csmallAuthenticationInfo = (CsmallAuthenticationInfo) authenticationToken.getCredentials(); return csmallAuthenticationInfo; } public Long getUserId(){ return getUserInfo().getId(); } }
|
开发控制层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| package cn.tedu.mall.seckill.controller;
import cn.tedu.mall.common.exception.CoolSharkServiceException; import cn.tedu.mall.common.restful.JsonResult; import cn.tedu.mall.common.restful.ResponseCode; import cn.tedu.mall.pojo.seckill.dto.SeckillOrderAddDTO; import cn.tedu.mall.pojo.seckill.vo.SeckillCommitVO; import cn.tedu.mall.seckill.exception.SeckillBlockHandler; import cn.tedu.mall.seckill.exception.SeckillFallback; import cn.tedu.mall.seckill.service.ISeckillService; import cn.tedu.mall.seckill.utils.SeckillCacheUtils; import com.alibaba.csp.sentinel.annotation.SentinelResource; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/seckill") @Api(tags = "提交秒杀订单") public class SeckillController { @Autowired private ISeckillService seckillService;
@Autowired private RedisTemplate redisTemplate;
@PostMapping("/{randCode}") @ApiOperation("随机码验证并提交订单") @ApiImplicitParam(value = "随机码",name = "randCode",required = true,dataType = "string") @PreAuthorize("hasRole('user')") @SentinelResource(value = "seckill", blockHandlerClass = SeckillBlockHandler.class,blockHandler = "seckillBlock", fallbackClass = SeckillFallback.class,fallback = "seckillFall") public JsonResult<SeckillCommitVO> commitSeckillOrder( @PathVariable String randCode, SeckillOrderAddDTO seckillOrderAddDTO ){ Long spuId = seckillOrderAddDTO.getSpuId(); String randCodeKey = SeckillCacheUtils.getRandCodeKey(spuId); if(redisTemplate.hasKey(randCodeKey)){ String redisRandCode = redisTemplate.boundValueOps(randCodeKey).get()+""; if(redisRandCode==null) throw new CoolSharkServiceException(ResponseCode.INTERNAL_SERVER_ERROR,"服务器内部错误,请联系客服"); if(!redisRandCode.equals(randCode)) throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品"); SeckillCommitVO seckillCommitVO = seckillService.commitSeckill(seckillOrderAddDTO); return JsonResult.ok(seckillCommitVO); } else { throw new CoolSharkServiceException(ResponseCode.NOT_FOUND,"没有指定商品"); } } }
|
启动Nacos\Seata\RabbitMQ\Redis\Sentinel
项目Leaf\product\passport\order\seckill
注意yml配置文件中的RabbitMQ的用户名和密码
为了方便测试:如果说已经购买过,就修改允许购买的数量 >1为 >100
为了方便测试:如果说没有库存,可以把判断库存的if注释掉
测试成功即可
还可以测试sentinel的限流
success成功信息的处理
开发持久层
我们要连接数据库,对这个表进行新增
还有对秒杀数据库sku库存的修改
1 2 3
| void updateReduceStockBySkuId(@Param("skuId") Long skuId, @Param("quantity") Integer quantity);
|
1 2 3 4 5 6 7 8 9
| <update id="updateReduceStockBySkuId"> update seckill_sku set seckill_stock=seckill_stock - #{quantity} where sku_id=#{skuId} </update>
|
下面再编写新增Success的方法
1 2 3 4 5
| @Repository public interface SuccessMapper { void saveSuccess(Success success); }
|
SuccessMapper.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <insert id="saveSuccess"> insert into success( user_id, user_phone, sku_id, title, main_picture, seckill_price, quantity, bar_code, data, order_sn )values( #{userId}, #{userPhone}, #{skuId}, #{title}, #{mainPicture}, #{seckillPrice}, #{quantity}, #{barCode}, #{data}, #{orderSn} ) </insert>
|
开发消息的接收功能
我们当前触发新增Success的方法并不是常规的业务逻辑层
而是由RabbitMQ消息收发机制中接收消息的对象来调用
所有我们编写一个接收消息的监听器类来完成这个操作
创建consumer包,包中创建类SekillQueueConsumer代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component
@RabbitListener(queues = {RabbitMqComponentConfiguration.SECKILL_QUEUE}) public class SeckillQueueConsumer { @Autowired private SuccessMapper successMapper; @Autowired private SeckillSkuMapper skuMapper; @RabbitHandler public void process(Success success){ skuMapper.updateReduceStockBySkuId(success.getSkuId(),success.getQuantity()); successMapper.saveSuccess(success); } }
|
环境方面
Nacos\Sentinel\Seata\redis\RabbitMQ
服务方面
Leaf\product\order\seckill