王尘宇王尘宇

研究百度干SEO做推广变成一个被互联网搞的人

分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性

分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性

交易流程

流程说明
在商品服务中,商品减库存后,记录商品交易明细,如果没有异常,就将商品交易记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。
商品服务通过RESTFUL调用资金服务,如果成功,就将账户交易明细表的记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。
最后,读取Redis相关的所有状态位,确定是否所有的操作都为“1—准备提交”状态,如果是,则更新产品服务的记录状态为“2—提交成功”,然后发起资金服务调用,将对应的记录(可通过业务流水号关联)的状态也更新为“2—提交成功”,这样就完成了整个交易。
如果不全部为“1—准备提交”状态,则发起各库的冲正交易,冲掉原有的记录,并且归还商品库存和账户金额。发起冲正交易,把原明细记录状态更新为3–被冲正,并往明细表中添加对应的新记录,状态为4–冲正记录

RabbitMQ可靠事件

使用RabbitMQ等消息队列中间件的可靠事件,来实现分布式事务,这里结合SpringBoot
前面有介绍过SpringBoot整合多数据库的文章,这里可以用到,具体参考《Spring Boot学习:MyBatis配置Druid多数据源》,切换数据源使用@DataSource注解,如下

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库
@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

在此基础上我们加入RabbitMQ实现分布式事务功能

  1. 在pom.xml文件中加入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. yml配置文件中,关于RabbitMQ的配置如下:
# Spring 配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    #使用发布者确认模式,发布消息者会得到一个“消息是否被服务提供者接收”的确认消息
    publisher-confirms: true

#RabbitMQ 队列名称配置
rabbitmq:
  queue:
    fund: fund

3.创建RabbitMQ配置文件RabbitConfig.java

package com.zhlab.demo.config;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitConfig
 * @Description //RabbitMQ消息队列配置
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:10
 **/
@Configuration
public class RabbitConfig {

    // 读取配置属性
    @Value("${rabbitmq.queue.fund}")
    private String fundQueueName = null;

    // 创建RabbitMQ消息队列
    @Bean(name="fundQueue")
    public Queue createFundQueue() {
        return new Queue(fundQueueName);
    }
}
  1. 创建数据传输对象FundParams.java
package com.zhlab.demo.model;

import java.io.Serializable;

/**
 * @ClassName FundParams
 * @Description //FundParams
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:30
 **/
public class FundParams implements Serializable {
    // 序列化版本号
    public static final long serialVersionUID = 989878441231256478L;
    private Long xid; // 业务流水号
    private Long userId; // 用户编号
    private Double amount; // 交易金额

    public FundParams() {
    }

    public FundParams(Long xid, Long userId, Double amount) {
        this.xid = xid;
        this.userId = userId;
        this.amount = amount;
    }

    public Long getXid() {
        return xid;
    }

    public void setXid(Long xid) {
        this.xid = xid;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }
}
  1. 创建商品服务 业务逻辑PurchaseService.java
package com.zhlab.demo.service.goods;

import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import com.zhlab.demo.utils.SnowFlakeUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * @ClassName PurchaseService
 * @Description //商品 业务逻辑
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:24
 **/
@Service
public class PurchaseService implements RabbitTemplate.ConfirmCallback {
    //实现RabbitTemplate.ConfirmCallback接口
    //需要实现它定义的confirm方法,这样它便可以作为一个发布者检测消息是否被消费者所接收的确认类

    // SnowFlake算法生成ID
    SnowFlakeUtil worker = new SnowFlakeUtil(003);

    // RabbitMQ模板
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 读取配置属性
    @Value("${rabbitmq.queue.fund}")
    private String fundQueueName;

    // 购买业务方法
    @DataSource(value = DataSourceType.MASTER) //切换到商品数据库
    public Long purchase(Long productId, Long userId, Double amount) {

        rabbitTemplate.setConfirmCallback(this);//设置了回调类为当前类

        // SnowFlake算法生成序列号,用户跨服务的关联,这里用本地自定义方法,可以借助Leaf TinyID等分布式ID生成服务中间件
        Long xid = worker.nextId();

        // 传递给消费者的参数
        FundParams params = new FundParams(xid, userId, amount);
        // 发送消息给资金服务做扣款
        this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④
        System.out.println("执行产品服务逻辑");
        return xid;
    }

    /**
     * 确认回调,会异步执行
     * @param correlationData --相关数据
     * @param ack -- 是否被消费
     * @param cause -- 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        /*
         * ack代表是否成功。
         * 如果投递消息失败,就会先停滞1秒,然后尝试进行冲正交易,冲掉原有交易,这样就可以使得数据平整
         */
        if (ack){ // 消息投递成功
            System.out.println("执行交易成功");
        } else { // 消息投递失败
            try {
                // 停滞1秒(稍微等待可能没有完成的正常流程),然后发起冲正交易
                Thread.sleep(1000);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            System.out.println("尝试产品减库存冲正交易。");
            System.out.println("尝试账户扣减冲正交易。");
            //在confirm方法中,如果参数ack为false,则说明消息传递失败,就要尝试执行冲正交易,把数据还原回来
            System.out.println(cause); // 打印消息投递失败的原因
        }
    }
}
  1. 创建账户服务业务逻辑AccountService.java
package com.zhlab.demo.service.fund;

import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @ClassName AccountService
 * @Description //账户 业务逻辑
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 上午 11:25
 **/
@Service
public class AccountService {

