Spring响应式微服务【更新ing】

Spring响应式微服务

参考《Spring响应式微服务:Spring Boot 2 + Spring 5 + Spring Cloud实战》一书


(一)响应式编程模型


(二)Reactor框架(Flux&Mono


(三)构建响应式RESTful服务

Spring Boot

  • 设计目的:简化Spring应用程序的初始搭建和开发过程
  • 设计理念:约定优于配置(Convention Over Configuration)

引入spring-boot-starter-web依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

启动(Bootstrap)类:@SpringBootApplication

1
2
3
4
5
6
@SpringBootApplication
public class HelloApplication {
public static void main(String[] args) {
SpringApplication.run(HelloApplication.class, args);
}
}

Controller类:@RestController

  • @Controller:标注该类是一个Servlet
  • @RestController标注这是一个基于RESTful风格的HTTP端点,并且会自动使用JSON实现HTTP请求的序列号/反序列化操作
  • @RequestMapping(value = ""):处理具体URL对应的请求
  • @GetMapping(""):以GET方式处理请求
  • @PostMapping(""):以POST方式处理请求
  • @PutMapping(""):以PUT方式处理请求
  • @DeleteMapping(""):以DELETE方式处理请求
  • @PathVariable:获取GET请求中的参数
  • @RequestBody:获取POST请求体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@RequestMapping(value = "v1/products")
public class ProductController {
@GetMapping("/{productCode}")
public Product getProduct(@PathVariable String productCode) {
Product p = new Product();
//...
return p;
}
}

/**
* @author dragonbaby308
*/
@RestController
public class HelloController {
@GetMapping("/")
public Mono<String> hello(){
return Mono.just("Hello Spring WebFlux!");
}
}

Spring Boot Actuator:系统监控组件

参考Spring Boot Actuator


使用Spring WebFlux构建响应式服务

Maven依赖

  • spring-boot-starter-webflux:响应式Web应用开发基础

    如果你使用的是Spring Boot 2.x,那么spring-boot-starter-web就包含了spring-boot-starter-webflux

  • spring-boot-starter-test:测试组件库,包括JUnitMockiot……

  • reactor-testReactor框架测试

  • Lombok:通过注解简化开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 如果导入了spring-boot-starter-web,就不再需要spring-boot-starter-webflux -->
<!--WebFlux-->
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency> -->

<!--spring-boot-starter-test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!--Reactor测试-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

<!--Lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

【基于MVC注解编程模型创建响应式RESTful服务】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//Entity层
@Data
@AllArgsConstructor
public class Product{
private Stirng id;
private String productCode;
private String productName;
private String description;
private BigDecimal price;
}

//===========================================

//Service层 —— 提供CRUD服务
@Service
public class ProductService {
//ConcurrentHashMap存储产品
private final Map<String, Product> products = new ConcurrentHashMap<>();

//获取所有产品
public Flux<Product> getProducts() {
//从Iterable构造Flux,返回的是value的Flux流
return Flux.fromIterable(this.products.values());
}

//通过多个id获取多个产品
public Flux<Product> getProductsByIds(final Flux<String> ids) {
//对ids流进行扁平化处理,单独处理每一个,然后合并为一个集合
//如果id对应的产品存在则返回,否则返回空
return ids.flatMap(id ->
Mono.justOrEmpty(this.products.get(id)));
}

//通过单个id获取单个产品
public Mono<Product> getProductById(final String id) {
return Mono.justOrEmpty(this.products.get(id));
}

//新建 或 更新
public Mono<Void> createOrUpdateProduct(final Mono<Product> productMono) {
//doOnNext(),当元素被弹出就触发一次对应方法
return productMono.doOnNext(product -> {
products.put(product.getId(), product);
}).thenEmpty(Mono.empty());
//thenEmpty(),返回Mono<Void>
}

//删除
public Mono<Product> deleteProduct(final String id) {
return Mono.justOrEmpty(this.products.remove(id));
}
}

//===========================================
//controller层
/**
* @author dragonbaby308
*/
@RestController
@RequestMapping("/product")
public class ProductController {
@Autowired
private ProductService productService;

@GetMapping("")
public Flux<Product> getProducts() {
return this.productService.getProducts();
}

@GetMapping("/{id}")
public Mono<Product> getProductById(@PathVariable("id") final String id){
return this.productService.getProductById(id);
}

@PostMapping("")
public Mono<Void> createProduct(@RequestBody final Mono<Product> product){
return this.productService.createOrUpdateProduct(product);
}

@DeleteMapping("/{id}")
public Mono<Product> deleteProduct(@PathVariable("id") final String id){
return this.productService.deleteProduct(id);
}
}

代码放在GitHub仓库,通过PostMan可以测试GETDELETEPOST的测试见第七部分v

【基于函数式编程模型创建响应式RESTful服务】

函数式编程模型
  • Spring WebFlux中函数式编程模型的核心概念是Router Functions,对标@Controller@RequestMapping等标准Spring MVC注解。

  • Router Functions提供一套函数是风格的API,用于创建RouterHandler对象。

    可以简单地将Handler对应为@ControllerRouter对应为@RequestMapping

  • 当我们发起一个远程调用时,传入的HTTP请求由HandlerFunction处理,HandlerFunction本质上是一个接收ServerRequest并返回Mono<ServerResponse>的函数。ServerReuestServerResponse是一对不可变接口,用来提供对底层HTTP消息的友好访问

  1. ServerRequest访问各种HTTP请求元素,包括请求方法、URI和参数,以及通过单独的ServerRequest.Headers获取HTTP请求头信息。ServerRequest通过一系列bodyToXxx()方法提供对请求消息体进行访问的途径
1
2
3
4
5
6
7
8
9
10
11
//bodyToMono():将请求消息体提取为Mono
Mono<String> string = req.bodyToMono(String.class);

//bodyToFlux():将请求消息体提取为Flux
Flux<Person> person = req.bodyToFlux(Person.class);

//bodyToMono() & bodyToFlux()实际上都是使用通用的
//ServerRequest.body(BodyExtractor)工具方法的便利形式
//BodyExtractor是一种请求消息体的提取策略,允许我们编写自己的提取逻辑
Mono<String> string2 = req.body(BodyExtractors.toMono(String.class));
Flux<Person> person2 = req.body(BodyExtractors.toFlux(Person.class));
  1. ServerResponse提供对HTTP响应的访问。由于它是不可变的,所以可以通过使用构造器来创建一个新的ServerResponse。构造器允许设置相应状态、添加响应标题并提供响应的具体内容。
1
2
3
4
5
6
7
8
9
//Mono<Person> person = ...;
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(person);
//响应状态吗:200 OK
//格式:JOSN
//内容:person

//通过body()放入返回的内容,
//也可以使用BodyInserters工具类提供的构建方法,如fromObject()
ServerResponse.ok().body(BodyInserters.fromObject("Hello World"));
  • Mono<ServerResponse> build(Publisher<Void> var)也可以构建ServerResponse,比较常见的用法是用来返回新增和更新操作的结果
  1. HandlerFunctionServerRequest + ServerResponse,通过接口中的handle方法创建定制化的请求响应处理机制

    类比@Controller

1
2
3
4
5
6
7
8
public class HelloWorldHandlerFunction 
implements HandlerFunction<ServerResponse> {
@Override
public Mono<ServerResponse> handle(ServerRequest req) {
retrun ServerResponse.ok().body(
BodyInserters.fromObject("Hello World"));
}
}
  1. RouterFunction将传入请求路由到具体的处理函数,接受ServerRequest并返回一个Mono<ServerResponse>,如果请求与指定路由匹配,则返回处理函数的结果,否则返回一个Mono<Void>

    类比@RequestMapping("")

  • 通过RouterFunctions.route(RequestPredicate predicate, HandlerFunction<T> handlerFunction)方法创建RouterFunction
1
2
3
RouterFunction<ServerResponse> helloWorldRoute = 
RouterFunctions.route(RequestPredicates.path("/hello-world"),
new HelloWorldHandlerFunction());
  • RequestPredicates.GET("/xxx")
1
2
3
4
5
6
7
8
9
public class PersonRouter{
@Bean
public RouterFunction<ServerResponse> routePerson(PersonHandler person) {
return RouterFunctions
.route(RequestPredicates.GET("/person"))
.and(RequestPredicats.accept(MediaType.APPLICATION_JSON)),
personHandler::getPersons);
}
}
  • 组合路由:RouterFunction.and(RouterFunction)RouterFunction.andRoute(RequestPredicate, HandlerFunction)
