请选择 进入手机版 | 继续访问电脑版
搜索
房产
装修
汽车
婚嫁
健康
理财
旅游
美食
跳蚤
二手房
租房
招聘
二手车
教育
茶座
我要买房
买东西
装修家居
交友
职场
生活
网购
亲子
情感
龙城车友
找美食
谈婚论嫁
美女
兴趣
八卦
宠物
手机

分布式事务之解决方案(最大努力通知)

[复制链接]
查看: 90|回复: 0

8158

主题

8158

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
24484
发表于 2019-12-3 07:09 | 显示全部楼层 |阅读模式
6.散布式事务打点计划之最大积极看护

6.1. 什么是最大积极看护

最大积极看护也是一种打点散布式事务的计划,下边是一个是充值的例子:
我的关键词 散布式事务之处理计划(最大尽力告诉)  热门消息 20191201201156778

交互流程 :
1、账户系统挪用充值系统接口
2、充值系统完成支出处置惩罚向账户系统倡议充值成果看护
若看护失利,则充值系统按计谋举行反复看护
3、账户系统罗致到充值成果看护点窜充值状态
4、账户系统未罗致到看护会自动挪用充值系统的接口查询充值成果
经过上边的例子我们总结最大积极看护计划的目标 :
目标 :倡议看护方经过必定的机制最大积极将营业处置惩罚成果看护到罗致方。
具体包括 :
1、有必定的消息反复看护机制。
由于罗致看护方大要没有罗致到看护,此时要有必定的机制抵消息反复看护。
2、消息校订机制。
假如尽最大积极也没有看护到罗致方,大概罗致方消耗消息后要再次消耗,此时可由罗致方自意向看护方查询消息信息来满足需求。
最大积极看护与牢靠消息同等性有什么不同?
1、打点计划脑筋不同
牢靠消息同等性,倡议看护方必要保证将消息发进来,而且将消息发到罗致看护方,消息的牢靠性关键由倡议看护方来保证。
最大积极看护,倡议看护方尽最大的积极将营业处置惩罚成果看护为罗致看护方,可是大要消息罗致不到,此时必要罗致看护方自动挪用倡议看护方的接口查询营业处置惩罚成果,看护的牢靠性关键在罗致看护方。
2、两者的营业利用处景不同
牢靠消息同等性关注的是买卖营业进程的事务同等,以异步的方式完成买卖营业。
最大积极看护关注的是买卖营业后的看护事务,行将买卖营业成果牢靠的看护进来。
3、技术打点偏向不同
牢靠消息同等性要打点消息从发出到罗致的同等性,即消息发出而且被罗致到。
最大积极看护没法保证消息从发出到罗致的同等性,只供给消息罗致的牢靠性机制。牢靠机制是,最大积极的将消息看护给罗致方,当消息没法被罗致方罗致时,由罗致方自动查询消耗(营业处置惩罚成果)。

6.2. 打点计划

经过对最大积极看护的大白,采取MQ的ack机制便可以实现最大积极看护。
计划1 :
我的关键词 散布式事务之处理计划(最大尽力告诉)  热门消息 20191201204222670

本计划是利用MQ的ack机制由MQ向罗致看护方发送看护,流程以下 :
1、倡议看护方将看护发给MQ。
利用普通消息机制将看护发给MQ。
留意 :假如消息没有发进来可由罗致看护方自动请求倡议看护方查询营业实行成果。
2、罗致看护方监听MQ。
3、罗致看护方罗致消息,营业处置惩罚完成回应ack。
4、罗致看护方若没有回应ack则MQ会反复看护。
MQ会依照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,渐渐拉大看护间隔(假如MQ采取rocketMq,在broker中可举行设备),直到到达看护要求的时候窗口上限。
5、罗致看护方可经过消息校订接口来校订消息的同等性。
计划2 :
本计划也是利用MQ的ack机制,与计划1差此外是利用步伐向罗致看护方发送看护,以下图 :
我的关键词 散布式事务之处理计划(最大尽力告诉)  热门消息 2019120121174777

交互流程以下 :
1、倡议看护方将看护发给MQ。
利用牢靠消息同等计划中的事务消息保证当地事务与消息的原子性,终极将看护先发给MQ。
2、看护步伐监听MQ,罗致MQ的消息。
计划1中罗致看护方间接监听MQ,计划2中由看护步伐监听MQ。
看护步伐若没有回应ack则MQ会反复看护。
3、看护步伐经过互联网接口协议(如http、webservice)挪用罗致看护计划接口,完成看护。
看护步伐挪用罗致看护计划接口乐成就表现看护乐成,即消耗MQ消息乐成,MQ将不再向看护步伐投递看护消息。
4、罗致看护方可经过消息校订接口来校订消息的同等性。
计划1和计划2的不同点 :
1、计划1中罗致看护方与MQ接口,即罗致看护计划监听MQ,此计划垂危利用与内部利用之间的看护。
2、计划2中由看护步伐与MQ接口,看护步伐监听MQ,收到MQ的消息后由看护步伐经过互联网接口协议挪用罗致看护方。此计划垂危利用于内部利用之间的看护,例如支出宝、微信的支出成果看护。

