中间件基础
约 7057 字大约 24 分钟
(1)Nacos-discovery
理论基础
Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称,一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
总的来说nacos就是服务注册中心和配置中心的组合
可以替代Eureka做服务注册中心
可以替代Config做服务配置中心
服务发现是微服务架构体系中最关键的组件之一。如果尝试着用手动的方式来给每一个客户端来配置所有服务提供者的服务列表是一件非常困难的事,而且也不利于 服务的动态扩缩容。Nacos Discovery Starter 可以帮助您将服务自动注册到 Nacos 服务端并且能够动态感知和刷新某个服务实例的服务列表。除此之外,Nacos Discovery Starter 也将服务实例自身的一些元数据信息-例如 host,port,健康检查URL,主页等-注册到 Nacos 。Nacos 的获取和启动方式可以参考 Nacos 官网。
https://nacos.io/zh-cn/docs/quick-start-spring-cloud.html
项目应用
<!-- nacos注册中心 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
server:
port: 9004
spring:
application:
name: nacos-feign
cloud:
nacos:
discovery:
server-addr: 192.168.3.41:8848
username: nacos
password: nacos
namespace: public
mvc:
pathmatch:
matching-strategy: ant_path_matcher
(2)Nacos-config
理论基础
配置文件的不足
1.配置存在公共的配置,缺少统一的管理
2.环境参数的配置在每个项目中都有,一旦泄露,不知道是谁泄露的
3.配置缺少git版本管理
4.配置文件的配置无法实现动态更新
配置中心的思想
1.首先把项目中各种配置全部放到一个集中的地方进行统一管理,并提供一套标准的接口
2.当各个微服务需要获取配置的时候,就来配置中心的接口拉取自己的配置
3.当配置中心中的各种参数有更新的时候,也能通知到各个服务实时的过来同步最新的消息,使之动态更新
Nacos 提供用于存储配置和其他元数据的 key/value 存储,为分布式系统中的外部化配置提供服务器端和客户端支持。使用 Spring Cloud Alibaba Nacos Config,您可以在 Nacos Server 集中管理你 Spring Cloud 应用的外部属性配置。
注意:不能使用原来的application.yml作为配置文件,而是新建一个bootstrap.yml作为配置文件!
项目应用
<!-- nacos配置中心 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 读取bootstrap.yaml文件的依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
【bootstrap.yaml】
spring:
application:
name: nacos-feign
cloud:
nacos:
discovery:
server-addr: 192.168.3.41:8848
namespace: public
username: nacos
password: nacos
config:
server-addr: 192.168.3.41:8848
file-extension: yaml
extension-configs:
- dataId: shopfeign.yaml
refresh: true
namespace: public
mvc:
pathmatch:
matching-strategy: ant_path_matcher
【nacos服务器添加配置列表】
配置的【Data Id】必须和配置文件中【dataId】相同。shopfeign.yaml
注意:文件名不用用下划线命名。文件的格式。文件的后缀。
(3)Openfeign
理论基础
Feign是一个声明式的Web服务客户端(Web服务客户端就是Http客户端),让编写Web服务客户端变得非常容易,只需创建一个接口并在接口上添加注解即可。
cloud官网介绍Feign:https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/
Java当中常见的Http客户端有很多,除了Feign,类似的还有Apache 的 HttpClient 以及OKHttp3,还有SpringBoot自带的RestTemplate这些都是Java当中常用的HTTP 请求工具。
OpenFeign是Spring Cloud 在Feign的基础上支持了SpringMVC的注解,如@RequesMapping等等。OpenFeign的@FeignClient可以解析SpringMVC的@RequestMapping注解下的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡并调用其他服务。
OpenFeign 微服务使用步骤
微服务之间使用OpenFeign,肯定是要通过注册中心来访问服务的。提供者将自己的ip+端口号注册到注册中心,然后对外提供一个服务名称,消费者根据服务名称去注册中心当中寻找ip和端口。
项目应用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
@FeignClient(name="nacos-order", path = "/order")
public interface OrderFeign {
@RequestMapping("/create")
boolean create(@SpringQueryMap Order order);
@RequestMapping("/update")
boolean update(@RequestParam("userId") Long userId, @RequestParam("status")Integer status);
}
【注意】
启动类需要添加@EnableFeignClients
【注意】
@SpringQueryMap注解
spring cloud项目使用feign的时候都会发现一个问题,就是get方式无法解析对象参数。其实feign是支持对象传递的,但是得是Map形式,而且不能为空,与spring在机制上不兼容,因此无法使用。spring cloud在2.1.x版本中提供了@SpringQueryMap注解,可以传递对象参数,框架自动解析。
【注意】
main:
##允许存在多个Feign调用相同Service的接口
allow-bean-definition-overriding: true
(4)Loadbalancer
理论基础
SpringCloud从2020版本开始移除了对Ribbon的依赖,官方使用Spring Cloud Loadbalancer正式替换Ribbon,而且Spring Cloud Loadbalancer成为了Spring Cloud负载均衡器的唯一实现。
Spring Cloud 通过ReactiveLoadBalancer 接口,提供了两种负载均衡算法:
RandomLoadBalancer 随机策略 。
RoundRobinLoadBalancer 轮询策略,默认实现。
项目应用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
// 切换默认的负载均衡算法
// 创建分配策略的配置类(不用加@Configuration)
public class LoadBalancerConfig {
//将官方提供的 RandomLoadBalancer 注册为Bean
@Bean
public ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory){
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
@Configuration
@LoadBalancerClient(value = "nacos-order", //指定为 userservice 服务,只要是调用此服务都会使用我们指定的策略
configuration = LoadBalancerConfig.class) //指定我们刚刚定义好的配置类
public class BeanConfig {
@Bean
@LoadBalanced
RestTemplate template(){
return new RestTemplate();
}
}
(5)Gateway
理论基础
Gateway定义
Spring Cloud Gateway是Spring官方基于Spring5.0、SpringBoot2.0和Project Reactor等技术开发的网关,旨在为微服务框架提供一种简单而有效的统一的API路由管理方式,统一访问接口。
Gateway作用
Spring Cloud Gateway作为Spring Cloud生态体系中的网关,目标是替代Netflix的Zuul,其不仅提供统 一的路由方式,并且基于Filter链的方式提供了网关基本的功能,例如:安全、监控/埋点和限流等等。 它是基于Netty的响应式开发模式。
Gateway的组成
1路由(route):路由是网关最基础的部分,路由信息由一个ID,一个目的URL、一组断言工厂和一 组Filter组成。如果断言为真,则说明请求URL和配置的路由匹配。
2断言(Predicate):Java8中的断言函数,Spring Cloud Gateway中的断言函数输入类型是 Spring5.0框架中的ServerWebExchange。Spring Cloud Gateway中的断言函数允许开发者去定义匹配 来自http Request中的任何信息,比如请求头和参数等。
3️过滤器(Filter):一个标准的Spring WebFilter,Spring Cloud Gateway中的Filter分为两种类型: Gateway Filter和Global Filter。过滤器Filter可以对请求和响应进行处理。
项目应用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.1</version>
</dependency>
/**
* 跨域配置类
*/
@Configuration
public class CorsConfig {
@Bean
public CorsWebFilter corsFilter() {
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedMethod("*");
config.addAllowedOrigin("*");
config.addAllowedHeader("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(new PathPatternParser());
source.registerCorsConfiguration("/**", config);
return new CorsWebFilter(source);
}
}
server:
port: 9005
spring:
application:
name: nacos-gateway
cloud:
nacos:
discovery:
server-addr: 192.168.3.41:8848
username: nacos
password: nacos
namespace: public
gateway:
discovery:
locator:
lowerCaseServiceId: true
routes:
- id: order-api
uri: lb://nacos-feign
predicates:
- Path=/api/bg/order/**
filters:
- StripPrefix=3
(6)Mycat读写分离
理论基础
Mycat是数据库中间件,所谓中间件,是一类连接软件组件和应用的计算机软件,以便软件各部件之间的通信。
例如 tomcat,web的中间件。而数据库中间件是连接Java应用程序和数据库中间的软件。
为什么要用Mycat
Java与数据库的紧耦合
我们现在普遍的Java应用程序都是直接连接了MySQL软件进行读写操作,也就是我们在Java中的配置文件等定义了mysql的数据源,直接连接到了我们的mysql软件,但是当某些情况下我们可能需要用到了多个数据库,这个时候我们可能就需要配多个数据源去连接我们的多个数据库,这个时候我们进行sql操作的时候就会很麻烦,因为Java与数据库有了一个紧密的耦合度,但是如果我们在Java应用程序与mysql中间使用了mycat,我们只需要访问mycat就可以了,至于数据源等问题,mycat会直接帮我们搞定。
高访问量高并发对数据库的压力
再来说一下高访问量高并发,我们都知道mysql数据库实际上在数据查询上是有一个瓶颈的,当我们的数据太多的时候,已经互联网上有高并发的请求的时候,这个时候对我们mysql的压力是非常大的,当访问量一大,就可能会出现查不出数据,响应的时间太长等,这个时候我们可能需要有多个服务器对数据库进行读写分离,以及对数据库进行集群,这个时候我们的sql语句要进行分类,哪个sql语句要访问哪个数据库,这个时候只要交给mycat就可以了。
读写请求数据不一致
最后说一下,使用多个数据库的时候我们就会遇到一个读写数据不一致的问题,这个时候同样mycat可以进行主从复制,保证了数据的一致性。
mycat能干什么
1、读写分离
2、数据分片
3、多数据源整合
Mycat原理
Mycat 的原理中最重要的一个动词是“拦截”,它拦截了用户发送过来的 SQL 语句,首先对 SQL语句做了一些特定的分析:如分片分析、路由分析、读写分离分析、缓存分析等,然后将此 SQL 发往后端的真实数据库,并将返回的结果做适当的处理,最终再返回给用户。
这种方式把数据库的分布式从代码中解耦出来,程序员察觉不出来后台使用 Mycat 还是MySQL。
基本概念:
物理数据库:真实的数据库
物理表:真实的表
逻辑数据库:相对于物理数据库,是数据节点聚合后的结果
逻辑表:相对于物理表,是分片表聚合后的结果,对于客户端来说跟真实的表没有区别
schema.xml配置
注意:schema的name必须和mysql数据库名字一样一样!
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">
<schema name="db01" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn1"></schema>
<dataNode name="dn1" dataHost="localhost1" database="db01" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- 可以配置多个主从 -->
<writeHost host="hostM1" url="192.168.31.215:3306" user="root" password="root">
<!-- 可以配置多个从库 -->
<readHost host="hostS2" url="192.168.31.83:3306" user="root" password="root" />
</writeHost>
</dataHost>
</mycat:schema>
server.xml配置
<property name="serverPort">8066</property> <property name="managerPort">9066</property>
<!-- 读写都可用的用户 -->
<user name="root" defaultAccount="true">
<property name="password">root</property>
<property name="schemas">db01</property>
</user>
<!-- 只读用户 -->
<user name="user">
<property name="password">user</property>
<property name="schemas">db01</property>
<property name="readOnly">true</property>
<property name="defaultSchema">TESTDB</property>
</user>
运行
启动Mycat服务器:startup_nowrap.bat
读写账号
ip:192.168.31.215
port:8066
账号:root
密码:root
只读账号
ip:192.168.31.215
port:8066
账号:user
密码:user
配置
<dependency>
<groupId>io.mycat</groupId>
<artifactId>mycat-dao</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
spring:
datasource:
########写数据源
update:
jdbc-url: jdbc:mysql://192.168.31.215:8066/db01?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
######读数据源
select:
jdbc-url: jdbc:mysql://192.168.31.215:8066/db01?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.cj.jdbc.Driver
username: user
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
切面
@Aspect
@Component
@Lazy(false)
// Order设定AOP执行顺序 使之在数据库事务上先执行
@Order(0)
public class DataSourceAOP {
//横切点
@Before("execution(* com.zhaoyang.controller.*.*(..))")
public void process(JoinPoint joinPoint) {
String methodName = joinPoint.getSignature().getName();
if (methodName.startsWith("get") || methodName.startsWith("count") || methodName.startsWith("find")
|| methodName.startsWith("list") || methodName.startsWith("select") || methodName.startsWith("check")) {
System.out.println("使用的是读数据源");
DataSourceContextHolder.setDbType("selectDataSource");
} else {
System.out.println("使用的是写数据源");
DataSourceContextHolder.setDbType("updateDataSource");
}
}
}
/**
* 配置读写数据源
*/
@Configuration
public class DataSourceConfig {
@Bean(name = "selectDataSource")
@ConfigurationProperties(prefix = "spring.datasource.select")
public DataSource dataSource1() {
return DataSourceBuilder.create().build();
}
@Bean(name = "updateDataSource")
@ConfigurationProperties(prefix = "spring.datasource.update")
public DataSource dataSource2() {
return DataSourceBuilder.create().build();
}
}
/**
* 保存本地多数据源
*/
@Component
@Lazy(false)
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
// 设置数据源类型
public static void setDbType(String dbType) {
contextHolder.set(dbType);
}
public static String getDbType() {
return contextHolder.get();
}
public static void clearDbType() {
contextHolder.remove();
}
}
/**
* 该类继承自 AbstractRoutingDataSource 类,在访问数据库时会调用该类的 determineCurrentLookupKey() 方法获取数据库实例的 key
*/
@Component
@Primary
public class DynamicDataSource extends AbstractRoutingDataSource {
@Autowired
@Qualifier("selectDataSource")
private DataSource selectDataSource;
@Autowired
@Qualifier("updateDataSource")
private DataSource updateDataSource;
/**
* 返回生效的数据源名称
*/
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDbType();
}
/**
* 配置使用的数据源信息,如果不存在就使用默认的数据源
*/
@Override
public void afterPropertiesSet() {
Map<Object, Object> map = new HashMap<>();
map.put("selectDataSource", selectDataSource);
map.put("updateDataSource", updateDataSource);
//注册数据源
setTargetDataSources(map);
setDefaultTargetDataSource(updateDataSource);
super.afterPropertiesSet();
}
}
使用
@RestController
public class PersonController {
@Autowired
PersonService personService;
@RequestMapping("/getPerson")
public Person getPerson(int id) {
return personService.getById(id);
}
@RequestMapping("/findPerson")
public Person findPerson(int id) {
return personService.getById(id);
}
@RequestMapping("/save")
public boolean save(Person person) {
return personService.save(person);
}
}
(7)Mycat分库分表
理论基础
分片:将原来单个数据库的数据切分后分散存储在不同的数据库节点
分片节点:分片以后数据存储的节点
分片键:分片依据的字段
分片算法:分片的规则,例如随机、取模、范围、哈希、枚举以及各种组合算法
分库分表的中心思想都是将数据分散存储,使得单一数据库/表的数据量变小来缓解单一数据库的性能问题,从而达到提升数据库性能的目的。
MySQL分库分表主要有垂直分库、垂直分表、水平分库和水平分表4种:
1、垂直分库:以表为依据,根据业务将不同表拆分到不同库中。
特点:
每个库的表结构都不一样。
每个库的数据也不一样。
所有库的并集是全量数据。
2、垂直分表:以字段为依据,根据字段属性将不同字段拆分到不同表中。特点:
每个表的结构都不一样。
每个表的数据也不一样,一般通过一列(主键/外键)关联。
所有表的并集是全量数据。
3、水平分库:以字段为依据,按照一定策略,将一个库的数据拆分到多个库中。
特点:
每个库的表结构都一样。
每个库的数据都不一样。
所有库的并集是全量数据。
4、水平分表:以字段为依据,按照一定策略,将一表的数据拆分到多个表中。
特点:
每个表的表结构都一样。
每个表的数据都不一样。
所有表的并集是全量数据
schema.xml配置
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">
<!-- DB01:逻辑数据库,TB_ORDER: 逻辑表,dataNode中的dn1、dn2、dn3位三个逻辑节点,rule为分片规则 -->
<schema name="db01" checkSQLschema="true" sqlMaxLimit="100">
<table name="t_person" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />
</schema>
<!-- 配置三个逻辑节点,dn1对应的主机是 dhost1,其中的物理数据库为db01 -->
<dataNode name="dn1" dataHost="dhost1" database="db01" />
<dataNode name="dn2" dataHost="dhost2" database="db02" />
<dataNode name="dn3" dataHost="dhost3" database="db03" />
<!-- 主句 dhost1 的物理配置 ,这里面需要配置MySQL的url、user、password等等-->
<dataHost name="dhost1" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="master" url="jdbc:mysql://127.0.0.1:3306?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" user="root" password="root">
</writeHost>
</dataHost>
<dataHost name="dhost2" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="master" url="jdbc:mysql://127.0.0.1:3306?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" user="root" password="root">
</writeHost>
</dataHost>
<dataHost name="dhost3" maxCon="1000" minCon="10" balance="0"
writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="master" url="jdbc:mysql://127.0.0.1:3306?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true" user="root" password="root">
</writeHost>
</dataHost>
</mycat:schema>
server.xml配置
<user name="root" defaultAccount="true">
<property name="password">root</property>
<property name="schemas">db01</property>
</user>
<user name="user">
<property name="password">user</property>
<property name="schemas">db01</property>
<property name="readOnly">true</property>
<property name="defaultSchema">TESTDB</property>
</user>
分片规则
启动mycat服务、使用mycat连接MySQL、使用mycat创建表添加数据
db01:t_person:添加数据
id 为 1、2、3、5000000的数据添加到了ip为192.168.31.215的db01数据库中
id 为5000001的数据添加到了ip为192.168.31.215的db02数据库中
id 为15000000的数据添加到了ip为192.168.31.215的db03数据库中
分片原理如下:
1、分片规则配置在 schema.xml中 ,rule="auto-sharding-long" 就是分片规则
2、auto-sharding-long 为引用,具体规则在 conf/rule.xml 中配置
3、<columns>id</columns> 可以看出 时根据 id 列来分配的,分配算法为 rang-long,这里的rang-long也是一个引用,同样在这个文件中
4、可以看出具体的算法在 文件autopartition-long.txt 中,这个文件就在conf文件夹中
## range start-end ,data node index
## K=1000,M=10000.
0-500M=0
500M-1000M=1
1000M-1500M=2
-- 能够看出,id 在 0到500M在 0 号逻辑库,等等。最大可以存储id = 1500M的数据,存放在 2号逻辑库。
-- 若此时存储 id = 15000001的数据会怎样呢?
mysql> insert into tb_order(`id`,`title`) values(15000001,"order_15000001");
ERROR 1064 (HY000): can't find any valid datanode :TB_ORDER -> ID -> 15000001
mysql>
-- 能够看出插入失败,若想插入成功,就需要修改分片规则了。
(8)Seata
理论基础
Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。
致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。
官网地址:http://seata.io/,其中的文档、播客中提供了大量的使用说明、源码分析。
Seata架构
Seata事务管理中有三个重要的角色:
TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata提供了四种不同的分布式事务解决方案:
XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
TCC模式:最终一致的分阶段事务模式,有业务侵入
AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
SAGA模式:长事务模式,有业务侵入
【注意:创建seata数据库、创建Seata所需要的数据库表mysql.sql】
事务ACID原则
原子性:事务中的所有操作,要么全部成功,要么全部失败
一致性:要保证数据库内部完整性约束、声明性约束
隔离性:对同一资源操作的事务不能同时发生
持久性:对数据库做的一切修改将永久保存,不管是否出现故障
CAP理论
Consistency(一致性)
Availability(可用性)
Partition tolerance (分区容错性)
分布式系统无法同时满足这三个指标。 这个结论就叫做 CAP 定理。
CAP定理- Consistency
Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致
CAP定理- Availability
Availability (可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
CAP定理-Partition tolerance
Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。
Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务
BASE理论
BASE理论是对CAP的一种解决思路,包含三个思想:
Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。
而分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论:
AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致。
CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态
file.conf(数据库配置)
mode = "db"
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
user = "root"
password = "root"
registry.conf(注册中心)
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "48fef45a-f67d-415a-90fa-4beb523011a9"
cluster = "default"
username = "nacos"
password = "nacos"
}
registry.conf(配置中心)
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "48fef45a-f67d-415a-90fa-4beb523011a9"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
nacos配置中心
在之前新创建的命名空间seata_test(48fef45a-f67d-415a-90fa-4beb523011a9)
中创建配置文件
Data ID:seataServer.properties
Group:SEATA_GROUP
配置格式:Properties
修改后的内容如下:
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.zhaoyang_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
运行
windows系统运行seata-server.bat启动seata即可
Linux系统命令如下:
sh nacos-config.sh -h ip -p 端口 -g 组 -t namespace -u nacos用户名 -w nacos密码
sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 48fef45a-f67d-415a-90fa-4beb523011a9 -u nacos -w nacos
项目应用
在需要进行事务控制的子服务所连接的数据库添加表格、回滚SQL
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.5.1</version>
</dependency>
spring:
datasource:
dynamic:
seata: true ## 是否启动对 Seata 的集成
seata:
enabled: true
enable-auto-data-source-proxy: true ##是否开启数据源自动代理,默认为true
tx-service-group: zhaoyang_test_tx_group ##要与配置文件中的vgroupMapping一致
registry: ##registry根据seata服务端的registry配置
type: nacos ##默认为file
nacos:
application: seata-server ##配置自己的seata服务
server-addr: 127.0.0.1:8848 ##根据自己的seata服务配置
username: nacos ##根据自己的seata服务配置
password: nacos ##根据自己的seata服务配置
namespace: 766c55b9-fa1d-4f3a-aa92-cfbb57445c51 ##根据自己的seata服务配置
cluster: default ## 配置自己的seata服务cluster, 默认为 default
group: SEATA_GROUP ##根据自己的seata服务配置
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848 ##配置自己的nacos地址
group: SEATA_GROUP ##配置自己的dev
username: nacos ##配置自己的username
password: nacos ##配置自己的password
namespace: 766c55b9-fa1d-4f3a-aa92-cfbb57445c51 ##配置自己的namespace
dataId: seataServer.properties ##配置自己的dataId,由于搭建服务端时把客户端的配置也写在了seataServer.properties,所以这里用了和服务端一样的配置文件,实际客户端和服务端的配置文件分离出来更好
数据源注解
@DS("db01")
分布式事务注解
@GlobalTransactional
配置文件信息记录备忘:
dataid:service.vgroupMapping.zhaoyang_test_tx_group
group:SEATA_GROUP
数据:default
(9)Canal
理论基础
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。canal 就是一个同步增量数据的一个工具。
canal 应用场景1
我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。这就需要借助canal来实现mysql到redis的数据同步
canal 应用场景2
将用户的订单信息传入后台。
后台服务器将订单信息保存到mysql数据库。
又 canal 进行监控mysql中的写操作变化,将发生修改(Insert) 的数据写入到kafka
通过sparkStreaming读取Kafka中的数据,进行计算。
将计算好的结果,重新写入到服务器中,并返回到浏览器。
canal的作用
它可以实现增量同步,拿A商品举例,第一批数据中,A商品有100条,canal便会将这批新增的数据写入Kafka,再交给spark处理。第二次又新增一批数据,于是canal又将监控到新增数据写入到Kafka中。依次类推,最终由spark计算出结果返回出去。
canal 除了写入kafka 还能将数据写入到其他中间件(mq、elasticsearch、hbase等)
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
环境配置
1.【mysql准备】
使用命令查看数据库是否开启binlog模式:
log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。
show variables like 'log_%';
2.【canal解压】
canal.deployer-1.1.6.tar.gz
3. 【canal配置】
conf\example\instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.filter.regex=.*\\..*
conf\canal.properties
canal.port = 11111
canal.destinations = example
4. 【Mysql读取位置】
show master status;
reset master;
特别注意:读取位置position必须和meta.dat中一致
5.【启动canal】
bat
6. 【查看日志】
logs\example\example.log
项目应用
1. 【pom.xml】
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2. 【application.yml】
canal:
server: 127.0.0.1:11111
destination: example
3. 【Canal开发】
EntryHandler<T>接口实现
// 说明:监听表中数据的变化【添加、修改、删除】、自动调用对应的方法
@Component
@CanalTable("person")
public class PersonHandler implements EntryHandler<Person> {
@Override
public void insert(Person person) {
System.out.println("表中添加了数据");
System.out.println(person);
}
@Override
public void update(Person before, Person after) {
System.out.println("表中修改了数据");
System.out.println(before);
System.out.println(after);
}
@Override
public void delete(Person person) {
System.out.println("表中删除了数据");
System.out.println(person);
}
}