响应式RESTful服务 —— 函数式编程Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//Handler - 相当于@Controller
@Component
public class ProductHandler{
@Autowired
private ProductService ps;

//获取所有产品
public Mono<ServerResponse> getProducts(ServerRequest req) {
return ServerResponse
.ok()
.body(this.ps.getProducts(), Product.class);
}

//通过id获取单个产品
public Mono<ServerResponse> getProductById(ServerRequest req) {
String id = req.pathVariable("id");
return ServerResponse
.ok()
.body(this.ps.getProductById(id), Product.class);
}

//创建产品
public Mono<ServerResponse> createProduct(ServerRequest req) {
Mono<Product> = req.bodyToMono(Product.class);
return ServerResponse
.ok()
.body(this.ps.createOrUpdateProduct(), Product.class);
}

//删除产品
public Mono<ServerResponse> deleteProduct(ServerRequest req) {
String id = req.pathVariable("id");
return ServerResponse
.ok()
.body(this.ps.deleteProduct(id), Product.class);
}
}

//组合式Router - 相当于@RequestMapping
@Configuration
public class ProductRouter{
@Bean
public RouterFunction<ServerResponse> routeProduct(ProductHandler productHandler){
return RouterFunctions
.route(RequestPredicates.GET("/").and(RequestPredicates
.accpet(MediaType.APPLICATION_JSON)),
productHandler::getProducts)
.andRoute(RequestPredicates.GET("/{id}").and(RequestPredicates
.accept(MediaType.APPLICATION_JSON)),
productHandler::getProductById)
.andRoute(RequestPredicates.POST("/").and(RequestPredicats
.accept(MediaType.APPLICATION_JSON)),
productHandler::createProduct)
.andRoute(RequestPredicates.DELETE("/{id}").and(RequestPredicates
.accept(MediaType.APPLICATION_JSON)),
productHandler::deleteProduct);
}
}