6.3.RocketMQ实现最大积极看护型事务

6.3.1.营业说明

本实例经过RocketMq中心件实现最大积极看护型散布式事务,模拟充值进程。
本案例有账户系统和充值系吐浣个微办事,其中账户系统的数据库是bank1数据库,其中有张三账户。充值系统的数据库利用bank1_pay数据库,记载了账户的充值记载。
营业流程以下图 :
我的关键词 散布式事务之处理计划(最大尽力告诉)  热门消息 20191201221041340

交互流程以下 :
1、用户请求充值系统举行充值。
2、充值系统完成充值将充值成果发给MQ。
3、账户系统监听MQ,罗致充值成果看护,假如罗致不到消息,MQ会反复发送看护。罗致到充值成果看护账户系统增加充值金额。
4、账户系统也可以自动查询充值系统的充值成果查询接口,增加金额。

6.3.2.步伐组成部分

本示例步伐组成部分以下 :
数据库:MySQL-5.7.25
包括bank1和bank1_pay两个数据库。
JDK:64位 jdk1.8.0_201
rocketmq 办事端:RocketMQ-4.5.0
rocketmq 客户端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微办事框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
微办事及数据库的关系 :
dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 银行1,操纵张三账户, 毗连数据库bank1 dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 银行2,操纵充值记载,毗连数据库bank1_pay
我的关键词 散布式事务之处理计划(最大尽力告诉)  热门消息 20191201221539316

交互流程以下 :
1、用户请求充值系统举行充值。
2、充值系统完成充值将充值成果发给MQ。
3、账户系统监听MQ,罗致充值成果看护,假如罗致不到消息,MQ会反复发送看护。罗致到充值成果看护账户系统增加充值金额。
4、账户系统也可以自动查询充值系统的充值成果查询接口,增加金额。

6.3.3.建立数据库

建立bank1库,并导入以下表结构和数据(包含张三账户)
  1. CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户 主姓名',`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT'帐户密码',`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
