天道酬勤,学无止境

RabbitMQ消息队列处理库存解锁及关闭订单问题

文章目录

  • 一、RabbitMQ延时队列
    • 消息的TTL
    • 死信
  • 二、实战
    • 延时关单
    • 规范设计
  • 三、消息队列处理库存解锁及关单
    • 1、流程分析
    • 2、库存微服务
      • 2.1 解锁库存配置
      • 2.2 解锁库存流程
      • 2.3 业务代码
      • 2.4 调试
  • 四、RMQ 延时队列处理关单及库存解锁整合
    • 1、流程分析
    • 2、订单关单
    • 3、订单释放和库存释放进行绑定
  • 五、消息丢失、重复、积压等解决方案
    • 1、消息丢失
    • 2、消息重复
    • 3、消息积压

一、RabbitMQ延时队列

RabbitMQ延时队列实现定时任务。
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。
常用解决方案:
spring的schedule定时任务轮训数据库
缺点:
消耗系统内存,增加了数据库的压力、存在较大的时间误差
解决:
Rabbit的消息 TTL 和死信Exchange结合。

消息的TTL

消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。
RabbitMQ 可以对队列消息分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果。

注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。
那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put(“x-message-ttl”, 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration(“6000”);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, “msg body”.getBytes());

这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列没有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信

死信:Dead Letter Exchange(DLX)
一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

  • 一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。

Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
在这里插入图片描述

二、实战

延时关单

场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。
在这里插入图片描述

规范设计

设计建议规范(基于事件模型的交换机设计):
1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
整体业务设计:
在这里插入图片描述
按照上边的规范设计,对关单业务进行升级设计:
在这里插入图片描述
上图说明:交换机 order-event-exchange 绑定了一个延时队列order.delay.queue,路由key是 order.create.order, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order,然后exchange根据路由key order.release.order转发消息到 order.release.order.queue队列,客户端监听该队列获取消息。
根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;

/**
 * @author: kaiyi
 * @create: 2020-09-16 13:53
 */
@Configuration
public class MyMQConfig {

  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

  /**
   * 客户端监听队列(测试)
   * @param orderEntity
   * @param channel
   * @param message
   * @throws IOException
   */
  @RabbitListener(queues = "order.release.order.queue")
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {

    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  }

  /**
   * 死信队列
   *
   * @return
   */
  @Bean
  public Queue orderDelayQueue(){

     /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
    HashMap<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "order-event-exchange");
    arguments.put("x-dead-letter-routing-key", "order.release.order");
    arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟

    Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    return queue;
  }

  /**
   * 普通队列
   *
   * @return
   */
  @Bean
  public Queue orderReleaseQueue(){

    Queue queue = new Queue("order.release.order.queue", true, false, false);
    return queue;
  }

  /**
   * TopicExchange
   *
   * @return
   */
  @Bean
  public Exchange orderEventExchange(){
    /*
     *   String name,
     *   boolean durable,
     *   boolean autoDelete,
     *   Map<String, Object> arguments
     * */

    return new TopicExchange("order-event-exchange", true, false);
  }

  @Bean
  public Binding orderCreateBinding() {
    /*
     * String destination, 目的地(队列名或者交换机名字)
     * DestinationType destinationType, 目的地类型(Queue、Exhcange)
     * String exchange,
     * String routingKey,
     * Map<String, Object> arguments
     * */
    return new Binding("order.delay.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.create.order",  // 路由key一般为事件名
        null);
  }

  @Bean
  public Binding orderReleaseBinding() {

    return new Binding("order.release.order.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.order",
        null);
  }

}

然后在控制器创建测试消息:
gulimall-order/xxx/order/web/HelloController.java

**
 * @author: kaiyi
 * @create: 2020-09-12 18:09
 */
@Controller
public class HelloController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @ResponseBody
  @GetMapping(value = "/test/createOrder")
  public String createOrderTest() {

    //订单下单成功
    OrderEntity orderEntity = new OrderEntity();
    orderEntity.setOrderSn(UUID.randomUUID().toString());
    orderEntity.setModifyTime(new Date());

    //给MQ发送消息
    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);

    return "ok";
  }
}

然后访问该路径 http://order.gulimall.com/test/createOrder, 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。
交换机:
在这里插入图片描述
交换机绑定的队列(路由key):
在这里插入图片描述
队列:
在这里插入图片描述
可以看到第一个队列是死信队列,第二个事普通队列
收到的消息为实体对象json:
在这里插入图片描述
控制器输出的监控信息:

收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a
收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b

三、消息队列处理库存解锁及关单

1、流程分析

在这里插入图片描述
在这里插入图片描述

2、库存微服务

2.1 解锁库存配置

1、库存微服务gulimall-ware 引入高级消息队列amqp依赖:
gulimall-ware/pom.xml

<!-- 使用高级消息队列来解决分布式事务一致性 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

2、添加RMQ配置
gulimall-ware/src/main/resources/application.properties

# ===== RabbitMQ配置 ======
spring.rabbitmq.host=192.168.10.10
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、创建RMQ配置文件
gulimall-ware/xxx/ware/config/MyRabbitMQConfig.java

package com.atguigu.gulimall.ware.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * RMQ配置
 *
 * @author: kaiyi
 * @createTime: 2020-09-15 16:40
 **/

@Configuration
public class MyRabbitMQConfig {

    /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // @RabbitListener(queues = "stock.release.stock.queue")
    // public void handle(Message message) {
    //
    // }

    /**
     * 库存服务默认的交换机
     * @return
     */
    @Bean
    public Exchange stockEventExchange() {
        //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false);
        return topicExchange;
    }