(四)构建响应式数据访问组件

  1. 响应式开发方式的有效性取决于在整个请求链路的各个环节是否都采用了响应式编程模型,如果某个环节不是响应式的,就会出现同步阻塞,导致背压机制无法生效
  2. 最常见的非响应式场景就是DAO层中使用了关系型数据库,因为传统的非关系型数据库采用的都是非响应式的数据访问机制。
  3. Spring Boot 2.x支持的响应式数据访问组件包括响应式MongoDB & 响应式Redis

Spring Data Reactive:响应式数据访问模型

ReactiveCrudRepository:响应式CRUD仓库接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ReactiveCrudRepository<T, ID> extends Repository<T, ID> {
<S extends T> Mono<S> save(S entity);
<S extends T> Flux<S> saveAll(Iterable<S> entities);
<S extends T> Flux<S> saveAll(Pulisher<S> entityStream);
Mono<T> findById(ID id);
Mono<T> findById(Publisher<ID> id);
Mono<Boolean> existsById(ID id);
Mono<Boolean> existsById(Publisher<ID> id);
Flux<T> findAll();
Flux<T> findAllById(Iterable<ID> ids);
Mono<Long> count();
Mono<Void> deleteById(ID id);
Mono<Void> deleteById(Publisher<ID> id);
Mono<Void> delete(T entity);
Mono<Void> deleteAll(Iterable<? extends T> entities);
Mono<Void> deleteAll(Publisher<? extends T> entityStream);
Mono<Void> deleteAll();
}

