Spring Cloud

Spring Cloud

Spring Cloud可以将大的项目拆分成若干个小的模块,每一个模块都是一个独立的子系统,所有子模块组合起来完成系统总功能。

基础功能:

  1. 服务治理:Spring Cloud Eureka
  2. 客户端负载均衡:Spring Cloud Ribbon
  3. 服务器容错保护:Spring Cloud Hystrix
  4. 声明式服务调用:Spring Cloud Feign
  5. API网关服务:Spring Cloud Zuul
  6. 分布式配置中心:Spring Cloud Config

高级功能:

  1. 消息总线:Spring Cloud Bus
  2. 消息驱动的微服务:Spring Cloud Stream
  3. 分布式服务跟踪:Spring Cloud Sleuth

Spring Cloud技术栈

消息中心:StreamBus
配置中心:GitZooKeeper
授权认证中心:SecurityJWTOauth
缓存中心:Data
文档中心:Swagger
服务注册与发现:EurekaConsulZooKeeper
网关路由:ZuulGateway
服务调用:RibbonFeignHystrix
监控:ActuctorAdmin
链路监控:SleuthZipkin

Spring Cloud 与 Dubbo 的区别:
服务注册:Dubbo使用的ZooKeeper,注重数据一致性,抛弃高可用性;Spring Cloud使用的Eureka,注重高可用性,存在自我保护机制,允许旧数据;
服务调用:Dubbo使用RPC;Spring Cloud使用REST API;

服务监控:Dubbo使用Dubbo-monitor,由Provider和Consumer统计服务调用成功次数、失败次数、平均响应时间,然后实时提交到监控中心;Spring Cloud使用Spring Boot Admin;
断路器:Dubbo无;Spring Cloud Hystrix;
服务网关:Dubbo无;Spring Cloud Zuul;
分布式配置:Dubbo无;Spring Cloud Config;
服务跟踪:Dubbo无;Spring Cloud Sleuth;
消息总线:Dubbo无;Spring Cloud Bus;
数据流:Dubbo无;Spring Cloud Stream;
批量任务:Dubbo无;Spring Cloud Task


IDEA中Spring Cloud项目的创建

首先新建一个总的Maven项目作为Spring Cloud的父项目,在父项目下New -> Module,每一个Module就是一个子微服务。
父项目中添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.bat</groupId>
<artifactId>spring_cloud_in_action</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>eureka_server_8761</module>
<module>config</module>
<module>eureka_client_provider_7777</module>
</modules>

<!-- 添加Spring Boot依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.7.RELEASE</version>
</parent>

<!-- 添加依赖 -->
<dependencies>
<!-- web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- JDK8相关配置 -->
<!-- jaxb用于xml与Bean相互转化 -->
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- 允许Optional替代null -->
<optional>true</optional>
</dependency>
</dependencies>

<!-- Spring Cloud依赖管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

</project>

(一)Spring Cloud Config:配置文件的集体管理

(1)添加Maven依赖

1
2
3
4
5
6
7
8
<dependencies>
<!-- 配置中心 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
<version>2.0.4.RELEASE</version>
</dependency>
</dependencies>

(2)新建搜索目录

main -> resources目录下新建shared目录。

(3)新建并修改application.yaml

main -> resources目录下新建application.yaml并添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Config: 配置中心
# 端口,8762
server:
port: 8762
spring:
# 应用名
application:
name: configserver
# 读取本地配置文件
profiles:
active: native
cloud:
config:
server:
native:
#配置文件搜索路径
search-locations: classpath:/shared

(4)在/shared目录下创建各个微服务配置文件

创建provider7777-dev.yml文件,内容如下:

1
2
server:
port: 7777

(5)启动类添加@EnableConfigServer、@SpringBootApplication注解

main -> java目录下新建包com.bat,在包下新建ConfigApplication.java

注意Spring Boot启动类必须要放在java的子目录下,如果直接放在java目录下会报错。

1
2
3
4
5
6
7
@SpringBootApplication
@EnableConfigServer
public class ConfigApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigApplication.class, args);
}
}

(二)Spring Cloud Eureka:服务治理