    /**
     * 普通队列
     * @return
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        Queue queue = new Queue("stock.release.stock.queue", true, false, false);
        return queue;
    }

    /**
     * 延迟队列
     * @return
     */
    @Bean
    public Queue stockDelay() {

        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息过期时间 2分钟
        arguments.put("x-message-ttl", 120000);

        Queue queue = new Queue("stock.delay.queue", true, false, false,arguments);
        return queue;
    }

    /**
     * 交换机与普通队列绑定
     * @return
     */
    @Bean
    public Binding stockLocked() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        //          Map<String, Object> arguments
        Binding binding = new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);

        return binding;
    }

    /**
     * 交换机与延迟队列绑定
     * @return
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }
}

2.2 解锁库存流程

解锁库存流程:

在这里插入图片描述
可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。
解锁库存具体步骤:
在这里插入图片描述

2.3 业务代码

锁库存
锁库存,并发送消息到延时队列,方法 orderLockStock(WareSkuLockVo vo)
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

package com.atguigu.gulimall.ware.service.impl;

import com.alibaba.fastjson.TypeReference;
import com.atguigu.common.exception.NoStockException;
import com.atguigu.common.to.mq.StockDetailTo;
import com.atguigu.common.to.mq.StockLockedTo;
import com.atguigu.common.utils.R;
import com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity;
import com.atguigu.gulimall.ware.entity.WareOrderTaskEntity;
import com.atguigu.gulimall.ware.feign.OrderFeignService;
import com.atguigu.gulimall.ware.feign.ProductFeignService;
import com.atguigu.gulimall.ware.service.WareOrderTaskDetailService;
import com.atguigu.gulimall.ware.service.WareOrderTaskService;
import org.springframework.transaction.annotation.Transactional;

@Service("wareSkuService")
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {

    @Autowired
    WareSkuDao wareSkuDao;

    @Autowired
    ProductFeignService productFeignService;

    @Autowired
    private WareOrderTaskService wareOrderTaskService;

    @Autowired
    private WareOrderTaskDetailService wareOrderTaskDetailService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private OrderFeignService orderFeignService;
    /**
     * 为某个订单锁定库存
     * @param vo
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public boolean orderLockStock(WareSkuLockVo vo) {

        /**
         * 保存库存工作单详情信息
         * 追溯
         */
        WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();
        wareOrderTaskEntity.setOrderSn(vo.getOrderSn());
        wareOrderTaskEntity.setCreateTime(new Date());
        wareOrderTaskService.save(wareOrderTaskEntity);

        //1、按照下单的收货地址,找到一个就近仓库,锁定库存
        //2、找到每个商品在哪个仓库都有库存
        List<OrderItemVo> locks = vo.getLocks();

        List<SkuWareHasStock> collect = locks.stream().map((item) -> {
            SkuWareHasStock stock = new SkuWareHasStock();
            Long skuId = item.getSkuId();
            stock.setSkuId(skuId);
            stock.setNum(item.getCount());
            //查询这个商品在哪个仓库有库存
            List<Long> wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId);
            stock.setWareId(wareIdList);

            return stock;
        }).collect(Collectors.toList());

        //2、锁定库存
        for (SkuWareHasStock hasStock : collect) {
            boolean skuStocked = false;
            Long skuId = hasStock.getSkuId();
            List<Long> wareIds = hasStock.getWareId();

            if (org.springframework.util.StringUtils.isEmpty(wareIds)) {
                //没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚
                throw new NoStockException(skuId);
            }

            //1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
            //2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁
            for (Long wareId : wareIds) {
                //锁定成功就返回1,失败就返回0
                Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum());
                if (count == 1) {
                    skuStocked = true;
                    WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder()
                        .skuId(skuId)
                        .skuName("")
                        .skuNum(hasStock.getNum())
                        .taskId(wareOrderTaskEntity.getId())
                        .wareId(wareId)
                        .lockStatus(1)
                        .build();

                    wareOrderTaskDetailService.save(taskDetailEntity);

                    //TODO 告诉MQ库存锁定成功
                    StockLockedTo lockedTo = new StockLockedTo();
                    lockedTo.setId(wareOrderTaskEntity.getId());
                    StockDetailTo detailTo = new StockDetailTo();
                    BeanUtils.copyProperties(taskDetailEntity,detailTo);
                    lockedTo.setDetailTo(detailTo);
                    rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);

                    break;
                } else {
                    //当前仓库锁失败,重试下一个仓库
                }
            }

            if (skuStocked == false) {
                //当前商品所有仓库都没有锁住
                throw new NoStockException(skuId);
            }
        }

        //3、肯定全部都是锁定成功的
        return true;
    }

    @Override
    public void unlockStock(StockLockedTo to) {
        //库存工作单的id
        StockDetailTo detail = to.getDetailTo();
        Long detailId = detail.getId();

        /**
         * 解锁
         * 1、查询数据库关于这个订单锁定库存信息
         *   有:证明库存锁定成功了
         *      解锁:订单状况
         *          1、没有这个订单,必须解锁库存
         *          2、有这个订单,不一定解锁库存
         *              订单状态:已取消:解锁库存
         *                      已支付:不能解锁库存
         */
        WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId);
        if (taskDetailInfo != null) {
            //查出wms_ware_order_task工作单的信息
            Long id = to.getId();
            WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id);
            //获取订单号查询订单状态
            String orderSn = orderTaskInfo.getOrderSn();
            //远程查询订单信息
            R orderData = orderFeignService.getOrderStatus(orderSn);
            if (orderData.getCode() == 0) {
                //订单数据返回成功
                OrderVo orderInfo = orderData.getData("data", new TypeReference<OrderVo>() {});

                //判断订单状态是否已取消或者支付或者订单不存在
                if (orderInfo == null || orderInfo.getStatus() == 4) {
                    //订单已被取消,才能解锁库存
                    if (taskDetailInfo.getLockStatus() == 1) {
                        //当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁
                        unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId);
                    }
                }
            } else {
                //消息拒绝以后重新放在队列里面,让别人继续消费解锁
                //远程调用服务失败
                throw new RuntimeException("远程调用服务失败");
            }
        } else {
            //无需解锁
        }

    }

    /**
     * 解锁库存的方法
     * @param skuId
     * @param wareId
     * @param num
     * @param taskDetailId
     */
    public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) {

        //库存解锁
        wareSkuDao.unLockStock(skuId,wareId,num);

        //更新工作单的状态
        WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity();
        taskDetailEntity.setId(taskDetailId);
        taskDetailEntity.setLockStatus(2); //变为已解锁
        wareOrderTaskDetailService.updateById(taskDetailEntity);

    }

    @Data
    class SkuWareHasStock {
        private Long skuId;
        private Integer num;
        private List<Long> wareId;
    }
}