ReactiveSortingRepository:响应式排序仓库接口

1
2
3
public interface ReactiveSortingRepository<T, ID> extends ReactiveCrudRepository<T, ID>{
Flux<T> findAll(Sort sort);
}

ReactiveMongoRepository:响应式MongoDB仓库接口

1
2
3
4
5
6
7
8
9
public interface ReactiveMongoRepository<T, ID> 
extends ReactiveSortingRepository<T, ID>,
ReactiveQueryByExampleExecutor<T> {
<S extends T> Mono<S> insert(S entity);
<S extends T> Flux<S> insert(Iterable<S> entities);
<S extends T> Flux<S> insert(Publisher<S> entities);
<S extends T> Flux<S> findAll(Example<S> example);
<S extends T> Flux<S> findAll(Example<S> example, Sort sort);
}

【响应式MongoDB

Maven依赖:spring-boot-starter-data-mongodb-reactive

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

Bootstrap类添加@EnableReactiveMongoRepositories注解

1
2
3
4
5
6
7
@SpringBootApplication
@EnableReactiveMongoRepositories
public class SpringReactiveMongodbApplication{
public static void main(String[] args) {
SpringApplication.run(SpringReactiveMongodbApplicationj.class, args);
}
}

默认情况下其实并不需要添加,因为MongoReactiveRepositoriesAutoConfiguration会自动创建与MongoDB交互的核心类。
只有在希望修改MongoDB的配置行为,这个注解才派上用场

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableReactiveMongoRepositories(basePackageClasses = OrderRepository.class)
public class MongoConfig extends AbstractReactiveMongoConfiguration{
@Bean
@Override
public MongoClient reactiveMongoClient() {
return MongoClients.create();
}

@Override
protected String getDatabaseName() {
return "order_test";
}

@Bean
public ReactiveMongoTemplate mongoTemplate() throws Exception{
return new ReactiveMongoTemplate(mongoClient(), getDatabaseName());
}
}

创建实体类和Reactive Mongodb Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Document(collection = "article")
public class Article{
@Id
private String id;
private String title;
private String content;
private String author;
}

@Repository
public interface ArticleReactiveMongoRepository extends ReactiveMongoRepository<Article, String>
, ReactiveQueryByExampleExecutor<Article> {

}

使用CommandLineRunner初始化MongoDB数据

  1. CommandLineRunnerSpring Boot应用程序启动,ApplicationContext初始化完成后,会遍历所有CommandLineRunner接口的实例并运行它们的run()方法,我们也可以使用@Order注解来指定所有CommandLineRunner实例的运行顺序
1
2
3
4
//CommandLineRunner用于系统运行之前执行一些初始化操作
public interface CommandLineRunner {
void run(String... args) throws Exception;
}
  1. MongoOperations工具类:接近MongoDB的原生态语言
  • dropCollections():删除
  • insert():插入
  • findAll():查找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class InitDatabase{
@Bean
CommandLineRunner init(MongoOperations operations) {
return args -> {
//删除操作:MongoOperations.dropCollection()
operations.dropCollection(Article.class);

//插入操作:MongoOperations.insert()
operations.insert(new Article(UUID.randomUUID().toString(), "title1", "content1", "author1"));
operations.insert(new Article(UUID.randomUUID().toString(), "title2", "content2", "author2"));

//查找操作:MongoOperations.findAll()
operations.findAll(Article.class).forEach(article -> {
System.out.println(article.toString());
});
}
}
}