Spring Cloud微服务中,不同子系统是通过网络进行远程调用的,所以我们需要知道子系统的IP地址,如果存在A <-> B <-> C的调用链,B的IP地址修改了,A、C代码中对应的IP就要跟着修改,服务一多,手动维护这些配置就是噩梦。
Spring Cloud Eureka是如此解决的:

  1. 创建D服务,作为Eureka服务注册中心(Eureka Server);A、B、C都注册到D服务上,由D服务负责维护这些注册信息(服务名 + 对应的IP地址
  2. A、B、C都可以从D服务上拿到注册清单,相互之间不再通过IP地址调用,而是通过服务名调用

机制

服务生产者

  1. 服务注册:生产者启动时会发送REST请求到注册中心,同时带上自身服务的一些元数据信息
  2. 服务续约:注册完成后,服务生产者会持续发送心跳信息告诉注册中心服务可用
  3. 服务下线:正常关闭时生产者会给注册中心发送下线的REST请求

服务消费者

  1. 获取服务列表:服务消费者会给注册中心发送REST请求,获取所有可用服务的列表
  2. 调用服务:通过服务名调用服务,优先访问处于同一个Zone的服务生产者

注册中心

  1. 失效剔除:每过一段时间(60s),将注册列表中一段时间(90s)没有续约的服务剔除
  2. 自我保护:如果某个服务注册者15分钟内心跳失败的比例达到了85%,注册中心会将其注册信息保存起来,等待服务重新上线后退出自我保护,而不是直接将服务给注销

    高可用性:自我保护机制——宁可保留错误的服务注册信息,也不盲目注销任何可能健康的服务


与ZooKeeper的区别

根据CAP定理强一致性可用性分区容错性最多满足两者,而在分布式系统中分区容错性是必须的。

  1. ZooKeeper选择满足了强一致性,也就意味着它放弃了部分可用性。比如:当ZK的Leader节点宕机后,它会重新发起Leader选举,这个过程持续30-120s,此期间所有请求都会被ZK丢弃,系统处于瘫痪状态。
  2. Spring Cloud Eureka选择的是满足可用性,我们可以容忍注册中心返回的是几分钟之前的注册信息,但不能容忍服务直接down掉,具体做法就是“自我保护机制”——如果某个服务注册者15分钟内心跳失败的比例达到了85%,注册中心会将其注册信息保存起来

具体配置

1.Server

(1)添加Maven依赖
1
2
3
4
5
6
7
8
<dependencies>
<!-- Netflix Eureka Server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
(2)新建并修改application.yaml

main -> resources目录下新建application.yaml并添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Netflix Eureka Server:注册中心
# 端口,默认8761
server:
port: 8761
# Eureka
eureka:
client:
# service-url是一个Map,key不一定要叫defaultZone
service-url:
# /eureka/是必需的固定值
# 如果使用集群,那么多个URL之间用逗号隔开即可
defaultZone: http://localhost:${server.port}/eureka/
# 注册中心不向自己注册
register-with-eureka: false
# 注册中心不读取注册信息
fetch-registry: false
(3)程序入口添加@EnableEurekaServer注解

如果要搭配Spring Boot Admin使用,还可以添加@EnableAdminServer注解,应用启动后可用通过http://localhost:yourEurekaPort/admin打开Spring Boot Admin监控板

main -> java目录下新建com.bat目录,目录下新建Spring Boot的启动类EurekaServerApplication,添加@SpringBootApplication@EnableEurekaServer注解。

注意Spring Boot启动类必须要放在java的子目录下,如果直接放在java目录下会报错。

1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}

启动项目,访问localhost:8761,就能看到注册中心:

Eureka Server 启动
8761

2.Client(包括生产者&消费者)

(1)添加Maven依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependencies>
<!-- Eureka Client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>

<!-- 通过配置中心读取配置文件 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
</dependencies>
(2)创建并修改bootstrap.yml

main -> resources目录下,创建bootstrap.yml

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # Eureka Client 生产者
    # 通过application.name-profiles.active定位到Spring Cloud Config中/shared/目录下的配置文件
    spring:
    application:
    name: provider7777
    profiles:
    active: dev
    cloud:
    config:
    # 配置中心地址
    uri: http://localhost:8762
    fail-fast: true
    # Eureka
    eureka:
    client:
    # service-url是一个Map,key不一定要叫defaultZone
    service-url:
    # /eureka/是必需的固定值
    defaultZone: http://localhost:8761/eureka/
    # 使用自定义的实例名称,而非默认的
    instance:
    instance-id: provider-7777
    # 访问路径可以显示IP
    prefer-ip-address: true
  2. 消费者