    /* 消息监听,取YAML文件配置的队列名
     *因为消息被消费,所以触发PurchaseService类的confirm方法
     *spring.rabbitmq.listener.simple.acknowledge-mode = manual
     *如果配置为手动,这里就需要手动确认消息,默认为自动的
     *自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。
     *这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失
     *手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),
     *RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。
     *这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况
     *主要方法:basicAck、basicNack、basicReject根据具体业务情况使用,配合redis做幂等检验
    */
    @RabbitListener(queues = "${rabbitmq.queue.fund}")
    @DataSource(value = DataSourceType.SLAVE) //切换到账户数据库
    public void dealAccount(FundParams params) {
        //TODO具体业务逻辑需自己实现
        System.out.println("扣减账户金额逻辑......");
    }
}

7.写个测试接口来测试一下,创建MqController.java

package com.zhlab.demo.controller;

import com.zhlab.demo.service.goods.PurchaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName MqController
 * @Description //RabbitMQ可靠消息 接口测试
 * @Author singleZhang
 * @Email 405780096@qq.com
 * @Date 2020/12/11 0011 下午 2:25
 **/
@RestController
@RequestMapping("/mq")
public class MqController {
    @Autowired
    private PurchaseService purchaseService;

    @GetMapping("/test")
    public String testMq() {
        return purchaseService.purchase(1L, 1L, 200.0) + "";
    }
}

以上就是基于RabbitMQ可靠消息 实现的分布式事务处理,逻辑和说明都在注释里了。

※说明:这样的确认方式,只是保证了事件的有效传递,但是不能保证消费类能够没有异常或者错误发生,当消费类有异常或错误发生时,数据依旧会存在不一致的情况。这样的方式,只是保证了消息传递的有效性,降低了不一致的可能性,从而大大降低了后续需要运维和业务人员处理的不一致数据的数量

TCC补偿事务

TCC代表的是

  • try(尝试)
  • confirm(确认)
  • cancel(取消)

在TCC事务中,要求任何一个服务逻辑都有3个接口,它们对应的就是尝试(try)方法、确认(confirm)方法和取消(cancel)方法。

分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性

TCC事务模型

TCC事务的一致性可达99.99%,是一种较为成熟的方案,因此在目前有着较为广泛的应用。

继续通过上面的商品交易流程来解析这个模型:

  1. 一阶段
    商品表减库存,商品交易明细表记录商品交易明细,并且将对应记录状态设置为“1—准备提交”。
    调用账户服务,用户账户表扣减账户资金,账户交易明细表记录交易明细,并且将对应记录状态设置为“1—准备提交”
    在一阶段的调用中,如果没有发生异常,就可以执行正常二阶段进行提交了
  2. 正常二阶段
    商品服务 更新对应记录的状态为“2—提交成功”,使得数据生效
    调用账户服务,使得对应的记录状态也为“2—提交成功”,这样正常的提交就完成了
    如果在一阶段发生异常,需要取消操作,可以执行异常二阶段
  3. 异常二阶段
    商品服务执行冲正交易,冲掉原有的产品交易,将库存归还给商品表
    调用账户服务,发起冲正交易,冲掉原有的资金交易,将资金归还到账户里

注意,这些提交和退出机制在TCC中,都需要开发者对接口作幂等性处理

TCC事务机制,也并不能保证所有的数据都是完全一致的,它只是提供了一个可以修复的机制,来降低不一致的情况,从而大大降低后续维护数据的代价。TCC事务也会带来两个较大的麻烦:第一个是,原本的一个方法实现,现在需要拆分为3个方法,代价较大;第二个是,需要开发者自已实现提交和取消方法的幂等性

总结

使用分布式事务,并不是很容易的事情,甚至有些方法还相当复杂。
在互联网中,并不是所有的数据都需要使用分布式事务,所以首先要考虑的是:在什么时候使用分布式事务。即使需要使用分布式事务,有时候也并非需要实时实现数据的一致性,因为可以在后续通过一定的手段来完成。例如电商网站,对买家来说,需要的是快速响应,但对商家来说,就未必需要得到实时数据了,过段时间得到数据也是可以的,而这段时间就可以考虑进行数据补偿了。无论我们如何使用分布式事务,也无法使数据完全达到百分之百的一致性,因此一般金融和电商企业会通过对账等形式来完成最终一致性的操作。

在分布式事务的选择中,都会采用弱一致性代替强一致性,相对来说,弱一致性更加灵活,更方便我们开发。从网站的角度来说,弱一致性可以获得更佳的性能,提升用户的体验,这是互联网应用需要首先考虑的要素。

拓展—电商中的高并发和分布式事务

电商网站中高并发是常见的,高并发是针对用户而言的,比如抢购中,用户只希望短时间内快速抢到商品,而商家对于交易信息可以延迟处理得到。
这就是意味着,对于用户交易部分,要尽可能通过分布式事务进行保证,但而对于商户数据部分,实时性要求相对不是那么高,可以过段时间通过后续手段来补偿修复,从而缩小分布式事务的范围。

分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性

确定需要分布式事务的范围

这里可以看出使用分布式事务的主要是请求数据,保证这个过程可以提高数据可靠性。对于商户数据,不需要使用分布式事务,这样可以提升性能,使抢购进行得更快,满足买家的需求,但是这也会引发数据的丢失。为了解决这个题,后续可以通过和请求数据进行对比来修复数据,使数据达到一致,这个过程可以在高并发过后(一般高并发都是时间段性的,如性价比高的产品发布点、购物节开始时间段)进行,这样商户最终也可以得到可靠的数据,只是不是实时的,但是这并不影响商户和用户的业务。

相关文章

评论列表

发表评论:
验证码

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。