Service层调用Reactive Mongodb Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Service
public class ArticleService{
private final ArticleReactiveMongoRepository armr;

ArticleService(ArticleReactiveMongoRepository armr) {
this.armr = armr;
}

public Mono<Article> save(Article a){
return armr.save(a);
};

public Mono<Article> findOne(String id){
return armr.findById(id).log("findOneArticle");
}

public Flux<Article> findAll() {
return armr.findAll().log("findAllArticles");
}

public Mono<Void> delete(String id) {
return armr.deleteById(id).log("deleteOneArticle");
}

public Flux<Article> findByAuthor(String author) {
Article a = new Article();
a.setAuthor(author);

ExampleMatcher matcher = ExampleMatcher.matching()
.withIgnoreCase()
.withMatcher(author, startWith())
.withIncludeNullValues();

Example<Article> example = Example.of(a, matcher);

Flux<Article> articles = armr.findAll(example).log("findByAuthor");

return articles;
}
}

【响应式Redis

Maven依赖:spring-boot-starter-data-redis-reactive

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

Bootstrap类:整合LettuceConnectionFactory & ReactiveRedisTemplate

  • LettuceConnectionFactory基于Netty创建连接实例,可以在多个线程间实现线程安全,满足多线程环境下的并发访问要求,最重要的是,同时支持响应式的数据访问用法
  • ReactiveRedisTemplate:依赖于ReactiveRedisConnectionFactory创建ReactiveRedisConnection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootApplication
public class SpringReactiveRedisApplication{
//LettuceConnectionFactory
@Bean
public ReactiveRedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory();
}

//ReactiveRedisTemplate
@Bean
ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactor factory) {
return new ReactiveRedisTemplate<>(factory,
RedisSerializationContext.string());
}

//RedisOperations工具类
@Bean
ReactiveRedisTemplate<String, Article> redisOperations(ReactiveRedisConnectionFactory factory){
//序列化
Jackson2JsonRedisSerializer<Article> s = new Jackson2JsonRedisSerializer<>(Article.class);
RedisSerializtionContext.RedisSerializationContextBuilder<String, Article> builder = RedisSerializationContext.newSerializtionContext(new StringRedisSerializer());
RedisSerializtionContext<String, Article> context = builder.value(s).build();

return new ReactiveRedisTemplate<>(factory, context);
}

jpublic static void main(String[] args){
SpringApplcation.run(SpringReactiveRedisApplication.class, args);
}
}

创建Reactive Redis Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public interface ArticleReactiveRepository {
Mono<Boolean> saveArticle(Article article);
Mono<Boolean> updateArticle(Article article);
Mono<Boolean> deleteArticle(String articleId);
Mono<Article> findArticleById(String articleId);
Flux<Article> findAllArticles();
}

//实现类
@Repository
public interface ArticleReactiveRepositoryImpl implements ArticleReactiveRepository {

@Autowired
private ReactiveRedisTemplate<String, Article> reactiveRedisTemplate;

private static final String HASH_NAME = "Article:";

@Override
public Mono<Boolean> saveArticle(Article article){
return reactiveRedisTemplate.opsForValue()
.set(HASH_NAME + article.getId(), article);
}

@Override
public Mono<Boolean> updateArticle(Article article){
return reactiveRedisTemplate.opsForValue()
.set(HASH_NAME + article.getId(), article);
}

@Override
public Mono<Boolean> deleteArticle(String articleId){
return reactiveRedisTemplate.opsForValue()
.delete(HASH_NAME + articleId);
}

@Override
public Mono<Article> findArticleById(String articleId){
return reactiveRedisTemplate.opsForValue()
.get(HASH_NAME + articleId);
}

@Override
public Flux<Article> findAllArticles(){
return reactiveRedisTemplate.keys(HASH_NAME + "*")
.flatMap((String key) -> {
Mono<Article> mono = reactiveRedisTemplate.opsForValue().get(key);
return mono;
});
}
}

值得注意的是,上述Demo中RedisKEYS操作非常危险!如果key不多可以这么做,否则最好还是用SCAN防止阻塞!