(3)程序入口添加@EnableEurekaClient注解

main -> java目录下新建com.bat目录,目录下新建Spring Boot的启动类,添加@EnableEurekaClient注解。

注意Spring Boot启动类必须要放在java的子目录下,如果直接放在java目录下会报错。

比如:

1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaClient
public class EurekaClientProvider7777 {
public static void main(String[] args) {
SpringApplication.run(EurekaClientProvider7777.class, args);
}
}

(4)测试

1
2
3
4
5
6
7
8
9
10
11
@RestController
@RequestMapping("/order")
public class OrderController{
@Value("${server.port}")
private String port;

@GetMapping("/index")
public String index() {
return "order's running at port:" + port;
}
}

依次启动Eureka ServerConfig ServerEureka Client

浏览器输入localhost:8761,可以看到7777端口有服务在运行:
8761

浏览器输入localhost:7777/order/index,可以看到该微服务已经从8762端口的配置中心读取到了自己对应的配置文件:
7777/order/index


(三)Spring Cloud Ribbon:客户端负载均衡

Ribbon服务消费者通过负载均衡算法,决定要调用哪个服务生产者提供的服务。

负载均衡算法

  1. RoundRobinRule:轮询
  2. RandomRule:随机
  3. AvailuabilityFilteringRule:过滤掉不可用服务、并发连接超过阈值的服务,然后剩余服务轮询
  4. WeightedResponseTimeRule:根据平均响应时间计算所有服务的权重,选择响应时间最短的,如果统计信息不足则采用轮询
  5. RetryRule:先轮询,如果失败会在指定时间内重试
  6. BestAvailuableRule:过滤不可用服务,选择并发连接数最小的服务
  7. ZoneAvoidanceRule:复合判断server所在区域的性能和server的可用性选择服务器

Ribbon的负载均衡算法默认是RoundRobinRule轮询,你可以通过继承AbstractLoadBalancerRule类,重写public Server choose(ILoadBalancer lb, Object key)来自定义负载均衡算法。

具体使用

Ribbon依赖

在服务消费者的pom.xml文件中添加Ribbon的相关依赖:

1
2
3
4
5
6
7
8
9
10
11
<!--  Ribbon需要和Eureka整合才能使用  -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<!-- Ribbon -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>

向Eureka Server注册(已注册的可忽略)

application.yaml

1
2
3
4
5
6
7
8
9
# Eureka
eureka:
client:
# service-url是一个Map,key不一定要叫defaultZone
service-url:
# /eureka/是必需的固定值
# 如果使用集群,那么多个URL之间用逗号隔开即可
defaultZone: http://localhost:${server.port}/eureka/
register-with-eureka: false

在服务消费方添加@LoadBalanced注解


(四)Spring Cloud Hystrix:熔断器

参考《Monitor Microservices with Hystrix, Eureka admin and Spring boot admin》以及《服务容错模式》

多个微服务之间存在相互调用,在高并发的情况下,由于某个微服务的不可用,导致所有请求都处在延迟状态,在很短时间内就会耗尽系统资源,造成“雪崩”。
Hystrix充当了熔断器,提供了断路器线程隔离等一系列服务保护功能,可以保证在一个依赖出问题的情况下,不会导致整体服务的失败,通过隔离服务之间的接入点来避免级联故障,以提高分布式系统的弹性。

容错机制

1. 服务隔离(Isolation)

  1. 舱壁隔离模式:像舱壁一样对资源或失败单元进行隔离
  2. 资源隔离模式:为每个服务创建一个线程池(或者信号量),实际使用时会将业务进行分类交给不同的线程池进行处理,这样如果一个服务延迟过高,不会影响到其他服务

    线程池:资源消耗比信号量模式多,但是支持异步请求,适用于网络请求+异步请求的场景;
    信号量:资源消耗比线程池少,不支持异步请求,适用于非网络请求的场景。