监听队列:
gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;

import com.atguigu.common.to.mq.StockLockedTo;
import com.atguigu.gulimall.ware.service.WareSkuService;
import com.rabbitmq.client.Channel;

/**
 * 库存解锁监听
 *
 * @desc
 * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。
 *
 * @author: kaiyi
 * @create: 2020-09-16 19:01
 */
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {

  @Autowired
  private WareSkuService wareSkuService;

  /**
   * 1、库存自动解锁
   *  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁
   *
   *  2、订单失败
   *      库存锁定失败
   *
   *   只要解锁库存的消息失败,一定要告诉服务解锁失败
   */
  @RabbitHandler
  public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
    log.info("******收到解锁库存的信息******");
    try {

      //当前消息是否被第二次及以后(重新)派发过来了
      // Boolean redelivered = message.getMessageProperties().getRedelivered();

      //解锁库存
      wareSkuService.unlockStock(to);
      // 手动删除消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      // 解锁失败 将消息重新放回队列,让别人消费
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
  }

}

上边就是创建订单后锁库存,发消息到延时队列,监听队列,创建的订单出现异常是否来解锁库存,手动确认消息的核心代码逻辑。

2.4 调试

订单结算页,购买了一件商品。
在这里插入图片描述
在提交订单时,远程锁库存成功后模拟代码异常:
在这里插入图片描述
提交订单,由于异常会回滚订单并且回退到结算页,连续提交三次,我们可以看到延时队列里边有三条信息。
在这里插入图片描述
库存表wms_ware_sku,原来锁定了 3 件库存,现在库存锁定为6,因为库存是远程锁定的,所以,主程序事务回滚对远程的不起作用,不过在锁定库存成功时发库存锁定成功的消息,后边通过消息会检查是否释放库存。
在这里插入图片描述
库存工作单主表:
在这里插入图片描述
库存工作单明细表,我们可以看到新增的3条记录,明细状态lock_status(1-已锁定 2-已解锁 3-扣减)
在这里插入图片描述
订单服务订单表:
在这里插入图片描述
然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order 表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。
过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。
消息队列:
在这里插入图片描述
库存表:
在这里插入图片描述
可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。

四、RMQ 延时队列处理关单及库存解锁整合

1、流程分析

在这里插入图片描述
步骤:

  • 1、订单创建成功,发送消息给MQ
  • 2、订单服务订单关单监听器(死信之后判断是否关单)
  • 3、订单关单成功后发消息给MQ(订单释放直接和库存释放进行绑定)
  • 4、库存服务是释放库存监听器监听是否解锁库存队列(stock.release.stock.queue)
  • 5、库存解锁处理逻辑

这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。

2、订单关单

1、订单释放直接和库存释放进行绑定
gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;

/**
 * @author: kaiyi
 * @create: 2020-09-16 13:53
 */
@Configuration
public class MyMQConfig {

  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

  /**
   * 客户端监听队列(测试)
   * @param orderEntity
   * @param channel
   * @param message
   * @throws IOException
   */
  /*
  @RabbitListener(queues = "order.release.order.queue")
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {

    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  }
   */

  /**
   * 死信队列
   *
   * @return
   */
  @Bean
  public Queue orderDelayQueue(){

     /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
    HashMap<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "order-event-exchange");   // 信死了交给哪个交换机
    arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由key
    arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟

    Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    return queue;
  }

  /**
   * 普通队列
   *
   * @return
   */
  @Bean
  public Queue orderReleaseQueue(){

    Queue queue = new Queue("order.release.order.queue", true, false, false);
    return queue;
  }

  /**
   * TopicExchange
   *
   * @return
   */
  @Bean
  public Exchange orderEventExchange(){
    /*
     *   String name,
     *   boolean durable,
     *   boolean autoDelete,
     *   Map<String, Object> arguments
     * */

    return new TopicExchange("order-event-exchange", true, false);
  }

  @Bean
  public Binding orderCreateBinding() {
    /*
     * String destination, 目的地(队列名或者交换机名字)
     * DestinationType destinationType, 目的地类型(Queue、Exhcange)
     * String exchange,
     * String routingKey,
     * Map<String, Object> arguments
     * */
    return new Binding("order.delay.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.create.order",  // 路由key一般为事件名
        null);
  }

  @Bean
  public Binding orderReleaseBinding() {

    return new Binding("order.release.order.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.order",
        null);
  }

  /**
   * 订单释放直接和库存释放进行绑定
   * @return
   */
  @Bean
  public Binding orderReleaseOtherBinding() {

    return new Binding("stock.release.stock.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.other.#",
        null);
  }
}