Service层使用Reactive Redis Repository

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class ArticleService{
private final ArticleReactiveRedisRepository articleRepository;

ArticleService(ArticleReactiveRedisRepository articleRepository) {
this.articleRepository = articleRepository;
}

public Mono<Boolean> save(Article article){
return articleRepository.saveArticle(article);
}

public Mono<Boolean> delte(String id){
return articleRepository.deleteArticle(id);
}

public Mono<Article> findArticleById(String id){
return articleRepository.findArticleById(id).log("findOneArticle");
}

public Flux<Article> findAllArticles() {
return articleRepository.findAllArticles().log("findAllArticles");
}
}

(五)构建响应式消息通信组件

Spring Cloud Stream

Reactive Spring Cloud Stream:响应式消息通信系统

Maven依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>

响应式Source组件:@StreamEmitter

  • @StreamEmitter是一个方法级别的注解,可以将方法转变为一个发射器(Emitter)。
  • @StreamEmitter只能与@Output注解组合,因为它的作用就是生产消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootApplication
//@EnableBinding(Source.class)用于定义消息Producer类
@EnableBinding(Source.class)
public class SourceApplication{

//@StreamEmitter将方法转为发射器
@StreamEmitter(Source.OUTPUT)
public Flux<String> emit() {
return Flux.interval(Duration.ofSecond(1)).map(l -> "Hello World");
}
}

//等价于:
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication{
@StreamEmitter
public Flux<String> emit(@Output(Source.OUTPUT) FluxSender output) {
return Flux.interval(Duration.ofSecond(1)).map(l -> "Hello World");
}
}

响应式Sink组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@EnableBinding(Sink.class)
@SpringBootApplication
public class SinkApplication{
@StreamListener
public Flux<String> receive(@Input(Sink.INPUT) Flux<String> input){
return input.map(s -> s.toUpperCase());
}
}

//等价于
@EnableBinding(Sink.class)
@SpringBootApplication
public class SinkApplication{
@StreamListener(target = Sink.INPUT)
public Flux<String> receive(Flux<String> input){
input.map(String::toUpperCase).subscribe(System.out::println);
}
}

Processor组件:集成SourceSink的双向通道

1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableBinding(Processor.class)
public class SourceApplication{
public void receive(@Input(Processor.INPUT) Flux<String> input,
@Output(Processor.OUTPUT) Flux<String> output){
output.send(input.map(s -> s.toUpperCase()));
}
}

Demo

  1. 构建一个Event对象作为响应式Source组件和Sink组件进行交互的消息载体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Event implements Serializable{
private Long id;

public Event(){}

public Event(Long id){
this.id = id;
}

//重载toString方便打印日志
@Override
public String toString(){
reutrn "Event{ id = " + id + " }";
}
}
  1. 响应式Source组件ReactiveSourceApplication
1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootApplication
@EnableBinding(Source.class)
public class ReactiveSourceApplication{
public static void main(String[] args){
SpringApplication.run(ReactiveSourceApplication.class, args);
}

@StreamEmitter
@Output(Source.OUTPUT)
public Flux<Event> emit() {
return Flux.interval(ofMillis(1000)).map(Event::new);
}
}
  1. 响应式Source组件application.yml

当然你需要通过Homebrew安装RabbitMQ并启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
spring:
cloud:
stream:
bindings:
default:
content-type: application/json
binder: rabbitmq
output:
group: test-event-group
destination: test-event-destination
producer:
# 消息分区
partitionKeyExpression: payload.id % 2
partitionCout: 2
binders:
rabbitmq:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
  1. 响应式Sink组件ReactiveSinkApplication
1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveSinkApplication{
private static Logger logger = LoggerFactory.getLogger(ReactiveSinkApplication.class);

@StreamListener(target = Sink.INPUT)
public void loggerSink(Flux<Event> events) {
events.map(Object::toString)
.subscribe(event -> logger.info("Event: {}", event));
}
}
  1. 响应式Sink组件application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