2. 服务熔断(Circuuit Breaker)

当一个链路的某个微服务不可用或响应时间太长时,直接熔断该服务的调用,快速返回一个Fallback,直到检测到该微服务正常响应后才恢复链路。

熔断参数
  • 滑动窗口大小(20)、熔断器开关间隔(5s)、错误率(50%)

    20个请求中,有50%的失败,熔断器打开,此时调用服务直接返回失败;
    5s之后重新检测,判断是否可以将熔断器打开

3. 服务回退(Fallback)

当某个服务生产者故障,通过断路器的监控,向服务调用方返回一个符合预期的、可处理的错误响应(Fallback),而不是长时间的等待

使用方法
  1. 新建一个Fallback类,继承FallbackFactory<T>接口,重写create()方法
  2. 在Fallback类上添加@Component注解
  3. 直接在@FeignClient()中添加fallbackFactory属性值
1
2
@FeignClient(value = "FEIGNCLIENTSERVICE", fallbackFactory = FeignClientServiceFallbackFactory.class)
public interface FeignClientService{...}
  1. 配置文件中添加feign.hystrix.enabled = true

Hystrix使用

  1. 添加Maven依赖
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency
  1. 希望熔断的函数上添加@HystrixCommand(fallbackMethod = "funcName")注解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//希望被Hystrix熔断的函数上加@HystrixCommand()注解
//通过fallbackMethod设置熔断函数
@RequestMapping(value = "/employeeDetails/{employeeid}", method = RequestMethod.GET)
@HystrixCommand(fallbackMethod = "fallbackMethod")
public String getStudents(@PathVariable int employeeid){
System.out.println("Getting Employee details for " + employeeid);

String response = restTemplate.exchange("http://employee-service/findEmployeeDetails/{employeeid}",
HttpMethod.GET, null, new ParameterizedTypeReference<String>() {}, employeeid).getBody();

System.out.println("Response Body " + response);

return "Employee Id - " + employeeid + " [ Employee Details " + response+" ]";
}

//熔断回调函数:在熔断时被调用
public String fallbackMethod(int employeeid){
return "Fallback response:: No employee details available temporarily";
}
  1. 在启动类上加@EnableCircuitBreaker注解,如果希望可视化,还可以加上@EnableHystrixDashBoard注解

    通过http://localhost:yourHystrixPort/hystrix可用打开Hystrix Dashboard,输入http://localhost:yourHystrixPort/hystrix.stream即可实时监控所有Hystrix命令和线程池。


(五)Spring Cloud Feign:像调用本地方法一样调用远程方法

Feign是一种声明式、模板化的HTTP客户端,整合了RibbonHystrix,并提供了声明式的服务调用。
Spring Cloud中使用Feign,用户可以做到使用HTTP请求远程服务时与使用本地方法一样的编码体验,开发者完全感知不到这是远程方法,更感知不到这是HTTP请求。

使用:

  1. 添加Feign依赖
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency
  1. 新建Service接口,添加@FeignClient(value = "")注解

  2. 在启动类上加@EnableFeignClient


(六)Spring Clooud Zuul:API网关

现在我们已经将服务拆分成了子模块,它们都同样要实现拦截器和过滤器,如果在每个子服务中都实现一次,势必会造成冗余,所以我们这些功能单独抽出一层API网关层。
Spring Cloud Zuul注册到Eureka的注册中心,使自身成为Eureka治理下的应用,同时能够从注册中心获取所有注册服务的信息。外层的服务调用都必须先经过API网关层,在API网关层先对服务接口做统一前置拦截和过滤

既然Zuul也同样支持Ribbon和Hystrix,那么是不是使用Zuul就没必要使用Feign了呢?
Zuul是对外暴露的唯一接口,相当于路由的是Controller层;而Feign路由的是系统内部Service层的请求,更重要的是充当HTTP Client


作用

  1. 验证
  2. 审查与监控
  3. 动态路由
  4. 压力测试
  5. 负载均衡
  6. 静态响应处理
  7. 限流

使用

  1. 添加Zuul依赖
1
2
3
4
5
<!--  Zuul同样需要搭配Eureka使用  -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zuul</artifactId>
</dependency
  1. 启动类上添加@EnableZuulProxy