2、提交订单增加订单创建成功,发送消息给MQ
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/**
     * 提交订单
     * @param vo
     * @return
     */
    // @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别
    // @Transactional(propagation = Propagation.REQUIRED)   设置事务的传播级别
    @Transactional(rollbackFor = Exception.class)
    // @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {

        confirmVoThreadLocal.set(vo);

        SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();
        //去创建、下订单、验令牌、验价格、锁定库存...

        //获取当前用户登录的信息
        MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get();
        responseVo.setCode(0);

        //1、验证令牌是否合法【令牌的对比和删除必须保证原子性】
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        String orderToken = vo.getOrderToken();

        //通过lua脚本原子验证令牌和删除令牌
        Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
            Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()),
            orderToken);

        if (result == 0L) {
            //令牌验证失败
            responseVo.setCode(1);
            return responseVo;
        } else {
            //令牌验证成功
            //1、创建订单、订单项等信息
            OrderCreateTo order = createOrder();

            //2、验证价格
            BigDecimal payAmount = order.getOrder().getPayAmount();
            BigDecimal payPrice = vo.getPayPrice();

            if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
                //金额对比
                //TODO 3、保存订单
                saveOrder(order);

                //4、库存锁定,只要有异常,回滚订单数据
                //订单号、所有订单项信息(skuId,skuNum,skuName)
                WareSkuLockVo lockVo = new WareSkuLockVo();
                lockVo.setOrderSn(order.getOrder().getOrderSn());

                //获取出要锁定的商品数据信息
                List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
                    OrderItemVo orderItemVo = new OrderItemVo();
                    orderItemVo.setSkuId(item.getSkuId());
                    orderItemVo.setCount(item.getSkuQuantity());
                    orderItemVo.setTitle(item.getSkuName());
                    return orderItemVo;
                }).collect(Collectors.toList());
                lockVo.setLocks(orderItemVos);

                //TODO 调用远程锁定库存的方法
                //出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata)
                //为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务
                R r = wmsFeignService.orderLockStock(lockVo);
                if (r.getCode() == 0) {
                    //锁定成功
                    responseVo.setOrder(order.getOrder());
                    // int i = 10/0;   // 抛出异常,测试远程回滚

                    //TODO 订单创建成功,发送消息给MQ
                    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());

                    //删除购物车里的数据
                    redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId());
                    return responseVo;
                } else {
                    //锁定失败
                    String msg = (String) r.get("msg");
                    throw new NoStockException(msg);
                    // responseVo.setCode(3);
                    // return responseVo;
                }

            } else {
                responseVo.setCode(2);
                return responseVo;
            }
        }
    }

3、关单监听
gulimall-order/xxx/order/listener/OrderCloseListener.java

package com.atguigu.gulimall.order.listener;

import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.service.OrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * 关单监听
 *
 * @author: kaiyi
 * @create: 2020-09-17 11:01
 */
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {

  @Autowired
  private OrderService orderService;

  @RabbitHandler
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
    System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
    try {
      orderService.closeOrder(orderEntity);
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }

  }

}

4、关单成功给库存服务发送MQ消息
gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/**
     * 关闭订单
     * @param orderEntity
     */
    @Override
    public void closeOrder(OrderEntity orderEntity) {

        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付
        OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
            eq("order_sn",orderEntity.getOrderSn()));

        if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
            //代付款状态进行关单
            OrderEntity orderUpdate = new OrderEntity();
            orderUpdate.setId(orderInfo.getId());
            orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
            this.updateById(orderUpdate);

            // 发送消息给MQ
            OrderTo orderTo = new OrderTo();
            BeanUtils.copyProperties(orderInfo, orderTo);

            try {
                //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
                rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
            } catch (Exception e) {
                //TODO 定期扫描数据库,重新发送失败的消息
            }
        }
    }

3、订单释放和库存释放进行绑定

1、库存服务-库存释放监听器增加订单关单库存释放处理方法
gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener;

import com.atguigu.common.to.OrderTo;
import com.atguigu.common.to.mq.StockLockedTo;
/**
 * 库存解锁监听
 *
 * @desc
 * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。
 *
 * @author: kaiyi
 * @create: 2020-09-16 19:01
 */
@Slf4j
@RabbitListener(queues = "stock.release.stock.queue")
@Service
public class StockReleaseListener {

  @Autowired
  private WareSkuService wareSkuService;

  /**
   * 1、库存自动解锁
   *  下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁
   *
   *  2、订单失败
   *      库存锁定失败
   *
   *   只要解锁库存的消息失败,一定要告诉服务解锁失败
   */
  @RabbitHandler
  public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
    log.info("******收到解锁库存的信息******");
    try {

      //当前消息是否被第二次及以后(重新)派发过来了
      // Boolean redelivered = message.getMessageProperties().getRedelivered();

      //解锁库存
      wareSkuService.unlockStock(to);
      // 手动删除消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      // 解锁失败 将消息重新放回队列,让别人消费
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
  }

  /**
   * 订单关单库存释放
   *
   * @param orderTo
   * @param message
   * @param channel
   * @throws IOException
   */
  @RabbitHandler
  public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {

    log.info("******收到订单关闭,准备解锁库存的信息******");

    try {
      wareSkuService.unlockStock(orderTo);

      // 手动删除消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
      // 解锁失败 将消息重新放回队列,让别人消费
      channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
    }
  }
}