cloud:
stream:
bindings:
default:
content-type: application/json
binder: rabbitmq
input:
group: test-event-group
destination: test-event-destination
consumer:
# 消息分区
partitioned: true
instanceIndex: 1
instanceCount: 2
binders:
rabbitmq:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
  1. 另一个Sink组件配置同4、5,只有instanceIndex: 0
  2. 启动Source组件,启动两个Sink组件,查看控制台输出。同样的结果可以在RabbitMQ的控制台中看到。

(六)构建响应式微服务架构

基础概念见Spring Cloud,恕不赘述 ̄へ ̄

【服务治理Eureka

服务治理 和 负载均衡 往往是紧密联系在一起的。

  • 服务治理用于实现自动化的注册和发现。

【1】服务注册中心

Maven依赖
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
Bootstrap类:@EnableEurekaServer
1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication{
public static void main(String[] args){
SpringApplication.run(EurekaServerApplication.class, args);
}
}
application.yml
1
2
3
4
5
6
server:
port: 8761
eureka:
client:
registerWithEureka: false
fetchRegistry: false
集群
  • 准备两个Eureka Server实例,一个peer8761,一个peer8762
  • peer8761application.yml
1
2
3
4
5
6
7
8
9
server:
port: 8761

eureka:
instance:
hostname: peer8761
client:
serverUrl:
defaultZone: http://peer8762:8762/eureka
  • peer8762application.yml
1
2
3
4
5
6
7
8
9
server:
port: 8762

eureka:
instance:
hostname: peer8762
client:
serverUrl:
defaultZone: http://peer8761:8761/eureka

【2】客户端

Maven依赖
1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
Bootstrap类:@EnableEurekaClient
1
2
3
4
5
6
7
@SpringBootApplication
@EnableEurekaClient
public class AccountApplication{
public static void main(String[] args){
SpringApplication.run(AccountApplication.class, args);
}
}
application.yml
1
2
3
4
5
6
7
8
9
10
11
server:
port: 8763
eureka:
client:
registerWithEureka: true
fetchRegistry: true
serviceUrl:
defaultZone: http://localhost:8761/eureka/
spring:
application:
name: accountservice

负载均衡Ribbon

使用Eureka.DiscoveryClient工具类查找服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Autowired
private DiscoveryClient discoveryClient;

public List<String> getEurekaServices() {
List<String> services = new ArrayList<String>();

discoveryClient.getServices().forEach(serviceName -> {
discoveryClient.getInstance(serviceName).forEach(instance -> {
services.add(String.format("%s:%s",
serviceName, instance.getUri()));
});
});

return services;
}

@EnableDiscoveryClient & @LoadBalanced

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication{
public static void main(String[] args){
SpringApplication.run(UserApplication.class, args);
}

//负载均衡
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
}

服务容错Hystrix

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public class UserApplication{
public static void main(String[] args){
SpringApplication.run(UserApplication.class, args);
}

//负载均衡
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
}

服务网关Gateway

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
server:
port: 8090

spring:
application:
name:gatewayservice

eureka:
client:
registerWithEureka: false
serviceUrl:
defaultZone: http://localhost:8761/eureka/

spring:
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: accountservice
uri: lb://accountservice
predicartes:
- Path=/account/**
filters:
- PrefixPath=/mypath
- id: userservice
uri: lb://userservice
predicartes:
- Path=/user/**
filters:
- PrefixPath=/mypath

服务配置Config

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>

服务监控Sleuth

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

使用WebClient实现响应式服务调用


(七)测试响应式微服务架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!--  spring-boot-starter-test  -->
<dependency>
<groupId>org.springframwork.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- Reactor测试 -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>

<!-- flapdoodle:允许在不适用真实MongoDB的情况下编写测试案例并执行测试 -->
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<scope>test</scope>
</dependency>
-------------本文结束感谢您的阅读-------------

本文标题:Spring响应式微服务【更新ing】

文章作者:DragonBaby308

发布时间:2019年09月19日 - 22:49

最后更新:2019年10月01日 - 11:10

原始链接:http://www.dragonbaby308.com/Reactor-Spring-WebFlux/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%