API网关的缺点

  1. 无网关时用户请求直接访问微服务即可,引入网关后,用户请求先访问网关,然后再转发给微服务,链路增长,性能会有一定下降;但是通常网关机器性能很好,并且网关和微服务是内网访问,一般速度很快,所有影响不大;
  2. 网关存在单点故障问题;
  3. API网关使用不当可能导致系统臃肿,所以网关要尽可能轻量

(七)Spring Cloud Sleuth:链路监控

抄袭自公众号“IT牧场”

具体应用:使用ES收集 & 分析日志

原理:
1.让Sleuth打印JSON格式的日志;
2.在Logstash的配置文件中,配置grok语法,解析并收集JSON格式的日志,存储到ES
3.Kibana可视化分析日志

【具体步骤】:

  1. 添加Sleuth、Logstash依赖
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.1</version>
</dependency>

注意logstash-logback-encoder的版本必须和Logback兼容。

  1. resource目录下创建logback-spring.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?xml version="1.0" encoding="UTF-8">
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>

<springProperty scope="context" name="springAppName" source="spring.application.name"/>

<!-- 日志存储位置 -->
<property name="LOG_FILE" value="/user/elk/logs/${springAppName}"/>

<!-- 日志格式:颜色…… -->
<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss:SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

<!-- ConsoleAppender -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<!-- 在console logs中展示的日志最低级别 -->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<!-- RollingFileAppender:将日志转为JSON格式 -->
<appender name="logstash" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_FILE}.json</file>
<rollingPolicy>
<fileNamePattern>${LOG_FILE}.json.%d{yyyy-MM-dd}.gz</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"serverity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"parent": "%X{X-B3-ParentSpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="logstash"/>
</root>
</configuration>
  1. 新建bootstrap.yml,将application.yml中的以下属性移到bootstrap.yml
1
2
3
spring:
application:
name: spring-cloud-sleuth-plus-ES-and-Kibana

logback-spring.xml中含有的变量如springAppName必须要放在bootstrap.yml,否则会导致logback-spring.xml无法正确读取属性。

  1. 通过Docker-Compose搭建ES:创建docker-compose.yml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
version: '3'
services:
elasticsearch:
image: elasticsearch:7.3.1
environment:
discovery.type: single-node
ports:
- "9200:9200"
- "9300:9300"
logstash:
image: logstash:7.3.1
command: logstash -f /etc/logstash/conf.d/logstash.conf
volumes:
# 挂载logstash配置文件
- ./config:/etc/logstash/conf.d
- /user/elk/logs/:/opt/build/
ports:
- "5000:5000"
kibana:
image: kibana:7.3.1
environment:
-ELASTICSEARCH_URL=http://elasticsearch:9200
ports:
- "5601:5601"
  1. docker-compose.yml文件所在目录创建config/logstash.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input{
file{
codec => json
path => "/opt/build/*.json"
}
}
filter{
grok{
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s+%{LOGLEVEL:severity}\s+\[%{DATA:service},%{DATA:trace},%{DATA:span},%{DATA:exportable}\]\s+%{DATA:pid}\s+\[%{DATA:thread}\]\s+%{DATA:class}\s+:\s+%{GREEDYDATA:rest}" }
}
}
output{
elasticsearch{
hosts => "elasticsearch:9200"
}
}
  1. 通过docker-compose up启动ES
  2. 启动应用,日志会打印到设置好的/user/elk/logs/${springAppName}目录下,文件名称是设置好的${springAppName}.json
  3. 访问Kibana默认地址:http://localhost:5601,即可可视化分析日志

(八)Spring Cloud Stream:事件驱动消息组件

从本质上来说Spring Cloud Stream就是一个MQ的集成组件,核心功能就是“存储-转发”和解耦。

基本架构

工作流程

Spring Cloud Stream中有3个角色:

  1. 消息发布者(Publisher):Publisher根据业务需要产生消息发送的需求。
  • Source —— Spring Cloud Stream中的Source组件是真正生成消息的组件
  • Channel —— 消息然后消息通过Channel传送到Binder
  • Binder —— 通过Binder可以与特定的消息通信系统(如RabbitMQKafka)进行通信
  1. 消息消费者(Consumer):
  • Binder —— Consumer通过Binder从消息通信系统获取消息
  • Channel —— 消息通过Channel流转到Sink组件
  • Sink —— Sink组件是服务级别的,每个微服务可能会实现不同的Sink组件,分别对消息进行不同业务上的处理
  1. 消息通信系统:Spring Cloud Stream内置集成的消息通信系统实现工具包括RabbitMQKafka

核心组件

1. Binder

BinderChannel成对出现。

  • Binder是服务与消息通信系统之间的粘合剂,目前Spring Cloud Stream实现了RabbitMQKafka这两种消息中间件的Binder
  • 通过Binder,我们可以方便地连接消息中间件,也可以动态改变消息的目的地址和发送方式
  • Binder还提供了Consumer Group、消息分区等特性,通过这些特性,我们可以不必了解Binder背后各种消息中间件在实现上的差异
2. Channel

BinderChannel成对出现。

  • Channel是对队列的抽象,用于对消息的“存储-转发”
3. Source
  • 面向Publisher,可以理解为输出(发送消息)
  • Source组件通过一个POJO对象来作为需要发布的消息,通过对该对象进行序列化(默认JSON),然后发布到Channel
1
2
3
4
5
6
7
public interface Source{
String OUTPUT = "output";

//定义输出通道,发布的消息通过该通道离开应用
@Output(Source.OUTPUT)
MessageChannel output(); //通过MessageChannel发送消息
}

关于MessageChannel的定义见下一节Spring Integration

4. Sink
  • 面向Consumer,可以理解为输入(接收消息)
  • Sink监听通道并等待消息的到来,一旦有可用消息,Sink将消息反序列化为一个POJO对象,用于处理业务逻辑
1
2
3
4
5
6
7
public interface Sink{
String INPUT = "input";

//定义输入通道,应用通过该输入通道接收来自外部的消息
@Input(Source.INPUT)
SubscribabaleChannel input(); //通过SubscribabaleChannel实现接收
}

关于SubscribableChannel的定义见下一节Spring Integration

Spring Integration:企业服务总线(ESB)组件

Spring Cloud Stream基于Spring Integration实现消息发布和消费机制,并提供了一层封装。

MessageChannel

Spring Integration把通道抽象为两种基本的表现形式:

  1. 支持轮询的PollableChannel
  2. 实现发布/订阅模式的SubscribableChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//具有消息发送功能的通道
public interface MessageChannel{
boolean send(Message message);
boolean send(Message message, long timeout);
}

//支持轮询
public interface PollableChannel extends MessageChannel{
//通过轮询主动获取消息
Message<?> receive();
Message<?> receive(long timeout);
}

//实现发布/订阅模式
public interface SubscribableChannel extends MessageChannel{
//MessageHandler是回调函数,用于实现Callback形式的事件响应
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}

Source:实现消息发布者

引入Maven依赖

  • spring-cloud-stream
  • spring-cloud-starter-stream-rabbit:引入RabbitMQ作为消息中间件系统
1
2
3
4
5
6
7
8
9
10
11
<!--Spring Cloud Stream-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<!--Spring Cloud Stream - Rabbit Binder-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

消息发布服务类添加@EnableBinding(Source.class)注解

1
2
3
4
5
6
7
8
9
10
11
@EnableBinding(Source.class)
public class MessageSender{
@Resource
private MessageChannel output;

@Override
public void send(Order order) {
this.output.sent(MessageBuilder.withPlayload(order).build());
//通过MessageBuilder将POJO转换为Message对象
}
}

@StreamListener:实现消息消费者

  1. 类上添加:@EnableBinding(Sink.class)
  2. 方法上添加:@StreamListener(Sink.INPUT)
1
2
3
4
5
6
7
@EnableBinding(Sink.class)
public class MessageListener{
@StreamListener(Sink.INPUT)
public void consume(Message<Order> message){
System.out.println(message.getPayload());
}
}
-------------本文结束感谢您的阅读-------------

本文标题:Spring Cloud

文章作者:DragonBaby308

发布时间:2019年07月21日 - 16:23

最后更新:2020年04月10日 - 19:21

原始链接:http://www.dragonbaby308.com/spring-cloud/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%