这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。

2、具体解锁库存实现
gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

/**
     * 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理
     * 导致卡顿的订单,永远都不能解锁库存
     * @param orderTo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void unlockStock(OrderTo orderTo) {

        String orderSn = orderTo.getOrderSn();
        //查一下最新的库存解锁状态,防止重复解锁库存
        WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);

        //按照工作单的id找到所有 没有解锁的库存,进行解锁
        Long id = orderTaskEntity.getId();
        List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
            .eq("task_id", id).eq("lock_status", 1));

        for (WareOrderTaskDetailEntity taskDetailEntity : list) {
            unLockStock(taskDetailEntity.getSkuId(),
                taskDetailEntity.getWareId(),
                taskDetailEntity.getSkuNum(),
                taskDetailEntity.getId());
        }

    }

五、消息丢失、重复、积压等解决方案

高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?

1、消息丢失

1、消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

代码示例:

/**
     * 关闭订单
     * @param orderEntity
     */
    @Override
    public void closeOrder(OrderEntity orderEntity) {

        //关闭订单之前先查询一下数据库,判断此订单状态是否已支付
        OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
            eq("order_sn",orderEntity.getOrderSn()));

        if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
            //代付款状态进行关单
            OrderEntity orderUpdate = new OrderEntity();
            orderUpdate.setId(orderInfo.getId());
            orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
            this.updateById(orderUpdate);

            // 发送消息给MQ
            OrderTo orderTo = new OrderTo();
            BeanUtils.copyProperties(orderInfo, orderTo);

            try {
                //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
                rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
            } catch (Exception e) {
                //TODO 定期扫描数据库,重新发送失败的消息
                                // while() 重试次数
            }
        }
    }

创建消息日志记录表:
在这里插入图片描述
2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

  • publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态:
gulimall-order/xxx/order/config/MyRabbitConfig.java

/**
   * 定制RabbitTemplate
   * 1、服务收到消息就会回调
   * 1、spring.rabbitmq.publisher-confirms: true
   * 2、设置确认回调
   * 2、消息正确抵达队列就会进行回调
   * 1、spring.rabbitmq.publisher-returns: true
   * spring.rabbitmq.template.mandatory: true
   * 2、设置确认回调ReturnCallback
   * <p>
   * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
   */
  // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
  public void initRabbitTemplate() {

    /**
     * 1、只要消息抵达Broker就ack=true
     * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
     * ack:消息是否成功收到
     * cause:失败的原因
     */
    //设置确认回调
    rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
      /**
       * 1、做好消息确认机制(publisher,consumer【手动ack】】)
       * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍
       */
       // 服务器收到生产者发送的消息了
             // 修改消息的状态
      System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
    });

    /**
     * 2、只要消息没有投递给指定的队列,就触发这个失败回调
     * message:投递失败的消息详细信息
     * replyCode:回复的状态码
     * replyText:回复的文本内容
     * exchange:当时这个消息发给哪个交换机
     * routingKey:当时这个消息用哪个路邮键
     */
    rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
      System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
          "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
    });

  }

3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。

  • 一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。

防止消息丢失记住这两条:

1、做好消息确认机制(publisher,consumer【手动ack】】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

2、消息重复

出现重复的几种情况

  • 1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。
    • Broker的消息重新由 unack 变为ready,并发送给其他消费者
  • 2、消息消费失败,由于重试机制,自动又将消息发送出去。
  • 3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送

解决方案

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。
  • rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。

3、消息积压

  • 消费者宕机
  • 消费者消费能力不足
  • 发送者发送流量太大
    • 上线更多的消费者,进行正常的消费
    • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

受限制的 HTML

  • 允许的HTML标签:<a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • 自动断行和分段。
  • 网页和电子邮件地址自动转换为链接。

相关推荐
  • 本地事务、分布式事务以及解决方案
    本地事务 springboot本地事务失效的问题 解决方案: 分布式事务出现的原因 分布式一致性动画演示:http://thesecretlivesofdata.com/raft/ raft是一个实现分布式一致性的协议,主要有领导选举、日志复制两个机制维持数据的一致性 每个节点都有三个状态: followercandidateleader 以及两个时间: 自旋时间:从follower到candidate的时间(150ms-300ms)心跳时间:leader向follow消息发送心跳的时间,比自旋时间小得多 选举leader机制: 默认都以follower状态启动,刚开始的时候,每个节点会随机一个自旋时间,自选时间的长短在(150ms-300ms)由于自旋时间不一样,最先自旋完成的节点被称为candidate,开启投票选举,自己先自投一次,然后向其他follower节点发送消息,向自己投票,一个follower节点只能够投一票,投完票的节点会重新开始自旋,当candidate节点收到票数超过总节点一半就变成leader如果两个节点或者多个节点同时自旋完成,成为candidate,并获取相同的follower的投票,回重新进行第二轮选举,每个节点都重新开始自旋,直到一个candidate获取的偷票超过大多数,成为leader当leader选举完成之后
  • SpringCloud - 商城高级篇(下)
    文章目录 SpringCloud - 商城项目(下)1. 商城业务 - 认证服务1.1 环境搭建1.2 开通阿里云的短信服务1.3 整合阿里云短信服务1.4 发送验证码并防刷1.5 注册功能1.6 简单登录功能1.7 OAuth2.01.8 微博社交登录1.9 整合微博社交登录1.10 分布式Sesison问题1.11 整合SpringSession1.12 测试单点登录框架1.13 单点登录 2. 商城业务 - 购物车2.1 环境搭建2.2 购物车模型分析2.3 ThreadLocal用户身份鉴别2.4 添加商品到购物车2.5 获取购物车2.6 购物车操作 3. 商城业务 - 消息队列3.1 RabbitMQ简介与安装3.2 Exchange类型3.3 Spring Boot整合Rabbit MQ3.4 RabbitMQ消息确认机制 4. 商城业务 - 订单服务4.1 环境搭建4.2 订单登录拦截4.3 订单结算页4.4 接口幂等性4.5 令牌防止多次提交订单4.6 提交订单 5. 商城业务 - 分布式事务5.1 本地事务在分布式下的问题5.2 本地事务5.3 分布式事务理论5.4 分布式事务常见解决方案5.5 分布式事务Seata5.6 最终一致性库存解锁逻辑5.7 RabbitMQ延时队列 6. 商城业务 - 库存的解锁6.1 创建路由交换机和队列6.2 库存自动解锁6.3
  • 分布式事务——最终一致性的保证
    分布式事务可以使用seata实现,但是,对于高并发的场景,使用seata会感觉稍慢,尤其是对一致性要求不那么高的业务完全可以不需要使用seata,这时候,我们可以考虑最终一致性的方案。通过消息队列机制来保证最终一致性,即可。 思想:在MQ中新建两个队列,一个死信队列,一个普通队列,让同一个交换机绑定这两个队列,死信队列不处理任何消息,只是用于存放过期的消息,当指定的时间到了,自动路由到普通队列,在某个服务中单独监听这个队列的消息并处理该消息。 实现步骤: 1. 准备工作 1)引入MQ <!-- 引入mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2)mq相关配置文件 # 配置MQ基本信息 spring.rabbitmq.host=192.168.56.10 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ 3)mq配置类(指定序列化机制、死信队列等的相关绑定关系) package com.bjc.gulimall.ware.config; import org.springframework.amqp.core.*
  • RabbitMQ死信队列对超时未支付订单进行交易关闭处理
    一、前言介绍 死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息存活时间~非必需的),而死信队列又可以由“面向生产者的基本交换机+基本路由”绑定而成,故而生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中。如下图所示: 二、具体实现 1、项目目录结构 2、引入相关依赖 <!--SpringBoot 整合RabbitMq--> <dependencies> <!--springboot整合 mybatis --> <!--mybatis-plus--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <!--mysql依赖--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> </dependencies> 3、application.yml
  • 2020谷粒商城微服务kubernetes分布式实战
    目录:/IT-2417-谷粒商城2020分布式 [18.1G] ┣━━1.分布式基础篇 [7.4G] ┃ ┣━━视频 [7.4G] ┃ ┃ ┣━━[IT视频自学网-www.itspzx.com]_收集整理.url [175B] ┃ ┃ ┣━━01、简介-项目介绍 [96.4M] ┃ ┃ ┣━━02、简介-项目整体效果展示 [148.8M] ┃ ┃ ┣━━03、简介-分布式基础概念 [63.8M] ┃ ┃ ┣━━04、简介-项目微服务架构图 [45.9M] ┃ ┃ ┣━━05、简介-项目微服务划分图 [27.8M] ┃ ┃ ┣━━06、环境-使用vagrant快速创建linux虚拟机 [95.1M] ┃ ┃ ┣━━07、环境-虚拟机网络设置 [30.9M] ┃ ┃ ┣━━08、环境-linux安装docker [43.1M] ┃ ┃ ┣━━09、环境-配置docker阿里云镜像加速 [11.2M] ┃ ┃ ┣━━10、环境-docker安装mysql [71.6M] ┃ ┃ ┣━━11、环境-docker安装redis [51.7M] ┃ ┃ ┣━━12、环境-开发工具&环境安装配置 [60.4M] ┃ ┃ ┣━━13、环境-配置git-ssh [28.2M] ┃ ┃ ┣━━14、环境-项目结构创建&提交到码云 [57.1M] ┃ ┃ ┣━━15、环境-数据库初始化 [57.9M]
  • 本地事务与分布式事务
    本地事务与分布式事务 本地事务 事务的基本性质 事务的概念:事务是逻辑上一组操作,组成这组操作各个逻辑单元,要么一起成功,要么一起失败。 数据库事务的几个特性:原子性(Atomicity)、一致性( Consistency )、隔离性或独立性( lsolation)和持久性(Durabilily),简称就是ACID; 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败 一 致性:数据在事务的前后,业务整体一致。 转账。A:1000; B:1000; 转200 事务成功; A: 800 B: 1200 隔离性:事务之间互相隔离。 持久性:一旦事务成功,数据一定会落盘在数据库。 在以往的单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常,我们可以很容易的整体回滚; Business:我们具体的业分代码 Storage:库存业务代码;扣库存 Order:订单业务代码;保存订单 Account:账号业务代码;减账户余额 比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败一个事务开始,代表以下的所有操作都在同一个连接里面; 事务的隔离级别 概念 READ UNCOMMITTED (读未提交) 该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。READ COMMITTED (读提交) 一个事务可以读取另一个已提交的事务
  • 消息队列应用应用场景与技术选型(ActiveMQ、RabbitMQ、RocketMQ、Kafka)
    一、消息队列概述消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ二、消息队列应用场景以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。2.1异步处理场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?引入消息队列,将不是必须的业务逻辑,异步处理
  • 图文详解:阿里宠儿【小兔】RabbitMQ的养成攻略
    大家好,我是小羽今天给大家带来的的是关于小兔RabbitMQ的养成攻略,RabbitMQ 中的 Rabbit 是兔子的意思,就是形容跑的和兔子一样快。是一款轻量级的,支持多种消息传递协议的高可用的消息队列。RabbitMQ 是由 Erlang 语言编写的,而 Erlang 语言就是一款天生适合高并发的语言。是不是都对小兔很喜欢呢,可爱的小兔在我们工作中可是扮演者大哥大的角色,说它是阿里现在的宠儿一点不为过了。RabbitMQ 前面小兔我介绍过了,那么MQ代表的是什么意思呢?其实了解的都知道,Message Queue 的简写,用官方的话说 RabbitMQ 是一款开源的消息队列系统。下面跟着小羽一起来看看这只小兔是如何养成的吧。前言现在市场上主流的MQ有很多,比如 ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ 等。阿里巴巴最初也是使用 ActiveMQ ,不过随着业务的不断发展,ActiveMQ IO 模块出现瓶颈,后来阿里巴巴通过一系列优化但是还是不能很好的解决,之后阿里巴巴把注意力放到了主流消息中间件 kafka 上面,但是 kafka 并不能满足他们的要求,尤其是低延迟和高可靠性。所以 RocketMQ 是站在巨人的肩膀上(kafka),又对其进行了优化让其更满足互联网公司的特点。RabbitMQ 作为一款非常流行的消息中间件
  • 字节跳动上亿级别秒杀系统优化,用这个方案可行吗?
    秒杀相信大家都不陌生,商家会发布一些价格低廉、数量很少的商品,吸引用户抢购,例如每年双十一活动就属于典型的秒杀活动。还有类似春节12306抢票、小米手机限量发售等都可以理解为“秒杀”。 秒杀特点是持续时间短,抢购人数多,参与人数大大高于商品数量。抢购开始前后大量用户请求涌入,极易给服务造成巨大压力。如果系统设计不当,还容易造成超卖、数据丢失等问题。 本文我们主要讨论在秒杀的高并发场景下,传统订单架构存在的性能瓶颈,如何利用redis、MQ等中间件对系统做优化,解决缓存加速、防止重复提交、排队下单、超卖、少卖、削峰、异步下单等核心问题。 秒杀业务流程简介 秒杀总体业务流程可以简述为 商户创建秒杀活动,设定秒杀时间段,选择本次活动的商品,设置折扣、库存等;用户APP端在活动即将开始时会看到秒杀活动列表,点击活动可以看到商品列表,点击商品可以查看秒杀商品详情;商品详情页用户点击立即抢购;如果库存充足,则创建订单成功;否则秒杀失败提交订单后超时未支付,系统会自动关闭订单,回滚库存。 秒杀页面主要分为: (1)首页秒杀活动列表 (2)商品详情页 普通订单系统 我们来看看普通订单系统是如何处理订单请求的. 订单下单流程图 流程分析 在springcloud环境下,普通订单下单流程可以总结为: 1.用户确认订单、提交订单,发送下单请求至订单微服务; 2.订单服务会调用用户服务做一系列业务校验
  • 10分钟就能轻松入门消息中间件rabbitmq(附带教学源码)
    rabbitmq简介rabbitmq 是spring所在公司Pivotal自己的产品。因为跟spring有共同的血缘关系, 所以spring 全家桶对其的支持应该是相当完善的。本身基于AMQP高级队列协议的消息中间件,采用erlang开发, 因此安装需要erlang环境。具体安装根据自己的环境。使用场景一、异步处理用户注册(50ms),还需发送邮件(50ms)和短信(50ms)串行:(150ms)用户注册—》发送邮件----》发送短信并行(100ms):用户注册—》发送邮件a) |----》发送短信消息中间件(56ms):用户注册(50ms)—》(6ms)消息中间件《-----发送邮件《-----发送短信说明:一个用户注册流程,包含下述业务:1. 注册处理以及写数据库、2. 发送注册成功的手机短信3. 发送注册成功的邮件信息我们使用老方法的话,则会注册完执行发送短信再执行邮件发送。太low一般使用的是:在注册成功后,使用两个线程去做发送邮件,发送短信操作。如果用消息中间件:则将两个线程创建这些事情省了,直接发送消息给消息中间件,然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应的业务操作。就是这么方便二、应用解耦a) 订单系统---》库存系统(强耦合)b) 消息中间件:订单系统---》消息中间件《----库存系统(解耦)说明:用户购买一笔订单,订单成交
  • 牛逼哄哄的 RabbitMQ 到底有啥用?
    一. RabbitMQ 简介 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。 RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 二. RabbitMQ 使用场景 1. 解耦(为面向服务的架构(SOA)提供基本的最终一致性实现) 场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。 传统模式的缺点: 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败订单系统与库存系统耦合 引入消息队列 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作假如
  • RabbitMQ高级特性
    学习目标 掌握RabbitMQ 高级特性理解RabbitMQ 应用问题能够搭建RabbitMQ 集群 1. RabbitMQ 高级特性 1.1 消息可靠性投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。 confirm 确认模式 return 退回模式 rabbitmq 整个消息投递的路径为: ​ producer —> rabbitmq broker —> exchange —> queue —> consumer 消息从 producer 到 exchange 则会返回一个 confirmCallback 。 消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。 我们将利用这两个 callback 控制消息的可靠性投递 1.1.1 confirm确认模式代码实现 创建maven工程,消息的生产者工程,项目模块名称:rabbitmq-producer-spring 添加依赖 <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.1.7
  • 尚硅谷2020微服务分布式电商项目《谷粒商城》-订单、库存
    1. 搭建订单工程 整理不易,拒绝白嫖,记得三连哦 关注公众号:java星星 获取全套课件资料 完成购物车页面之后,点击购物车页面的“去结算”按钮,跳转到订单结算页。 接下来,先搭建订单系统: pom.xml: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.atguigu</groupId> <artifactId>gmall</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>com.atguigu</groupId> <artifactId>gmall-order</artifactId> <version>0.0.1-SNAPSHOT<
  • 畅购商城项目,面试问答,项目详解及全部代码
    商城项目 商城—微服务技术栈 1 商城介绍 1.1 项目介绍 此电商项目属于B2C模式的线上商城,支持用户在线浏览商品,在线搜索商品,并且可以将喜欢的商品加入购物车从而下单购买商品,同时支持线上支付,支付模式支持支付宝、微信、银联支付。用户还可以参与低价商品秒杀。 畅购商城采用了微服务架构,微服务技术采用了SpringCloud技术栈,各个微服务站点基于SpringBoot构建,并采用SpringCloud Gateway将各个微服务的功能串联起来,形成一套套系统,同时在微服务网关Gateway中采用过滤和限流策略,实施对微服务进行保护和权限认证操作。项目采用了SpringSecurity OAuth2.0解决了各个微服务之间的单点登录和用户授权。采用了当前非常热门的Seata来解决微服务与微服务之间的分布式事务。采用了Elasticsearch解决了海量商品的实时检索。数据存储采用了MySQL,并结合Canal实现数据同步操作,利用Redis做数据缓存操作。各个微服务之间采用RabbitMQ实现异步通信。我们采用了OpenResty集成的Nginx来控制微服务最外层的大量并发,利用Keepalived+Nginx来解决Nginx单点故障问题。 1.1 项目架构 2 商城业务总结 2.1 购物车业务 购物车分为2种情况,一种是用户登录或者不登录均能使用购物½
  • springboot 集成消息队列 (一) ActiveMQ
    一、什么是消息队列? MQ全程(Message Queue)又名消息队列,是一种异步通讯的中间件。可以理解为邮局,发送者将消息投递到邮局,然后邮局帮我们发送给具体的接收者,具体发送过程和时间与我们无关。 消息队列是分布式系统中重要的组件,消息队列主要解决了应用耦合、异步处理、流量削锋等问题。 当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。 二、使用场景 消息队列在实际应用中包括如下四个场景: 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败; 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间; 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况; 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理; 下面详细介绍上述四个场景以及消息队列如何在上述四个场景中使用: 1、应用耦合 场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图 传统模式的缺点: 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败 订单系统与库存系统耦合
  • 常用消息队列应用场景及其对比
    一、消息队列应用场景 消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景 1、异步处理 场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 串行的方式;并行方式 (1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端 (2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间 小结:传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢? 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下: 按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍 2、应用解耦 场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图 传统模式的缺点: 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败 订单系统与库存系统耦合,如下图: 订单系统:用户下单后,订单系统完成持久化处理
  • RabbitMQ
    RabbitMQ 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。 1.2 MQ有哪些 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 1.3 不同MQ特点 # 1.ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎! # 2.Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费, 追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求, 适合产生大量数据的互联网服务的数据收集业务。 # 3
  • rabbitMQ学习笔记
    学习来源:MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程,已完结! 1.MQ引言 1.1 什么是MQ MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。 1.2 MQ有哪些 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。 1.3 不同MQ特点 # 1.ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎! # 2.Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费, 追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失
  • RabbitMQ消息中间件技术精讲-深入RabbitMQ高级特性-100%投递成功-幂等性概念-TTL队列/消息
    RabbitMQ的高级特性和实际场景应用,包括消息如何保障 100% 的投递成功 ?幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm确认消息、Return返回消息,自定义消费者,消息的ACK与重回队列,消息的限流,TTL消息,死信队列等 ... 消息如何保障100?投递成功? 幂等性概念详解 在海量订单产生的业务高峰期,如何避免消息的重复消费问题? Confirm确认消息、 Return返回消息 自定义消费者 消息的ACK与重回队列 消息的限流 TTL消息 死信队列 消息如何保障100%投递成功? 什么是生产端的可靠性投递? 保障消息的成功发出 保障MQ节点的成功接收 发送端收到MQ节点( Broker)确认应答 完善的消息进行补偿机制 生产端-可靠性投递 BAT/TMD互联网大厂的解决方案: 消息落库,对消息状态进行打标----数据中定义不同状态 消息的延迟投递,做二次确认,回调检查 消息落库,对消息状态进行打标(需要对数据库持久化两次,消息状态的修改) BIZ DB(业务数据库)MSG DB(消息数据库) step1业务入库和消息入库step2:step1成功,生产端的Sender进行消息发送(消息状态初始值为0)step3:Broker(Server)收到消息并发送应答给生产端的Confirm Listenerstep4:Confirm
  • RabbitMQ
    一、简介 RabbitMQ 是使用Eelang语言开发的开源消息队列系统,基于AMQP协议来实现,AMQP主要的特征是面向对象、队列、路由(包括点对点和发布订阅、可靠性、安全、)。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求比较高的场景,对性能和吞吐量的要求其次。 AMQP协议 是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件的不同产品,不同的开发语言等条件限制,Erlang中的实现有RabbitMQ等。 二、RabbitMQ 的五种模型 (1)直连模型(hello world) 生产者将消息发送到消息队列中,消费者一直等待消息队列中的消息。一个生产者对应一个消费者,比较容易造成消息的堆积。 package com.ldc.demo.SpringBoot_RabbitMQ; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener