中间件进阶
约 11751 字大约 39 分钟
(1)Zookeeper
理论基础
在分布式系统中,注册中心充当着重要角色,是服务发现、客户端负载均衡中不可缺少的一员。注册中心除了能够实现基本的功能外,他的稳定性、可用性和健壮性对整个分布式系统的流畅运行影响重大。zookeeper是最常使用的方式之一。
Zookeeper 是 Apache Hadoop 的子项目,是一个树型的目录服务,支持变更推送,工业强度较高。zookeeper是个CP系统,强一致性。
CAP
关于C、A、P三者的定义:
Consistency : Every read receives the most recent write or an error
Availability : Every request receives a (non-error) response – without the guarantee that it contains the most recent write
Partition tolerance : The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes
即:
①一致性:对于客户端的每次读操作,要么读到的是最新的数据,要么读取失败。
换句话说,一致性是站在分布式系统的角度,对访问本系统的客户端的一种承诺:
要么我给您返回一个错误,要么我给你返回绝对一致的最新数据,其强调的是数据正确。
理解:所有节点在同一时间看到的数据是一致的。
②可用性:任何客户端的请求都能得到响应数据,不会出现响应错误。
换句话说,可用性是站在分布式系统的角度,对访问本系统的客户的另一种
承诺:我一定会给您返回数据,不会给你返回错误,但不保证数据最新,强调的是不出错。
③分区容忍性:由于分布式系统通过网络进行通信,网络是不可靠的。
当任意数量的消息丢失或延迟到达时,系统仍会继续提供服务,不会挂掉。
换句话说,分区容忍性是站在分布式系统的角度,对访问本系统的客户端的再一种
承诺:我会一直运行,不管我的内部出现何种数据同步问题,强调的是不挂掉。
CAP理论,为啥cp和ap互斥?
CP:一致性:支付,跟钱有关的业务
AP:可用性:电商大促,高并发场景
举例:应用两个节点a、b,数据库2个节点:db01,db02
应用a更新数据到db01,db01将数据同步给db02,应用b去查db02,得到同步后的数据,这个是正常情况。
a ——> db01(cluster)
|
b ——> db02(cluster)
假设一种极端情况,db01大量数据还没来得及同步给db02,这个时候用户请求应用b,
这个时候有二种选择:
第一,牺牲数据一致性,响应旧的数据给用户;
第二,牺牲可用性,阻塞等待,直到数据同步完成,再给用户响应最新的db02数据。
nacos是支持CP和AP切换的,这是其他注册中心不具备的。
配置
1. 【配置JAVA_HOME环境变量】
2. 【解压zookeeper】
apache-zookeeper-3.6.3-bin.zip
3. 【配置zookeeper】
conf\zoo_sample.cfg 改为 conf\zoo.cfg
4. 【启动zookeeper】
bin\zkServer.cmd
5. 【运行ZKUI】
IDEA
6. 【访问ZKUI】
http://127.0.0.1:9090/login
admin/manager
(2)Dubbo
理论基础
Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。
Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
Provider(生产者): 暴露服务的服务提供方。
Consumer(消费者): 调用远程服务的服务消费方。
Registry(注册中心): 服务注册与发现的注册中心。dubbo推荐的是zookeeper。
1. 【解压】
dubbo-admin-develop.zip
2. 【后台配置dubbo-admin-server】
application.properties
注册中心
配置中心
元数据中心
admin.registry.address=zookeeper://127.0.0.1:2181
admin.config-center=zookeeper://127.0.0.1:2181
admin.metadata-report.address=zookeeper://127.0.0.1:2181
4. 【后台运行】
注意:Zookeeper是启动状态
实际项目中部署:Linux系统、Maven把springboot打jar包、java -jar 项目jar包。
6. 【前台dubbo-admin-ui】运行
注意:需要安装好node环境。
npm install
npm run dev
7. 【前台dubbo-admin-ui】访问
http://localhost:38082/
root/root
项目应用
Zookeeper + Dubbo 案例
1.【新建2个springboot项目,对应生产者和消费者、分别为dubbo1和dubbo2】
2.【dubbo1生产者:卖票的服务】
2.1【pom.xml】
<!-- dubbo-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.3</version>
</dependency>
<!-- zkclient zookeeper客户端-->
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<!-- 导入zookeeper需要的包和插件-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
2.2【application.properties】
## 应用名称
spring.application.name=provider-service
## 应用服务 WEB 访问端口
server.port=8081
##服务应用名称
dubbo.application.name=provider-service
##注册中心地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
##哪些服务要被注册 会扫描这个包下的服务
dubbo.scan.base-packages=com.zhaoyang.service
2.3【接口和实现】
public interface TicketService {
public String getTicket();
}
import org.apache.dubbo.config.annotation.Service;
import org.springframework.stereotype.Component;
@Component
@Service(version = "1.0.0",timeout = 10000,interfaceClass = TicketService.class)
public class TicketServicelmpl implements TicketService {
@Override
public String getTicket() {
return "大连-->>北京 一等座";
}
}
3.【dubbo2消费者:卖票的服务】
3.1【application.properties】
## 应用名称
spring.application.name=consumer-service
## 应用服务 WEB 访问端口
server.port=8082
dubbo.application.name=consumer-service
dubbo.registry.address=zookeeper://127.0.0.1:2181
dubbo.protocol.port=20880
3.2【接口和实现】
public interface TicketService {
public String getTicket();
}
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.stereotype.Service;
@Service
public class UserService {
//去注册中心拿服务
@Reference(version = "1.0.0",check = true)
TicketService tickerService;
public String buyTicker(){
String ticket = tickerService.getTicket();
return "在注册中心拿到:"+ticket;
}
}
4.【测试】
【开启Zookeeper,开启Dubbo-admin,消费者调用生产者】
(3)Sentinel
理论基础
http://sentinelguard.io/zh-cn/docs/introduction.html
随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。
资源
资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。在接下来的文档中,我们都会用资源来描述代码块。
只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
规则
围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。
流量控制
流量控制在网络传输中是一个常用的概念,它用于调整网络包的发送数据。然而,从系统稳定性角度考虑,在处理请求的速度上,也有非常多的讲究。任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。Sentinel 作为一个调配器,可以根据需要把随机的请求调整成合适的形状
熔断降级
什么是熔断降级
除了流量控制以外,降低调用链路中的不稳定资源也是 Sentinel 的使命之一。由于调用关系的复杂性,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。这个问题和 Hystrix 里面描述的问题是一样的。
Sentinel 和 Hystrix 的原则是一致的: 当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
配置
(1)启动服务
java -jar -Dserver.port=9100 sentinel-dashboard-1.8.5.jar
(2)pom依赖
<!--SpringCloud ailibaba sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2021.1</version>
</dependency>
(3)yml配置
spring:
cloud:
sentinel:
eager: true ##服务启动直接建立心跳连接
transport:
port: 8719 ## 假如被占用了会自动从8719开始依次+1扫描。直至找到未被占用的端口,默认8719
dashboard: 127.0.0.1:9100 ## 指定控制台服务的地址
feign:
client:
config:
## 默认的超时时间设置
default:
connectTimeout: 5000
readTimeout: 5000
## 在指定的 FeignClient 设置超时时间,覆盖默认的设置
nacos-provider:
connectTimeout: 1000
readTimeout: 1000
loggerLevel: full
## 激活 Sentinel
sentinel:
enabled: true
(4)解决循环依赖问题:
spring:
main:
allow-circular-references: true
项目应用
@RestController
public class A {
// 流控测试:QPS
// 流控模式:直接、关联、链路
// 流控效果:快速失败、Warm up、排队等待
@RequestMapping("m1")
public String m1(){
System.out.println("m1...................................");
return "m1";
}
// 流控测试:并发线程数
@RequestMapping("m2")
public String m2(){
System.out.println("m2...................................");
return "m2";
}
}
/*
特别注意:
feign:
sentinel:
enabled: true
*/
@RestController
public class B {
// 熔断规则慢调用比例测试
// RT500、比例阈值1、熔断时长5、最小请求数1、统计时长1000
@RequestMapping("m3")
@SentinelResource(value = "m3", blockHandler = "test3")
public String m3() throws InterruptedException {
Thread.sleep(1000);
System.out.println("m3...................................");
return "m3";
}
String test3(BlockException e){
System.out.println(e);
return "慢调用熔断降级";
}
// 熔断规则异常数测试
// 比例阈值1、熔断时长5、最小请求数2、统计时长1000
@RequestMapping("m4")
@SentinelResource(value = "m4", blockHandler = "test4")
public String m4(){
int i = 4 / 0;
System.out.println("m4...................................");
return "m4";
}
String test4(BlockException e){
System.out.println(e);
return "异常数熔断降级";
}
}
(4)Rocketmq
理论基础
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息等。主要功能是异步解耦和流量削峰。
常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ
架构
NameServer:
NameServer 是一个服务与注册的发现中心。也是整个 RocketMQ 的“大脑”,所以 RocketMQ 需要先启动 NameServer,再启动 RocketMQ 中的 Broker
Broker:
消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。
Producer:
Producer 也称为消息发布者(生产者),负责生产并发送消息至 Topic。生产者向 broker 发送由业务应用程序系统生成的消息。
RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。
Consumer:
也称为消息订阅者,负责从 Topic 接收并消费消息。消费者从 brokers 那里拉取信息并将其输入应用程序。
配置
(1)配置JAVA_HOME和ROCKETMQ_HOME
(2)启动name server
bin目录cmd
mqnamesrv
The Name Server boot success. serializeType=JSON
(3)执行broker
bin目录cmd
mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
The broker[zhaoyang, 192.168.3.22:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
(4)运维控制台Rocketmq-dashboard
Consumer消息订阅者
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
## mq service
rocketmq:
name-server: 127.0.0.1:9876
producer:
## 生产者组
group: my-mq-group-01
// 消费者
@Component
@RocketMQMessageListener(consumerGroup = "my-mq-group-01", topic = "topic-01")
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到的消息是\t" + message);
}
}
// 消费者:有序消息
@Component
@RocketMQMessageListener(consumerGroup = "my-mq-group-02", topic = "topic-02", consumeMode = ConsumeMode.ORDERLY)
public class MyConsumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到的消息是\t" + message);
}
}
Producer消息发布者
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
## mq service
rocketmq:
name-server: 127.0.0.1:9876
producer:
## 生产者组
group: my-mq-group-01
@SpringBootTest
class ProducerApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
void p1() {
for (int i = 0; i < 10; i++) {
// (1)入门案例:主题、消息、字符串转换为message对象再发送
rocketMQTemplate.convertAndSend("topic-01", "入门消息" + i);
}
}
@Test
void p2() {
for (int i = 0; i < 10; i++) {
// (2)同步消息:
// 原理:同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
// 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
// 注意:每次发送消息都有结果!
SendResult sendResult = rocketMQTemplate.syncSend("topic-01", "同步消息" + i);
System.out.println(sendResult);
}
}
@Test
void p3() {
for (int i = 0; i < 10; i++) {
// (3)异步消息:
// 原理:异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
// 消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。
// 消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
// 应用场景:异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,
// 例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
rocketMQTemplate.asyncSend("topic-01", "异步消息" + i, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("回调:发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("回调:发送失败");
}
});
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Test
void p4() {
for (int i = 0; i < 10; i++) {
// (4)单向消息:
// 原理:
// 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。
// 此方式发送消息的过程耗时非常短,一般在微秒级别。
// 应用场景:
// 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
rocketMQTemplate.sendOneWay("topic-01", "单向消息" + i);
}
}
@Test
void p5() {
// (5)顺序消息测试
// hashkey用来计算决定消息发送到哪个消息队列、一般是订单id或产品id
rocketMQTemplate.syncSendOrderly("topic-02", "10001创建", "10001");
rocketMQTemplate.syncSendOrderly("topic-02", "10001支付", "10001");
rocketMQTemplate.syncSendOrderly("topic-02", "10001完成", "10001");
rocketMQTemplate.syncSendOrderly("topic-02", "10002创建", "10002");
rocketMQTemplate.syncSendOrderly("topic-02", "10002支付", "10002");
rocketMQTemplate.syncSendOrderly("topic-02", "10002完成", "10002");
}
}
(5)RabbitMQ
理论基础
Erlang语言
最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀(Erlang有着和原生Socket一样的延迟)。
RabbitMQ
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。
RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
配置
(1)安装otp、并配置系统环境变量PATH
(2)安装 RabbitMQ、并sbin目录下运行
rabbitmq-plugins enable rabbitmq_management
(3)rabbitmq重新启动、并访问管理页面
http://127.0.0.1:15672
账号:guest
密码:guest
(4)Vue访问参数
export const PROTOCOL = 'ws' //连接协议
export const IP = 'localhost' //ip
export const PORT = '15674' //端口
export const ADDRESS = 'ws'
export const ACCOUNT = 'guest' //账号
export const PASSWORD = 'guest' //密码
Rabbit主要的端口说明:
4369 – erlang发现口
5672 --client端通信口
15672 – 管理界面ui端口
25672 – server间内部通信口
生产者消费者模型
生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会交替收到消息。这一过程虽然不用创建交换机,但会使用默认的交换机,并用默认的直连(default-direct)策略连接队列
//生产者消费者模式的配置,包括一个队列和两个对应的消费者
@Configuration
public class ProducerConsumerConfig {
@Bean
public Queue myQueue() {
Queue queue=new Queue("myqueue");
return queue;
}
}
@Component
public class QueueListener1 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器1号收到消息"+mail.toString());
}
}
@Component
public class QueueListener2 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("队列监听器2号收到消息"+mail.toString());
}
}
发布订阅模型
发布订阅模型,添加两个队列,分别各用一个消费者监听,设置一个交换机,类型为广播(fanout),交换机会将收到的消息广播给所有相连的队列
//发布订阅模式的配置,包括两个队列和对应的订阅者,发布者的交换机类型使用fanout(子网广播),两根网线binding用来绑定队列到交换机
@Configuration
public class PublishSubscribeConfig {
@Bean
public Queue myQueue1() {
Queue queue=new Queue("queue1");
return queue;
}
@Bean
public Queue myQueue2() {
Queue queue=new Queue("queue2");
return queue;
}
@Bean
public FanoutExchange fanoutExchange(){
FanoutExchange fanoutExchange=new FanoutExchange("fanout");
return fanoutExchange;
}
@Bean
public Binding binding1(){
Binding binding=BindingBuilder.bind(myQueue1()).to(fanoutExchange());
return binding;
}
@Bean
public Binding binding2(){
Binding binding=BindingBuilder.bind(myQueue2()).to(fanoutExchange());
return binding;
}
}
@Component
public class SubscribeListener1 {
@RabbitListener(queues = "queue1")
public void subscribe(Mail mail) throws IOException {
System.out.println("订阅者1收到消息"+mail.toString());
}
}
@Component
public class SubscribeListener2 {
@RabbitListener(queues = "queue2")
public void subscribe(Mail mail) throws IOException {
System.out.println("订阅者2收到消息"+mail.toString());
}
}
direct直连交换机通信模型
direct直连交换机通信模型,包括一个direct交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey一致的队列中
//direct直连模式的交换机配置,包括一个direct交换机,两个队列,三根网线binding
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange=new DirectExchange("direct");
return directExchange;
}
@Bean
public Queue directQueue1() {
Queue queue=new Queue("directqueue1");
return queue;
}
@Bean
public Queue directQueue2() {
Queue queue=new Queue("directqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingorange(){
Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
@Component
public class DirectListener1 {
@RabbitListener(queues = "directqueue1")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue1队列监听器1号收到消息"+mail.toString());
}
}
@Component
public class DirectListener2 {
@RabbitListener(queues = "directqueue2")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue2队列监听器2号收到消息"+mail.toString());
}
}
topic主题交换机通信
topic主题交换机通信,包括一个topic交换机,三个binding,两个队列,两个消费者监听器,消息只会被投入到routingkey能够匹配的队列中,##表示0个或若干个关键字,*表示一个关键字
//topic交换机模型,需要一个topic交换机,两个队列和三个binding
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange=new TopicExchange("mytopic");
return topicExchange;
}
@Bean
public Queue topicQueue1() {
Queue queue=new Queue("topicqueue1");
return queue;
}
@Bean
public Queue topicQueue2() {
Queue queue=new Queue("topicqueue2");
return queue;
}
//3个binding将交换机和相应队列连起来
@Bean
public Binding bindingtopic1(){
Binding binding=BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
return binding;
}
@Bean
public Binding bindingtopic2(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
return binding;
}
@Bean
public Binding bindingtopic3(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.##");//##表示0个或若干个关键字,*表示一个关键字
return binding;
}
}
@Component
public class TopicListener1 {
@RabbitListener(queues = "topicqueue1")
public void displayTopic(Mail mail) throws IOException {
System.out.println("从topicqueue1取出消息"+mail.toString());
}
}
@Component
public class TopicListener2 {
@RabbitListener(queues = "topicqueue2")
public void displayTopic(Mail mail) throws IOException {
System.out.println("从topicqueue2取出消息"+mail.toString());
}
}
(6)Kafka
Zookeeper安装
https://downloads.apache.org/zookeeper/
(1)更新系统的包管理器
sudo yum update
(2)安装JDK
sudo yum install java-1.8.0-openjdk-devel
(3)下载ZooKeeper
cd /usr/local/
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
(4)解压ZooKeeper
tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
(5)重命名为”zookeeper”
mv apache-zookeeper-3.7.1-bin zookeeper
(6)创建ZooKeeper数据目录
mkdir /usr/local/zookeeper/data
mkdir /usr/local/zookeeper/logs
(7)创建ZooKeeper配置文件:
ZooKeeper的滴答时间(以毫秒为单位)、ZooKeeper存储数据的数据目录、ZooKeeper监听的客户端端口
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
(8)启动ZooKeeper
权限不足解决方案:su root、chmod a+xwr zkServer.sh
/usr/local/zookeeper/bin/zkServer.sh start
(9)使用如下命令检查ZooKeeper是否正在运行:
/usr/local/zookeeper/bin/zkServer.sh status
Kafka安装
https://kafka.apache.org/
权限不足解决方案:
chmod a+xwr kafka-topics.sh
chmod a+xwr kafka-console-producer.sh
chmod a+xwr kafka-console-consumer.sh
chmod a+xwr kafka-consumer-groups.sh
(1)安装Kafka
cd /usr/local/
tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz
mv kafka_2.11-2.4.0 kafka
(2)配置Kafka
vim /usr/local/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs
zookeeper.connect=192.168.19.131:2181
(3)启动Kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties
ps -aux | grep server.properties
(4)使用Kafka
创建主题
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test
查看主题
./kafka-topics.sh --list --zookeeper 192.168.19.131:2181
发送消息
./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test
接收消息方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test
接收消息方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test
(5)单播消息:一个消费组只有一个消费者能消费
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
(6)多播消息:不同的消费者处于不同的消费组
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test
(7)查看消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group1 test 0 22 22 0 consumer-group1
Current-offset:当前消费组已经消费的偏移量
Log-end-offset:主题对应分区消息的结束偏移量(HW)
Lag:当前消费组未消费的消息数
(8)主题分区
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1
cd /usr/local/kafka/data/kafka-logs/topic1-0
cd /usr/local/kafka/data/kafka-logs/topic1-1
说明:定期将自己消费分区的offset提交给kafka内部topic、key是consumerGroupId+topic+分区、value是当前offset值
说明:kafka会定期清理topic里的消息、默认保存7天、7天后消息会被删除
说明:通过此公式可以选出consumer消费的offset要提交到哪个分区:hash(consumerGroupId)%__consumer_offsets主题分区数
__consumer_offsets-0
__consumer_offsets-49
Kafka集群
(1)Kafka集群、3个broker
3个server.properties
vim server0.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs-0
vim server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.19.131:9093
log.dirs=/usr/local/kafka/data/kafka-logs-1
vim server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.19.131:9094
log.dirs=/usr/local/kafka/data/kafka-logs-2
./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
(2)副本:1个主题、2个分区、3个副本
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2
Topic: topic2 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: topic2 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic2 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Leader:
写和读的操作都在Leader上、Leader负责把数据同步到follower、当leader挂了、经过主从选举、从多个follower中选举产生一个新Leader
Follower:
接收leader的同步的数据
Isr:
可以同步的broker节点和已同步的broker节点、存放在isr集合中、如果isr节点中的性能较差、会被踢出isr集合
总结:broker、主题、分区、副本
[root@web-server data]## ls
kafka-logs kafka-logs-0 kafka-logs-1 kafka-logs-2
[root@web-server kafka-logs-1]## ls
cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1
[root@web-server kafka-logs-2]## ls
cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1
(3)Kafka集群消息的发送
./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2
(4)Kafka集群消息的发送
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2
难点:一个Partition只能被一个组中的一个Consumer消费、一个Consumer可以消费多个Partition。
注意:Kafka只在Partition分区的范围内保证消息消费的局部顺序性、不能在同一个topic主题中的多个Partition中保证总的消费顺序性。
Kafka-eagle监控
(1)安装JDK
yum install java-1.8.0-openjdk-devel
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
java -version
(2)解压
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 efak-web
mv efak-web ../
cd /usr/local/efak-web
(3)配置环境变量
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
export KE_HOME=/usr/local/efak-web
export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
source /etc/profile
(4)kafka-eagle内部配置问
vim /usr/local/efak-web/conf/system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.19.131:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.3.53:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
(5)启动
./ke.sh start
http://192.168.19.131:8048
admin/123456
windows版本启动问题解决方案:
例如:如果原来希望输入的命令为:
C:\Program Files\Java\jdk-11.0.12\bin\java.exe -jar xxx.jar
1
现在应改为:
"C:\Program Files\Java\jdk-11.0.12\bin\java.exe" -jar xxx.jar
案例:zookeeper+kafka
一、安装docker
1、Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。
通过 uname -r 命令查看你当前的内核版本
$ uname -r
2、使用 root 权限登录 Centos。确保 yum 包更新到最新。
$ sudo yum update
3、卸载旧版本(如果安装过旧版本的话)
$ sudo yum remove docker docker-common docker-selinux docker-engine
4、安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
5、设置yum源
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
6、可以查看所有仓库中所有docker版本,并选择特定版本安装
$ yum list docker-ce --showduplicates | sort -r
7、安装docker
##$ sudo yum install docker-ce ##没有版本默认安装最新版本、由于repo中默认只开启stable仓库,故这里安装的是最新稳定版17.12.0
$ sudo yum install <FQPN> ## 例如:sudo yum install docker-ce-17.12.0.ce
8、启动并加入开机启动
$ sudo systemctl start docker
$ sudo systemctl enable docker
9、验证安装是否成功(有client和service两部分表示docker安装启动都成功了)
$ docker version
##############
拉取zookeeker
docker pull wurstmeister/zookeeper
拉取kafka版本为2.12-2.2.0,不填写版本好则安装最新,但是个别系统会报错
docker pull wurstmeister/kafka:2.12-2.2.0
启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动kafka
docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d wurstmeister/kafka
进入Kafka容器类kafka01是容器名称,也可以填写成容器ID
docker exec -it kafka01 /bin/bash
创建my_log topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log
查询创建的主题
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181
(7)Solr
理论基础
全文检索
数据总体分为两种:结构化数据和非结构化数据。
1)结构化数据:指具有固定格式或有限长度的数据,如数据库等。
2)非结构化数据:指不定长或无固定格式的数据,如邮件,word 文档等。非结构化数据又一种叫法叫全文数据。
按照数据的分类,搜索也分为两种:
1)对结构化数据的搜索:如对数据库的搜索,用 SQL 语句。
2)对非结构化数据的搜索:如利用 windows 的搜索也可以搜索文件内容,再如用 Google 和百度可以搜索大量内容数据。
Lucene 简介
Lucene 是一个高效的,基于 Java 的全文检索库,是一个开放源代码的全文检索引擎工具包,Lucene 提供了一个简单却强大的应用程序接口,能够做全文索引和搜寻。
在 Java 开发环境里 Lucene 是一个成熟的免费开源工具。
Solr 简介
Solr 基于 Lucene 的高性能的全文搜索服务器,是一个独立的企业级搜索应用服务器,采用 Java 开发。同时对其进行了扩展,提供了比 Lucene 更为丰富的查询语言,同时实现了可配置、可扩展并对查询性能进行了优化,并且提供了一个完善的功能管理界面,是一款非常优秀的全文检索引擎。
配置
(1)设置环境变量PATH
D:\solr-8.6.0\bin
(2)启动服务
solr start
http://localhost:8983/solr
solr stop -all
(3)创建核心文档
D:\solr-8.6.0\server\solr
solr.cmd create -c solrTest
Mysql导入
1 导入相关 jar包
【maven】
mysql-connector-java-8.0.30.jar
【D:\solr-8.6.0\dist】
solr-dataimporthandler-8.11.1.jar
solr-dataimporthandler-extras-8.11.1.jar
【D:\solr-8.6.0\contrib\analysis-extras\lucene-libs】
lucene-analyzers-smartcn-8.6.0.jar
D:\solr-8.6.0\server\solr-webapp\webapp\WEB-INF\lib
2 配置连接信息
【db-data-config.xml配置】
<dataConfig>
<dataSource driver="com.mysql.cj.jdbc.Driver" url="jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=utf-8&serverTimezone=Asia/Shanghai&autoReconnect=true&useSSL=false&allowPublicKeyRetrieval=true" user="root" password="123456" />
<document>
<entity name="poem" query="select * from poem">
<field column="poemcontent" name="poemcontent" />
</entity>
</document>
</dataConfig>
【managed-schema配置】
<field name="poemcontent" type="text_ik" indexed="true" stored="true"/>
<!-- ChineseAnalyzer -->
<fieldType name="text_ik" class="solr.TextField" positionIncrementGap="100">
<analyzer type="index">
<tokenizer class="org.apache.lucene.analysis.cn.smart.HMMChineseTokenizerFactory"/>
</analyzer>
<analyzer type="query">
<tokenizer class="org.apache.lucene.analysis.cn.smart.HMMChineseTokenizerFactory"/>
</analyzer>
</fieldType>
【solrconfig.xml配置】
<requestHandler name="/dataimport" class="solr.DataImportHandler">
<lst name="defaults">
<str name="config">db-data-config.xml</str>
</lst>
</requestHandler>
SpringBoot+Solr
<!--集成solr搜索引擎客户端-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-solr</artifactId>
<version>2.4.13</version>
</dependency>
spring:
data:
solr:
host: http://127.0.0.1:8983/solr/solrTest
import lombok.Data;
import org.apache.solr.client.solrj.beans.Field;
import java.io.Serializable;
@Data
public class Poem implements Serializable {
@Field("id")
private String id;
@Field("poemcontent")
private String poemcontent;
}
@SpringBootTest
class SolrApplicationTests {
@Autowired
SolrClient solrClient;
@Test
void add() throws SolrServerException, IOException {
Poem poem = new Poem();
poem.setId("101");
poem.setPoemcontent("敕勒歌 北朝民歌");
solrClient.addBean(poem);
UpdateResponse commit = solrClient.commit();
System.out.println(commit.toString());
solrClient.close();
}
@Test
public void update() throws IOException, SolrServerException {
// 所谓Solr的更新操作,就是对相同id的文档重新添加一次。修改之后,Version变得不一样了。
Poem poem = new Poem();
poem.setId("101");
poem.setPoemcontent("敕勒歌 北朝民歌 敕勒川,阴山");
solrClient.addBean(poem);
UpdateResponse commit = solrClient.commit();
System.out.println(commit.toString());
solrClient.close();
}
@Test
public void delete() throws IOException, SolrServerException {
UpdateResponse updateResponse = solrClient.deleteById("101");
UpdateResponse commit = solrClient.commit();
System.out.println(commit.toString());
solrClient.close();
}
@Test
public void list() throws IOException, SolrServerException {
SolrQuery query = new SolrQuery("poemcontent:春风");
//添加需要回显得内容
query.addField("id");
query.addField("poemcontent");
//设置每页显示多少条
query.setRows(20);
//执行查询返回QueryResponse
QueryResponse response = solrClient.query(query);
//获取doc文档
List<Poem> poemList=response.getBeans(Poem.class);
poemList.forEach(System.out::println);
solrClient.close();
}
}
(8)Elasticsearch
Elasticsearch和Solr的区别
Elasticsearch和Solr都是基于Lucene的分布式搜索引擎,它们具有高效、可扩展、分布式的特点。Elasticsearch主要适用于实时搜索、分析和数据可视化,Solr主要适用于企业级搜索。
Elasticsearch在大数据存储和实时搜索方面性能更优秀,Solr在文本分析、搜索语义理解等方面性能更强。Elasticsearch默认情况下集成了近实时搜索的功能,可以在几秒钟内从文档变更时就对新文档进行索引,而Solr需要手动设置使其能够支持实时搜索。
实例:假如有100万条数据需要搜索,使用Elasticsearch进行搜索,响应时间通常在毫秒或者数秒之间。而使用Solr进行搜索,其响应时间通常在数秒或者数十秒之间。
Elasticsearch和Solr在查询语法方面有所不同。Solr支持丰富的查询语法,能够满足更多复杂的查询需求;而Elasticsearch则采用了面向文档的查询方式,让用户能够更加方便地进行查询。同时,Elasticsearch支持通过API进行搜索以及通过Kibana进行数据可视化。
实例:假如有如下文本:“The quick brown fox jumps over the lazy dog”,我们可以用Solr进行复杂查询,如 “fox OR dog” 或 “The AND fox”,而在Elasticsearch中,我们可以更加直接地查询:“fox” 或者 “dog”。
Elasticsearch是一个开源项目,拥有庞大的社区。其API和插件的文档十分丰富,能够满足几乎所有的开发和使用需求。Solr也是一个开源项目,但是相较于Elasticsearch的社区,Solr的社区相对较小,在一些新特性的实现和开发方面可能会有所落后。
实例:如果你使用Elasticsearch中遇到了问题,你可以很容易地在论坛或者社区中找到支持和帮助,而如果你在使用Solr中遇到了问题,可能需要花费更长的时间来等待社区的响应。
Elasticsearch配置
(1)【解压:elasticsearch-8.4.0】
配置:
elasticsearch.yml跨域配置
http.cors.enabled: true
http.cors.allow-origin: "*"
elasticsearch.yml访问不到9200解决
xpack.security.enabled: false
运行:elasticsearch.bat
访问:192.168.3.49:9200
(2)【解压:elasticsearch-head-master】
安装:nodejs
问题解决:
1.删除C:\Users\用户\下的.npmrc文件
2.npm cache clean --force
3.npm install -g cnpm --registery=https://registery.npm.taobao.org
运行:npm install / npm run start
访问:192.168.3.49:9100
(3)【解压:kibana-8.4.0】
配置:kibana.yml配置中文
i18n.locale: "zh-CN"
启动:kibana.bat
(4)【解压:ik分词器D:\elasticsearch-8.4.0\plugins\ik】
(5)测试入门
最小切面
GET _analyze
{
"analyzer": "ik_smart",
"text": "中国共产党"
}
最细力度划分
GET _analyze
{
"analyzer": "ik_max_word",
"text": "中国共产党"
}
(6)自定义字典
【IKAnalyzer.cfg.xml】
<!--用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">zhaoyang.dic</entry>
【zhaoyang.dic】
赵阳
赵哥
大连赵哥
索引操作
(1)创建索引
ES 软件的索引可以类比为 MySQL 中表的概念,创建一个索引,类似于创建一个表
ES 不允许修改索引
## 创建索引
## PUT+索引名
PUT myindex
(2)查询指定索引
根据索引名称查询指定索引,如果查询到,会返回索引的详细信息
## 查询索引
## GET 索引名称
GET myindex
(3)查询所有索引
这里请求路径中的_cat 表示查看的意思,indices表示索引,所以整体含义就是查看当前 ES 服务器中的所有索引
## 查询索引
GET _cat/indices
(4)删除索引
删除索引
删除指定已存在的索引
## 删除索引
## DELETE+索引名称
DELETE test_inde
文档操作
(1)创建文档
这里的文档可以类比为关系型数据库中的表数据,添加的数据格式为 JSON 格式
如果在创建数据时,指定唯一性标识,那么请求范式 POST,PUT 都可以
如果没有指定数据唯一性标识,只能使用 POST 请求
## 创建文档
## 创建文档
POST myindex/_doc/001
{
"id" : 1001,
"name" : "zhangsan",
"age" : 30
}
POST myindex/_doc/002
{
"id" : 1002,
"name" : "lisi",
"age" : 18
}
POST myindex/_doc/003
{
"id" : 1004,
"name" : "wangwu",
"age" : 30
}
POST myindex/_doc/004
{
"id" : 1004,
"name" : "zhaoliu",
"age" : 35
}
(2)查询文档
根据唯一性标识可以查询对应的文档
## 查询文档
GET myindex/_doc/001
(3)修改文档
修改文档本质上和新增文档是一样的,如果存在就修改,如果不存在就新增
## 修改文档
PUT myindex/_doc/001
{
"age":20
}
(4)删除文档
删除一个文档不会立即从磁盘上移除,它只是被标记成已删除(逻辑删除)
## 删除文档
DELETE myindex/_doc/001
(5)查询所有文档
## 查询所有文档
GET myindex/_search
数据搜索
(1)匹配查询文档
这里的查询表示文档数据中 JSON 对象数据中的 name 属性是lisi
GET myindex/_search
{
"query": {
"match": {
"name": "lisi" ##不会查出li si 此时查询关键字是lisi 而li si 的关键词是两个【li】【si】匹配不上
}
}
}
GET myindex/_search
{
"query": {
"term": {
"name": {
"value": "li si" ##会查出li si 不会查出lisi 此时关键字是li si
}
}
}
}
(2)匹配查询字段
默认情况下,Elasticsearch 在搜索的结果中,会把文档中保存在_source 的所有字段都返回。如果我们只想获取其中的部分字段,我们可以添加_source 的过滤
GET myindex/_search
{
"_source": ["name","age"],
"query": {
"term": {
"name": {
"value": "lisi"
}
}
}
}
(3)组合"or"
GET myindex/_search
{
"_source": ["name","age"],
"query": {
"bool": {
"should": [
{
"match": {
"name": "lisi"
}
},
{
"match": {
"age": 35
}
}
]
}
}
}
(4)排序
GET myindex/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"age": {
"order": "desc"
}
}
]
}
(5)分页
GET myindex/_search
{
"query": {
"match_all": {}
},
"from": 0,
"size": 2
}
(6)分组
GET myindex/_search
{
"aggs": {
"ageGroup": {
"terms": {
"field": "age"
}
}
},
"size": 0 ##只显示分组信息 不显示源信息
}
(7)平均值
GET myindex/_search
{
"aggs": {
"ageAvg": {
"avg": {
"field": "age"
}
}
},
"size": 0
}
(8)求和
GET myindex/_search
{
"aggs": {
"ageGroup": {
"terms": {
"field": "age"
},
"aggs": {
"ageSum": {
"sum": {
"field": "age"
}
}
}
}
},
"size": 0
}
(9)TopN
GET myindex/_search
{
"aggs": {
"Top3": {
"top_hits": {
"sort": [
{
"age": {
"order": "desc"
}
}
],
"size": 3
}
}
},
"size": 0
}
数据搜索
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.1.0</version>
</dependency>
@Configuration
public class ElasticSearchConfig {
@Bean
public ElasticsearchClient elasticsearchClient(){
RestClient client = RestClient.builder(new HttpHost("localhost", 9200,"http")).build();
ElasticsearchTransport transport = new RestClientTransport(client,new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
}
}
/**
*
* springBoot整合ElasticSearch8.x版本
*
*/
@SpringBootTest
class EsApplicationTests {
@Autowired
private ElasticsearchClient client;
// 索引 CRUD
// (1)增加index
@Test
public void createTest() throws IOException {
CreateIndexResponse indexResponse = client.indices().create(c -> c.index("user"));
}
// (2)查询Index
@Test
public void queryTest() throws IOException {
GetIndexResponse getIndexResponse = client.indices().get(i -> i.index("user"));
}
// (3)判断index是否存在
@Test
public void existsTest() throws IOException {
BooleanResponse booleanResponse = client.indices().exists(e -> e.index("user"));
System.out.println(booleanResponse.value());
}
// (4)删除index
@Test
public void deleteTest() throws IOException {
DeleteIndexResponse deleteIndexResponse = client.indices().delete(d -> d.index("user"));
System.out.println(deleteIndexResponse.acknowledged());
}
// Document CRUD
// (1)插入document
@Test
public void addDocumentTest() throws IOException {
User user = new User("user1", 10);
IndexResponse indexResponse = client.index(i -> i
.index("user")
//设置id
.id("1")
//传入user对象
.document(user));
}
// (2)更新Document
@Test
public void updateDocumentTest() throws IOException {
UpdateResponse<User> updateResponse = client.update(u -> u
.index("user")
.id("1")
.doc(new User("user2", 13))
, User.class);
}
// (3)判断Document是否存在
@Test
public void existDocumentTest() throws IOException {
BooleanResponse indexResponse = client.exists(e -> e.index("user").id("1"));
System.out.println(indexResponse.value());
}
// (4)查询Document
@Test
public void getDocumentTest() throws IOException {
GetResponse<User> getResponse = client.get(g -> g
.index("user")
.id("1")
, User.class
);
System.out.println(getResponse.source());
}
// (5)删除Document
@Test
public void deleteDocumentTest() throws IOException {
DeleteResponse deleteResponse = client.delete(d -> d
.index("user")
.id("1")
);
System.out.println(deleteResponse.id());
}
// (6)批量插入Document
@Test
public void bulkTest() throws IOException {
List<User> userList = new ArrayList<>();
userList.add(new User("user1", 11));
userList.add(new User("user2", 12));
userList.add(new User("user3", 13));
userList.add(new User("user4", 14));
userList.add(new User("user5", 15));
List<BulkOperation> bulkOperationArrayList = new ArrayList<>();
//遍历添加到bulk中
for(User user : userList){
bulkOperationArrayList.add(BulkOperation.of(o->o.index(i->i.document(user))));
}
BulkResponse bulkResponse = client.bulk(b -> b.index("user")
.operations(bulkOperationArrayList));
}
// (7)查询
@Test
public void searchTest() throws IOException {
SearchResponse<User> search = client.search(s -> s
.index("user")
//查询name字段包含hello的document(不使用分词器精确查找)
.query(q -> q
.term(t -> t
.field("name")
.value(v -> v.stringValue("hello"))
))
//分页查询,从第0页开始查询3个document
.from(0)
.size(3)
//按age降序排序
.sort(f->f.field(o->o.field("age").order(SortOrder.Desc))),User.class
);
for (Hit<User> hit : search.hits().hits()) {
System.out.println(hit.source());
}
}
}
(9)Logstash
继承理论
Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。 如果你的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。
Logstash 是 Elastic 栈非常重要的一部分,但是它不仅仅为 Elasticsearch 所使用。它可以介绍广泛的各种数据源。Logstash 可以帮利用它自己的 Filter 帮我们对数据进行解析,丰富,转换等。
最后,它可以把自己的数据输出到各种需要的数据储存地,这其中包括 Elasticsearch。
自动同步Mysql和ES
1、下载Logstash工具(版本必须一致)
logstash-8.4.0-windows-x86_64.zip
2、mysql数据库可用
3、在解压后的logstash目录下新建文件夹mysql
mysql驱动放入到上面新建的mysql目录下
4、创建脚本文件find.sql
在mysql目录下,新建文件find.sql,并写入数据备份所用到的查询sql,如:
-- student 为需要导入的表
select * from student
5、创建配置文件mysql.conf
在mysql目录下,新建文件mysql.conf,并写入如下配置:
input {
stdin {
}
jdbc {
## mysql 数据库链接,jdbc版本比较大的要加上?后面那串字符
jdbc_connection_string => "jdbc:mysql://localhost:3306/数据库名?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
## 用户名和密码
jdbc_user => "用户名"
jdbc_password => "密码"
## 驱动
jdbc_driver_library => "mysql-connector-java.jar包所在路径"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
## 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字
statement_filepath => "find.sql文件所在路径"
## 也可以写一个要执行sql语句,替代statement_filepath的方式
##statement => "select * from tb_article"
## 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
## 索引类型
type => "student"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
## ES的IP地址及端口
hosts => ["localhost:9200"]
## 索引名称,elasticsearch叫做索引,相当于es的数据库
index => "自定义索引名"
## 自增ID id必须是待查询的数据表的序列字段
document_id => "%{表的主键字段名}"
}
stdout {
## JSON格式输出
codec => json_lines
}
}
参考配置例子如下:
input {
stdin {
}
jdbc {
## mysql 数据库链接,center为数据库名,jdbc版本比较大的要加上?后面那串字符
jdbc_connection_string => "jdbc:mysql://localhost:3306/mysql_test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
## 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
## 驱动
jdbc_driver_library => "D:\logstash-8.4.0\mysql\mysql-connector-java-8.0.30.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
## 执行的sql 就是上一步创建的sql文件的绝对路径+文件名字
statement_filepath => "D:\logstash-8.4.0\mysql\find.sql"
## 也可以写一个要执行sql语句,替代statement_filepath的方式
##statement => "select * from tb_article"
## 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
## 索引类型
type => "student"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
## ES的IP地址及端口
hosts => ["localhost:9200"]
## 索引名称,elasticsearch叫做索引,相当于es的数据库
index => "mysql"
## 自增ID id必须是待查询的数据表的序列字段
document_id => "%{id}"
}
stdout {
## JSON格式输出
codec => json_lines
}
}
6、启动Logstash开始mysql表数据备份
打开命令行窗口,进入到zip包解压后logstash的目录bin下,输入:
//logstash -f mysql.conf文件路径,如:
logstash -f D:\logstash-8.4.0\mysql\mysql.conf
GET test-student/_search
{
"query":{
"match_all": {}
}
}
(10)Elastic-Job
简介
elastic-job 是由当当网基于quartz 二次开发之后的分布式调度解决方案 , 由两个相对独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成 。
elastic-job主要的设计理念是无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但数据库没有分布式协调功能,所以在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。
elastic-job是当当网基于Zookepper 、Quartz开源的一个java分布式定时任务,解决了Quartz不支持分布式的弊端.
elastic-job由两个相互独立子项目Elastic-Job-Lite 、 Elastic-Job-Cloud组成.
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper
Thread
package com.zhaoyang.demo;
import java.io.IOException;
import java.time.LocalDateTime;
public class A {
// 每隔3s执行一次。
public static void main(String[] args) throws IOException {
new Thread(() -> {
while (true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务"+ LocalDateTime.now());
}
}).start();
System.in.read();
}
}
Timer
package com.zhaoyang.demo;
import java.time.LocalDateTime;
import java.util.Timer;
import java.util.TimerTask;
public class B {
// 5s之后开始执行,后续每隔3s执行一次。
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("执行任务"+ LocalDateTime.now());
}
}, 5000,3000);
}
}
ScheduledThreadPoolExecutor
package com.zhaoyang.demo;
import java.time.LocalDateTime;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class C {
// 5s之后开始执行,后续每隔3s执行一次。
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("执行任务"+ LocalDateTime.now());
}
}, 5, 3, TimeUnit.SECONDS);
}
}
Spring Task
package com.zhaoyang.demo;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 以上几种方式都有几个共同的缺点:
*
* 单线程执行,若前一个任务执行时间较长,会导致下一个任务饥饿阻塞
* 无分布式协调机制,如果只有一个节点就会单点风险,如果部署多个节点就会有并发执行的问题
* 随着任务规模增多,无统一视角对其进行任务进度进行追踪和管控
* 功能比较简单,没有超时、重试等高级特性
*/
@Component
public class MySpringTask {
@Scheduled(cron = "0/5 * * * * ?")
public void test(){
System.out.println("执行SpringTask");
}
}
SimpleJob简单作业-基础
package com.zhaoyang.elastic;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.time.LocalDateTime;
// (1)SimpleJob-简单作业
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println("执行任务"+ LocalDateTime.now());
}
}
package com.zhaoyang.elastic;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
// (2)SimpleJob-简单作业:集成springboot
@Component
public class MySimpleJob2 implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println("执行任务2"+ LocalDateTime.now());
}
}
DataflowJob数据流作业-基础
package com.zhaoyang.elastic;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
// (2)DataflowJob-数据流作业
public class MyDataflowJob implements DataflowJob<String> {
@Override
public List<String> fetchData(ShardingContext shardingContext) {
List<String> data = new ArrayList<>();
data.add("数据1");
data.add("数据2");
data.add("数据3");
data.add("数据4");
return data;
}
@Override
public void processData(ShardingContext shardingContext, List<String> list) {
System.out.println(LocalDateTime.now()+"处理数据:"+list);
}
}
SimpleJob简单作业-进阶
package com.zhaoyang;
import com.zhaoyang.elastic.MySimpleJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
// (1)SimpleJob-简单作业
public class TestMySimpleJob {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();
}
// 连接Zookeeper
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
regCenter.init();
return regCenter;
}
// 创建作业配置
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MySimpleJob", 1)
.cron("0/3 * * * * ?")
.build();
}
}
DataflowJob数据流作业-进阶
package com.zhaoyang;
import com.zhaoyang.elastic.MyDataflowJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
/**
*
* (2)DataflowJob-数据流作业
*
* streaming.process=true,表示开启流式处理,默认为false
* overwrite=true,表示要重写Job配置,如果不设置这个,新修改的或新增的配置将不会生效
* 一旦这么做了之后,我们会发现以上代码会不停的执行任务,而不是每隔3s执行一次了。
*
* 这是因为,如果开启流式处理,则作业只有在 fetchData 方法的返回值为 null 或集合容量为空时,才停止抓取,否则作业将一直运行下去; 如果关闭流式处理,则作业只会在每次作业执行过程中执行一次 fetchData 和 processData 方法,随即完成本次作业。
*
* 所以,以上代码每次调用 fetchData 方法都能获取到数据,所以会一直执行。
*
* 如果采用流式作业处理方式,那么就需要业务代理自己来控制什么时候从fetchData获取不到数据,从而停止本次任务的执行。
*/
public class TestMyDataflowJob {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(), createJobConfiguration()).schedule();
}
// 连接Zookeeper
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
regCenter.init();
return regCenter;
}
// 创建作业配置
private static JobConfiguration createJobConfiguration() {
return JobConfiguration.newBuilder("MyDataflowJob", 1)
.cron("0/3 * * * * ?")
.build();
}
}
ScheduleJobBootstrap脚本作业
package com.zhaoyang;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.script.props.ScriptJobProperties;
// (3)脚本作业
/**
* 注意ScheduleJobBootstrap的第二个参数为"SCRIPT",另外通过设置script.command.line来配置要执行的脚本。
*
* 其底层其实就是利用的CommandLine来执行的命令,所以只要在你机器上能执行的命令,那么就可以在这里进行设置并执行。
*/
public class TestScriptJob {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT", createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
return JobConfiguration.newBuilder("MyScriptJob", 1)
.cron("0/5 * * * * ?")
.setProperty(ScriptJobProperties.SCRIPT_KEY, "java -version")
.overwrite(true)
.build();
}
}
HTTP作业
package com.zhaoyang;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
// (4)HTTP作业(3.0.0-beta 提供)
/**
* 注意ScheduleJobBootstrap的第二个参数为"HTTP",另外通过设置http.uri、http.method等参数来配置请求信息。
*
* 其底层其实就是利用的HttpURLConnection来实现的。
*
* 如果要看到调用结果,得把日志级别设置为debug,因为在HttpJobExecutor源码中中是这么打印请求结果的:
*/
public class TestHttpJob {
public static void main(String[] args) {
new ScheduleJobBootstrap(createRegistryCenter(), "HTTP", createJobConfiguration()).schedule();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
regCenter.init();
return regCenter;
}
private static JobConfiguration createJobConfiguration() {
// 创建作业配置
return JobConfiguration.newBuilder("MyHttpJob", 1)
.cron("0/5 * * * * ?")
.setProperty(HttpJobProperties.URI_KEY, "http://www.baidu.com")
.setProperty(HttpJobProperties.METHOD_KEY, "GET")
.setProperty(HttpJobProperties.DATA_KEY, "source=ejob") // 请求体
.overwrite(true)
.build();
}
}