复制代码
建立bank1_pay库,并导入以下表结构:
  1. CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (`id` varchar(64) COLLATE utf8_bin NOT NULL,`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账号', `pay_amount` double NULL DEFAULT NULL COMMENT '充值余额',`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值成果:success,fail',PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
复制代码
6.3.4.启动RocketMQ

rocketMQ启动方式与RocketMQ实现牢靠消息终极同等性事务中完全同等
6.3.5.discover-server

discover-server是办事注册中心,测试工程将自己注册至discover-server。
6.3.6.工程概述

(1)父工程maven依靠说明
在dtx父工程中指定了SpringBoot和SpringCloud版本
  1.         org.springframework.boot         spring‐boot‐dependencies                                                 2.1.3.RELEASEpomimport         org.springframework.cloud         spring‐cloud‐dependencies         Greenwich.RELEASE         pom        import
复制代码
在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本。
  1.         org.apache.rocketmq         rocketmq‐spring‐boot‐starter         2.0.2
复制代码
(2)设备rocketMQ
在application-local.properties中设备rocketMQ nameServer地址及生产组 :
  1. rocketmq.producer.group = producer_bank2rocketmq.name-server = 127.0.0.1:9876
复制代码
6.3.7 dtx-notifydemo-pay

dtx-notifydemo-pay实现以下功用 :
1、充值接口;
2、充值完成要看护;
3、充值成果查询接口。
(2)Dao
  1. @Mapper@Componentpublic interface AccountPayDao {@Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);@Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")AccountPay findByIdTxNo(@Param("txNo") String txNo); }
复制代码
(3)Service
  1. @Service@Slf4jpublic class AccountPayServiceImpl implements AccountPayService{@AutowiredRocketMQTemplate rocketMQTemplate;@AutowiredAccountPayDao accountPayDao;@Transactional@Overridepublic AccountPay insertAccountPay(AccountPay accountPay) {        int result = accountPayDao.insertAccountPay(accountPay.getId(),        accountPay.getAccountNo(), accountPay.getPayAmount(), "success");        if(result>0){ //发送看护                rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);                return accountPay;         }        return null; }        @Override        public AccountPay getAccountPay(String txNo) {                AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);                 return accountPay;          }}
复制代码
(4)Controller
  1. @RestControllerpublic class AccountPayController {@AutowiredAccountPayService accountPayService;//充值@GetMapping(value = "/paydo")public AccountPay pay(AccountPay accountPay){        //事务号        String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo);        return accountPayService.insertAccountPay(accountPay);}//查询充值成果@GetMapping(value = "/payresult/{txNo}")public AccountPay payresult(@PathVariable("txNo") String txNo){                return accountPayService.getAccountPay(txNo);         }}
复制代码
6.3.8 dtx-notifydemo-bank1

dtx-notifydemo-bank1实现以下功用 :
1、监听MQ,罗致充值成果,按照充值成果完成账户金额点窜。
2、自动查询充值系统,按照充值成果完成账户金额点窜。
1)Dao
  1. @Mapper@Componentpublic interface AccountInfoDao {        //点窜账户金额        @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")        int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);        //查询幂等记载,用于幂等控制        @Select("select count(1) from de_duplication where tx_no = #{txNo}")         int isExistTx(String txNo);        //增加事务记载,用于幂等控制        @Insert("insert into de_duplication values(#{txNo},now());")         int addTx(String txNo);}
复制代码
2)AccountInfoService
  1. @Service@Slf4jpublic class AccountInfoServiceImpl implements AccountInfoService {        @Autowired        AccountInfoDao accountInfoDao;        @Autowired        PayClient payClient; /**        * 更新帐号余额,并发送消息 *        * @param accountChange */        @Transactional        @Override        public void updateAccountBalance(AccountChangeEvent accountChange) {        //幂等校验        int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if(existTx >0){        log.info("已处置惩罚消息:{}", JSONObject.toJSONString(accountChange));        return ; }        //增加事务记载 accountInfoDao.addTx(accountChange.getTxNo()); //更新账户金额        accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()); }        /**        * 自动查询充值成果 *        * @param tx_no */        @Override        public AccountPay queryPayResult(String tx_no) {                //自动请求充值系统查询充值成果                AccountPay accountPay = payClient.queryPayResult(tx_no); //充值成果                String result = accountPay.getResult();                 log.info("自动查询充值成果:{}",                 JSON.toJSONString(accountPay));                 if("success".equals(result)){                        AccountChangeEvent accountChangeEvent = new AccountChangeEvent();                        accountChangeEvent.setAccountNo(accountPay.getAccountNo());                        accountChangeEvent.setAmount(accountPay.getPayAmount());                        accountChangeEvent.setTxNo(accountPay.getId());                        updateAccountBalance(accountChangeEvent);                }                return accountPay;         }}
复制代码
  1. @FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)public interface PayClient {        @GetMapping("/pay/payresult/{txNo}")        AccountPay queryPayResult(@PathVariable("txNo") String txNo); }@Componentpublic class PayFallback implements PayClient {        @Override        public AccountPay queryPayResult(String txNo) {                AccountPay accountPay = new AccountPay();                accountPay.setResult("fail");                return accountPay;        } }
复制代码
3)监听MQ
  1. @Component@Slf4j@RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1") public class NotifyMsgListener implements RocketMQListener {        @Autowired        AccountInfoService accountInfoService;        @Override        public void onMessage(AccountPay accountPay) {                log.info("罗致到消息:{}", JSON.toJSONString(accountPay));                 AccountChangeEvent accountChangeEvent = new AccountChangeEvent();                accountChangeEvent.setAmount(accountPay.getPayAmount());                accountChangeEvent.setAccountNo(accountPay.getAccountNo());                accountChangeEvent.setTxNo(accountPay.getId());                accountInfoService.updateAccountBalance(accountChangeEvent);                 log.info("处置惩罚消息完成:{}", JSON.toJSONString(accountChangeEvent));        } }
复制代码
4)Controller
  1. @RestController@Slf4jpublic class AccountInfoController {        @Autowired        private AccountInfoService accountInfoService;                //自动查询充值成果                @GetMapping(value = "/payresult/{txNo}")                public AccountPay result(@PathVariable("txNo") String txNo){                AccountPay accountPay = accountInfoService.queryPayResult(txNo);                return accountPay;         }}
复制代码
6.3.9 测试场景


  • 充值系统充值乐成,账户系统自动查询充值成果,点窜账户金额。
  • 充值系统充值乐成,发送消息,账户系统罗致消息,点窜账户金额。
  • 账户系统点窜账户金额幂等测试。
6.4. 小结

最大积极看护计划是散布式事务中对同等性要求最低的一种,适用于一些终极同等性时候敏感度低的营业;
最大积极看护计划必要实现以下功用 :
1、消息反复看护机制。
2、消息校订机制。


免责声明:假如加害了您的权益,请联系站长,我们会实时删除侵权内容,感谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Copyright © 2006-2014 淄博新闻网-淄博日报 淄博晚报 淄博财经新报 掌中淄博 淄博专业新闻资讯发布网站 版权所有 法律顾问:高律师 客服电话:0791-88289918
技术支持:迪恩网络科技公司  Powered by Discuz! X3.2
快速回复 返回顶部 返回列表