《Java 8 in Action》

Java 8 函数式编程

函数式编程

函数式编程的核心思想是行为参数化:允许一个方法接受多种行为(或策略)作为参数,并在内部使用,来完成不同的行为。具体点来说,就是允许将方法/代码作为参数传递给其他方法。有点类似于设计模式中的策略模式行为参数化的具体使用可以参考《功能模块的行为参数化改造》一文。

行为参数化的优点:可以帮助处理频繁变更的需求,让代码更好地适应不断变化的要求,减轻未来的工作量。

函数式编程的优点:函数式编程天生有利于多线程大数据编程。

行为参数化示例:

示例1:使用Comparator排序

1
2
3
4
//java.util.Comparator
public interface Comparator<T>{
public int compare(T o1, T o2) ;
}

在Java 8之前,使用java.util.Comparator来进行排序,需要实例化对象:

1
2
3
4
5
inventory.sort(new Comparator<Apple>{
public int compare(Apple a1, Apple a2){
return a1.getWeight().compareTo(a2.getWeight() );
}
});

在Java 8之后,直接通过Lambda表达式匿名调用java.util.Comparator接口的compare()方法即可:

1
inventory.sort((Apple a1, Apple a2) -> a1.getWeight().compareTo(a2.getWeight() ) );

示例2:使用Runnable执行线程

1
2
3
4
//java.lang.Runnable
public interface Runnable{
public void run() ;
}

在Java 8之前,使用java.lang.Runnable接口实例化线程:

1
2
3
4
5
Thread t = new Thread(new Runnable() {
public void run(){
System.out.println("Hello World!") ;
}
});

在Java 8之后,直接通过Lambda表达式匿名调用java.lang.Runnable接口的run()方法即可:

1
Thread t = new Thread(() -> System.out.println("Hello World!") ) ;

Java 8 函数式编程包含的内容

  1. Lambda表达式:即匿名函数,允许直接使用函数作为参数,而不需要指定函数所属类,也不需要指定函数名,天生有利于多线程编程
  2. 方法引用:允许直接通过函数名进行调用函数,而不需要通过函数所属类的对象调用
  3. 流:不把所有数据加载到内存,而是每次处理一项,有利于多线程、大数据编程;从某种程度上来说,Lambda表达式方法引用都是为了流操作服务的
  4. 默认方法:允许接口拥有默认方法,可以不被其实现类@Override
  5. 其他函数式编程思想:如空引用、模式匹配……

一、Lambda表达式(匿名函数/闭包)

定义

Lambda表达式也称为匿名函数,即允许直接使用函数作为参数,而不需要指定函数所属的类,也不需要指定函数名。
关键字:(parameter) -> {expression body},parameter代表的是输入(入参),expression body代表的是输出,可以是执行的代码,也可以是出参(即return语句)。

特征

  1. parameter是入参,可以不声明变量类型,编译器会根据实际传入的参数类型自动进行类型推断,如(a, b) -> a + b
  2. parameter单个参数可以不加(),多个参数或无参数的话需要加(),如(int a, int b) -> a + b() -> new Apple(10)
  3. expression body可以是执行的代码,单行代码可以不加{},多行代码必须要加{}
  4. expression body也可以是出参(即return语句),如果是return语句,则必须要加{},如(int a, int b) -> {return a + b;}

作用域

Lambda表达式除了可以使用作用域内的变量,还可以使用作用域外的变量,称为捕获,Lambda表达式可以自由使用实例变量静态变量,但局部变量必须显式地定义为final或事实上是final,否则编译报错。如以下代码编译就会报错:

1
2
3
int portNum = 1337;
Runnable r = () -> System.out.println(portNum);
//portNum是非final型的局部变量,编译时报错

这是因为在Stream编程中,一个流中的不同元素会被分配到不同CPU上并发执行,所以你必须保证流中单个元素是并发安全的
同时要注意,一个流中的元素(即使是.forEach())并不一定是顺序执行的。

使用场景:函数式接口Predicate/Consumer/Function

Lambda表达式整个作为函数式接口 中抽象方法的具体实现。

函数式接口:只定义了一个抽象方法的接口,被称为函数式接口。如java.util.Comparatorjava.lang.Runnablejava.util.concurrent.Callablejava.security.PrivilegedAction等。
①一个接口中如果有很多默认方法,但只有一个抽象方法,那么这个接口也是函数式接口;
interface SmartAdder extends Adder{ int add(double a, double b); }不是函数式接口,因为它除了自己的抽象方法,还有从父类继承来的抽象方法。
注意:任何函数式接口都不允许抛出checked exception,如果你需要Lambda表达式抛出异常,可以定义自己的函数式接口,并声明checked exception,或是用try-catch包裹Lambda表达式

常用的函数式接口:

  1. 谓词Predicate
    定义了一个抽象方法test,接收一个对象T,返回一个boolean值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@FunctionalInterface
public interface Predicate<T>{
boolean test(T t);
}

public static <T> List<T> filter(List<T> list, Predicate<T> p) {
List<T> result = new ArrayList<>();
for(T s : list) {
if(p.test(s) ) {
result.add(s);
}
}
return result;
}

//使用Predicate<T>
Predicate<String> nonEmptyStringPredicate = (String s) -> !s.isEmpty();
List<Stirng> nonEmpty = filter(listOfStrings, nonEmptyStringPredicate);
  1. 消费者Consumer
    定义了一个抽象方法accept,接收一个对象T,返回一个void
1
2
3
4
5
6
7
8
9
10
11
12
13
@FunctionalInterface
public interface Consumer<T>{
void accept(T t);
}

public static <T> void forEach(List<T> list, Consumer<T> c) {
for(T t : list) {
c.accept(t);
}
}

//使用Consumer<T>
forEach(Arrays.asList(1, 2, 3, 4, 5), (Integer i) -> System.out.println(i) );
  1. 方法Function<T, R>
    定义了一个抽象方法reply,接收一个泛型T对象,返回一个泛型R对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Function<T, R>{
R reply(T t);
}

public static <T, R> List<R> map(List<T> list, Function<T, R> f) {
List<R> result = new ArrayList<>();
for(T t : list) {
result.add(f.reply(t) );
}
return result;
}

//使用Function<T, R>
List<Integer> l = map(Arrays.asList("lambdas", "in", "action"), (String s) -> s.length() );
  1. 函数式接口
函数式接口函数描述符
Predicate< T >T->boolean
Consumer< T >T->void
Function<T, R>T->R
Supplier< T > 【生产者】()->T
UnaryOperator< T >T->T
BinaryOperator< T >(T, T)->T
BiPredicate<L, R>(L, R)->boolean
BiConsumer<T, U>(T, U)->void
BiFunction<T, U, R>(T, U)->R

Lambda表达式复合操作

  1. 比较器复合
1
2
3
4
5
6
7
8
//现有比较器
Comparetor<Apple> c = Comparator.comparing(Apple::getWeight);

//逆序.reversed()
inventory.sort(Comparing(Apple::getWeight).reversed() );

//比较器链.thenComparing():传入一个Lambda表达式,前一个Comparator比较结果一致则会调用
inventory.sort(comparing(Apple::getWeight).reversed().thenComparing(Apple::getPrice) );
  1. 谓词(Predicate)复合
1
2
3
4
5
6
7
8
9
10
//现有谓词redApple

//非.negate()
Predicate<Apple> notRedApple = redApple.negate();

//与.and()
Predicate redAndHeavyApple = redApple.and(a -> a.getWeight() > 150);

//或.or()
Predicate redAndHeavyOrGreenApple = redApple.and(a -> a.getWeight() > 150).or(a -> "green".equals(a.getColor() ) );
  1. 函数(Function)复合
1
2
3
4
5
6
7
8
9
10
Function<Integer, Integer> f = x -> x + 1;
Function<Integer, Integer> g = x -> x * 2;

//f.andThen(g):即g(f(x))
Function<Integer, Integer> h1 = f.andThen(g);
int result1 = h1.apply(1); //结果为(1+1)*2=4

//f.compose(g):即f(g(x))
Function<Integer, Integer> h2 = f.compose(g);
int result2 = h2.apply(1); //结果为(1*2)+1=3

二、方法引用(::)

定义:允许方法作为值进行传递,通过函数名称即可调用方法,不需要通过函数所属类的对象调用。从某种程度可以看作调用特定Lambda方法的一种快捷方法。

语法:ClassName::MethodName
如:inventory.sort(Comparing(Apple::getWeight) );

java.util.Comparator.comparing()方法:
传入一个Function来提取Comparable键值,生成一个Comparator对象。

构造函数的引用:ClassName::new,不将构造方法实例化却能够使用它。
如:

1
2
3
4
5
6
7
//生成苹果
Supplier<Apple> c1 = Apple::new;
Apple a1 = c1.get();

//生成一个100g的苹果
Function<Integer, Apple> c2 = Apple::new;
Apple a2 = c2.apply(100);

优点:可以从根据已有的方法创建Lambda表达式,并能够重用;相对于Lambda表达式来说,可读性更高。


Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//lambda
//map()映射
//filter()过滤
List<ObjectB> objectBList = objectAList.stream()
.map(this::mapAToB)
.filter(Objects::nonNull)
.collect(Collectors.toList() );

//判空方法
public static boolean nonNull(Object obj) {
return obj != null;
}

//将A对象映射为B对象的方法
private ObjectB mapAToB(ObjectA a) {
ObjectB b = null;
try{
return ObjectB.builder().
paramA(a.getParamA() ).
paramB(a.getParamB() ).build();
}catch(Exception e) {
//log.error()
}
return b;
}

三、流(Stream)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<String> threeHighCaloricDishNames = menu.stream()
.filter(d -> d.getCalories() > 300) //中间操作,filter筛选,从流中排除某些元素——留下卡路里大于300的元素
.map(Dish::getName) //中间操作,map映射,将元素转换为其他形式或提取信息——将菜映射成菜名
.limit(3) //中间操作,limit截断流,使其元素不超过给定数量——截取前3个菜名
.collect(toList() ); //终端操作,将流转换成List

//给定List<String>,其中有数字型字符串、字母型字符串,字母有大小写,字符串长短不一
//找出所有【长度>=3】的字母型字符串,【转为小写、去重后,按照字母排序,最后以“❤”连接成一个字符串输出】
String rst = list.stream()
.filter(i -> !isNum(i)) //过滤掉数字型字符串
.filter(i -> i.length() >= 3) //过滤掉长度小于3的字符串
.map(i -> i.toLowerCase()) //转化为小写
.distinc() //去重
.sorted(Comparator.naturalOrder() ) //排序
.collect(Collectors.join("❤")); //以❤连接

定义

简单来说,流就是从支持数据处理操作的源(如集合、数组或I/O资源)生成的元素序列
流是一系列数据项,一次只生成一项,不同时将所有数据加入内存,程序可以以流水线的方式一个个地读取数据项,然后一个个地进行处理和输出。主要针对集合。
流允许你以声明性方式处理数据集合(通过查询语句实现,而非临时编写一个实现),你可以把它看成遍历数据集的高级迭代器

Stream API:java.util.stream
顺序流:Collection.stream()
并行流:Collection.parallelStream()

StreamCollection的区别:什么时候进行计算

  1. Collection侧重数据的存储和访问Stream侧重数据的计算
  2. Collection的元素全部是存在内存的,所有元素都必须计算出来才能加入集合;Stream的元素则是按需求计算的,只有在消费者有需要时才计算值。

优点

  1. 相对于集合操作来说,更简洁易读;
  2. 可以组合中间操作和终端操作,更灵活;
  3. 可以透明地并行处理,无需多线程代码。

特点

  1. 流水线:很多流操作本身就返回一个流,多个流操作可以链接起来,形成一条流水线。
  2. Stream可以进行短路求值,有些操作不需要处理整个流就能得到结果,可以把无限流转换为有限流;也可以延迟
  3. 内部迭代:流的迭代操作是由Stream库在内部进行的,还把得到的值存在了某个地方,不需要程序员另外写迭代代码。

    外部迭代:需要用户去做迭代。如Collection接口的for-each/Iterator。
    内部迭代的优点:Stream库自动选择一种适合你硬件的数据表示和并行实现,项目可以透明地进行并行处理,或者采用更优化的顺序进行处理。

流的构建

  1. 创建 —— Stream.of()/Stream.empty()
1
2
Stream<String> s = Stream.of("Java8", "in", "Action");
Stream<String> emptyStream = Stream.empty();
  1. 数组创建 —— Arrays.stream()
1
2
int[] nums = {1, 2, 3};
int sum = Arrays.stream(nums).sum();
  1. 文件生成 —— java.nio.file.Files.lines()
1
2
3
4
5
6
long uniqueWords = 0;
try(Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset() ) ){
uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" ") ) ).distinct().count();
}catch(IOException e){

}
  1. 函数生成无限流 —— Stream.iterate()/Stream.generate()
1
2
Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);
Stream.generate(Math::random).limit(10).forEach(System.out::println);

Stream.iterate(final T seed, final UnaryOperator<T> f)接受两个参数:

  1. seed作为流的第一个元素
  2. f(seed)作为流的第二个元素,f(f(seed) )作为流的第三个元素……依次类推
1
2
3
Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println); 【加】

//换行输出:0 2 4 6 8 10 12 14 16 18

流的遍历

流只能遍历一次,遍历完成之后,我们就说这个流已经被消费掉了。

1
2
3
Stream<String> s = Arrays.asList("Java 8", "in", "Action").stream();
s.forEach(System.out::println);
s.forEach(System.out::println); //java.lang.IllegalStateException:流已被操作或关闭

流的操作

中间操作

返回一个流、可以连接起来形成一条流水线的流操作称为中间操作

操作返回类型操作参数函数描述符
filterStream< T >PredicateT -> boolean
mapStream< R >Function<T, R>T -> R
limitStream< T >
sortedStream< T >Comparator< T >(T, T) -> int
distinctStream< T >
(一)筛选和切片
1.用谓词筛选 —— filter

接受一个谓词作为参数,返回一个包括所有符合谓词的元素的流

1
List<Dish> vegetable = menu.stream().filter(Dish::isVegetable).collect(toList() );
2.筛选各异的元素 —— distinct

返回一个元素各异(根据流生成的元素的hashCode和equals方法实现)的流

1
2
3
4
5
6
List<Integer> nums = Arryas.asList(1, 2, 1, 3, 3, 2, 4);
//找出不重复的偶数
nums.stream()
.filter(d -> d % 2 == 0)
.distinct()
.forEach(System.out::println);
3.截短流 —— limit(n)

返回一个不超过给定长度的流

1
2
3
4
5
List<Dish> threeVegetable = menu
.stream()
.filter(Dish::isVegetable)
.limit(3)
.collect(toList() );
4.跳过元素 —— skip(n)

返回一个扔掉了前n个元素的流,如果流中元素不足n个,则返回一个空流

1
2
3
4
5
6
7
8
9
//skip()常和limit()搭配,用于DB批量插入
//long batchSize; 一批数据的量
Stream.iterate(0, n -> n + 1).limit(limitNum).forEach(i -> {
List<DataObj> stepList = dataList.stream()
.skip(i * batchSize)
.limit(batchSize).collect(Collectors.toList() );
//批量插入
batchInsert(stepList);
});
(二)映射
1.对流中的每一个元素应用函数 —— map

流接受一个函数作为参数,这个函数会被应用到每个元素上,并映射成一个新的元素

1
2
3
4
5
//返回每个菜名的长度
List<Integer> dishNameLengths = menu.stream()
.map(Dish::getName)
.map(String::length)
.collect(toList());
2.流的扁平化 —— flatMap

flatMap让你将一个流中的每个值都转换成另一个流,然后将每个流合并起来,生成一个扁平化的流

可以理解为将每个元素分别进行映射,然后合并为一个流

1
2
3
4
5
6
//给定单词列表,返回一张列表,列出里面各不相同的字符。如:给定单词列表["Hello", "World"],返回列表["H", "e", "l", "o", "w", "d"]
List<String> uniqueCharacters = words.stream()
.map(s -> s.split("") )
.flateMap(Arrays::stream)
.distinct()
.collect(toList() );
(三)查找匹配
1.检查谓词是否至少匹配一个元素 —— anyMatch

终端操作,返回一个boolean

1
2
3
if (menu.stream().anyMatch(Dish::isVegetable) ) {
System.out.println("vegetable!");
}
2.检查谓词是否匹配所有元素 —— allMatch/noneMatch

终端操作,返回一个boolean

1
2
3
4
//是否全部匹配
boolean isHealthy = menu.stream().allMatch(a -> a.getCalories() < 300);
//是否全部不匹配
boolean isHealthy = menu.stream().noneMatch(b -> b.getCalories() >= 300);
3.查找元素 —— findAny

终端操作,返回当前流的任意元素

1
2
3
4
5
6
7
8
9
10
11
Optional<Dish> dish = menu.stream().filter(Dish::isVegetable).findAny();

//如果存在则打印
menu.stream()
.filter(Dish::isVegetable)
.findAny()
.ifPresent(d -> System.out.println(d.getName() ) );

//找出list中是否存在TYPE_A的元素
boolean flag;
flag = list.steam().filter(i -> TypeEnum.TYPE_A == i.getType() ).findAny().isPresent();
4.查找第一个元素 —— findFirst

终端操作,返回当前流的第一个元素

1
Optional<Dish> dish = menu.stream().filter(Dish::isVegetable).findFirst();
(四)归约(折叠) —— reduce

归约操作:将一个流中的元素组合起来,归约成一个值。

1.求和/积

reduce接受两个参数:

  1. 一个初始值(可以不接受初始值,但是会返回一个Optional对象)
  2. 一个BinaryOperator<T>来将两个元素结合起来产生一个新值
1
2
3
4
5
6
7
//求和
int sum = nums.stream().reduce(0, (a, b) -> a + b);
//求积
int mul = nums.stream().reduce(1, (a, b) -> a * b);

//不接受初始值,返回一个Optional对象
Optional<Integer> sumMaybeNotExist = nums.stream().reduce((a, b) -> a + b);
2.最大/小值

reduce接受两个参数:

  1. 一个初始值(可以不接受初始值,但是会返回一个Optional对象)
  2. 一个Lambda来将两个流元素结合起来产生一个新值
1
2
Optional<Integer> max = nums.stream().reduce(Integer::max);
Optional<Integer> min = nums.stream().reduce(Integer::min);
(五)数值流 & 对象流的转换
  1. 对象流 -> 数值流:mapToInt()/mapToDouble()/mapToLong()分别可以将对象流转换为IntStream/DoubleStream/`LongStream
1
2
3
4
5
6
7
8
//计算元素总和,存在一个自动装箱和自动拆箱的成本:
//装箱好理解,getCalories得到的int想要调用Integer的sum,必须要装箱;
//但是为什么会存在拆箱的成本呢?答案是.stream()返回的Stream<T>存在泛型,泛型不允许基本数据类型,所以返回的是Integer,想要调用getCalories就必须要拆箱
int calories = menu.stream().map(Dish::getCalories).reduce(0, Integer::sum);
//那么可以将代码如下吗?
int calories = menu.stream().map(Dish::getCalories).sum(); //很遗憾,这时错误代码,map方法生成的是Stream<T>,该接口不存在sum方法
//我们可以将对象流转换成数值流再进行计算
int calories = menu.stream().mapToInt(Dish::getCalories).sum();
  1. 数值流 -> 对象流:boxed()
1
2
IntStream intStream = menu.strem().mapToInt(Dish::getCalories);
Stream<Dish> s = intStream.boxed();
  1. 数值范围 —— range(int start)/rangeClosed(int start, int end)
    range(int start)/rangeClosed(int start, int end)是可以用于IntStrem/LongStrem的静态方法,生成范围内的数值。
1
2
//1-100之间的偶数数值流
IntStream evenNums = IntStream.rangeClosed(0, 100).filter(n -> n % 2 == 0);

终端操作

生成结果、关闭流的操作称为终端操作,其中最常用的是收集器 collect

操作目的
forEach消费流中的每个元素并对其应用Lambda,返回void
count返回流中元素的个数,返回long
collect将流归约成一个集合,比如ListMap甚至是Integer
预定义收集器

预定义收集器是可以从Collector提供的工厂方法创建的收集器。如groupingBy

功能主要分为3类:

  1. 将流元素归约、汇总成一个值:如countingmaxByminBysummingIntaveragingIntsummarizingIntjoiningreducing
  2. 元素分组:groupingBy
  3. 元素分区:partitioningBy

具体有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//List<T> toList()  将流中所有项目收集到一个List
List<Dish> dishes = menu.stream().collect(toList() );

//Set<T> toSet() 将流中所有项目去重后,收集到一个Set
Set<Dish> dishes = menu.stream().collect(toSet() );

//Collection<T> toCollection() 将流中所有项目收集到给定的供应源创建的集合
Collection<Dish> dishes = menu.stream().collect(toCollection(), ArrayList::new);

//Long counting() 计算流中元素个数
long howManyDishes = menu.stream().collect(counting() );

//Integer summingInt() 对流中项目的一个整数属性求和
int totalCalories = menu.stream().collect(summingInt(Dishes::getCalories) );

//Double averagingInt() 计算流中项目Integer属性的平均值
double avgCalories = menu.stream().collect(averagingInt(Dishes::getCalories) );

//IntSummaryStatistics summarizingInt() 计算流中项目Integer属性的统计值,如最大、最小、总和、平均值
IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dishes::getCalories) );

//String joining(String s) 连接对流中每个项目调用toString()生成的字符串
//joining可以接受一个字符串作为分隔符
String shortMenu = menu.stream().map(Dish::getName).collect(joining(", ") );

//Optional<T> maxBy 一个包裹了流中按照给定Comparator选出的最大元素的Optional,如果流为空则为Optional.empty()
Optional<Dish> fattest = menu.stream().collect(maxBy(ComparingInt(Dish::getCalories) ) );

//Optional<T> minBy 一个包裹了流中按照给定Comparator选出的最小元素的Optional,如果流为空则为Optional.empty()
Optional<Dish> lightest = menu.stream().collect(minBy(ComparingInt(Dish::getCalories) ) );

//reducing() 返回类型:归约操作产生的类型
//从一个作为累加器的初始值开始,利用BinaryOperator与流中的元素逐个结合,从而将流归约成单个值
//reducing包含3个参数:
//1.归约操作的初始值,也就是流中没有元素的返回值
//2.一个转换函数,这里是将Dish对象转为所含卡路里的int
//3.一个累积函数BinaryOperator,将两个项目累积成一个同类型的值,这里就是对两个int求和
int totalCalories = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum) );
//reducing也可以只传入一个参数,作为转换函数使用
Optional<Dish> mostCaloriesDish = menu.stream().collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2) );

//collectingAndThen() 返回类型:转换函数返回的类型
//包裹另一个收集器,将其结果应用转换函数
int howManyDishes = menu.stream().collect(collectingAndThen(toList(), List::size) );

//Map<K, List<T>> groupingBy 接受一个分类函数和一个收集器作为参数,根据分类函数对流中的项目进行分组,并将属性值作为结果Map的键
//收集器可以传,类型可以为任何收集器,如groupingBy、counting();也可以不传
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType) );
//多级分组,传入的参数是分类函数和另一个groupingBy收集器
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream().collect(
groupingBy(Dish::getType,
groupingBy(dish -> {
if (dish.getCalories() <= 400) {
return CaloricLevel.DIET;
}else if (dish.getCalories() <= 700) {
return CaloricLevel.NORMAL;
}else return CaloricLevel.FAT;
} ) )
);
//按子组收集数据,这里传入的是一个分类函数和一个counting收集器
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting() ) );

//Map<Bollean, List<T>> partitioningBy 根据对流中每个项目应用谓词的结果来对项目进行分区
//使用方法与groupingBy很类似,不同的是Map的键是boolean类型
Map<Boolean, List<Dish>> meatDishes = menu.stream().collect(partitioningBy(Dish::isMeat) );
自定义收集器

对于自定义的接口,只需要imlements Collector<T, A, R>接口,@Override Supplier<A> supplier()@Override BiConsumer<A, T> accumulator()@Override Function<A, R> finisher()@Override BinaryOperator<A> combiner()@Override Set<Characteristics> characteristics()即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//T:要收集的项目的泛型
//A:累加器的类型
//R:收集操作得到的对象的类型
public interface Collector<T, A, R> {
//作用:建立新的结果容器
//所以必须返回一个结果为空的Supplier,也就是一个无参函数,在调用它时创建一个空的累加器实例,供数据收集过程使用
Supplier<A> supplier();

//作用:将元素添加到结果容器
BiConsumer<A, T> accumulator();

//作用:对结果容器应用最终转换
Function<A, R> finisher();

//作用:合并两个结果容器
BinaryOperator<A> combiner();

//作用:返回一个不可变的Characteristics集合,定义了收集器的行为——尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。
//Characteristics是一个包含3个项目的枚举:
//1.UNORDERED:归约结果不受流中项目的遍历和累积顺序的影响
//2.CONCURRENT:accumulator函数可以从多个线程同时调用,且该收集器可以并行归约
//3.IDENTITY_FINISH:表明finisher()是一个恒等函数,可以跳过
Set<Characteristics> characteristics();
}

//自定义Collector
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}

@Override
public BiConsumer<List<T>, T> accumulator() {
return ArrayList::add;
}

@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}

@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
}
}

@Override
public Characteristics characteristics() {
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT) );
}

}

并行流

定义

并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

顺序流 & 并行流 的转换

顺序流->并行流:.parallel()
并行流->顺序流:.sequential()
值得注意的是最后一次.parallel()/.sequential()调用会影响整个流水线,如:

1
2
// 流水线将并行执行,因为最后一次操作是.parallel()
stream.parallel().filter(...).sequential().map(...).parallel().reduce();

ForkJoinPool线程池

并行流内部使用了默认的ForkJoinPool,默认的线程数量就是处理器数量,通过Runtime.getRuntime().availableProcessors()得到。

你可以通过修改系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小, 如System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”12”);
这是一个全局设置,它会影响代码中所有的并行流,强烈建议不要修改默认值。

不同数据源是否适合并行流

可分解性
ArrayList极佳
LinkedList
IntStream.range极佳
Stream.iterate
HashSet
TreeSet

注意事项

  1. 共享可变状态会影响并行流及并行计算
  2. 自动装箱拆箱会极大影响性能,所以但凡有可能都尽可能使用IntStrem/LongStrem/DoubleStream
  3. 有些操作本身在并行流上的性能就要比顺序流差,有Iterator迭代limitfindFirst链表操作
  4. 对于少量数据,使用并行流得不偿失

分支/合并框架(Fork/Join)

目的

以递归的方式将并行的任务拆分为更小的任务,然后将每个子任务的结果合并起来生成整体结果。

分治法:分-治-合

实现

实现ExecutorService接口,将子任务分配给ForkJoinPool

任务提交

创建RecursiveTask<R>的子类,实现protected abstract R compute();方法,这个方法将任务拆分为子任务(分),直到无法拆分或不方便拆分时,生成单个子任务结果(治),最后合并所有结果(合)。
如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;

public static final long THRESHOLD = 10_000;

...

@Override
protected long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
//分
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.join();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
//对子任务调用`fork`会把它排进`ForkJoinPool`,但是同时对左右两边的子任务调用`fork`会比直接对其中一个调用`compute`慢
Long rightRst = rightTask.compute();
Long leftRst = leftTask.join();
return leftRst + rightRst;
}

...

}

注意事项

  1. 对一个任务调用join会阻塞调用方,直到该任务得到结果。因此有必要在两个子任务的计算都开始后再调用它,否则每个子任务都必须等待另一个子任务完成,会比顺序执行更慢更复杂。
  2. 不要在RecursiveTask内部使用ForkJoinPoolinvoke方法。
  3. 对子任务调用fork会把它排进ForkJoinPool,但是同时对左右两边的子任务调用fork会比直接对其中一个调用compute慢,因为后者可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销

工作窃取(work stealing)

将任务拆分为大量的小任务总是一个更好的选择,理想情况下,划分并行任务应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。
然而,实际情况下,每个子任务所化的时间天差地别,可能是因为划分策略效率低下,也有可能是不可预知的原因,如磁盘访问慢等。
Fork/Join框架提供了一种称为工作窃取(work stealing)的技术来解决这个问题——
Fork/Join框架将任务差不多平均分配到ForkJoinPool的所有线程上,每个线程都为它的任务保存一个双向链表队列,每完成一个任务,就会从队列头取出下一个任务继续执行,若某个线程完成了分配给它的所有任务,也就是它的队列空了,而其他线程还很忙,它就会随机选择一个别的线程,从它的队尾“偷走”一个任务,这个过程一直继续,直到所有任务都执行完毕。

Splitrator(可分迭代器,Splitable Iterator)

目的

并行遍历数据源中的元素。

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Spliterator<T> {
// tryAdvance类似于普通的Iterator,都是按顺序遍历元素,如果还有其他元素要遍历就返回true
// 但tryAdvance可以把一些元素划分出去分给第二个Spliterator,让它们两个并行处理
boolean tryAdvance(Consumer<? super T> action);

// 拆分Spliterator,不能再分割则返回false
Splitrator<T> trySplit();

// 粗略估计还剩多少元素要遍历,有利于均匀拆分
long estimateSize();

// 特性
//ORDERED 元素有既定的顺序,如List,因此Spliterator遍历和划分时也会遵循这一规则
//DISTINCT 对于任意一对遍历过的元素x和y,x.equals(y)返回false
//SORTED 遍历的元素按照一个预定的顺序排序
//SIZED 该Spliterator由一个已知大小的源建立,如Set,因此estimatedSize()返回的是准确值
//NONNULL 保证遍历的元素不为null
//IMMUTABLE Spliterator的数据源不可变。意味着遍历时不能添加、删除或修改任何元素
//CONCURRENT 该Spliterator的数据源可以被其他线程同时修改而无需同步
//SUBSIZED 该Spliterator和所有从它拆分出来的Spliterator都是SIZED
int characteristics();
}

四、默认方法(default)

关键字:default

优点:默认方法的出现有利于接口的改进,可以扩展接口的功能,同时又不破坏原有的引用。

试想一下,我们有如下的一段代码:

1
2
3
4
5
List<Apple> heavyApples1 = inventory.stream().filter((Apple a) -> a.getWeight() > 150)
.collect(toList());

List<Apple> heavyApples2 = inventory.parallelStream().filter((Apple a) -> a.getWeight() > 150)
.collect(toList());

在Java 8之前,List<T>Collection<T>接口中都没有stream方法或parallelStream方法,如果要使用这两个方法要怎么办呢?

最简单的方法是在List<T>Collection<T>接口中加入这两个方法,但是如果这么做了,所有List<T>Collection<T>接口的实现类都需要重新@Override
这会导致已有的代码全部需要重构,工作量是不可想象的,所以Java 8不得已引入了默认方法这个概念,允许接口的实现类不实现接口的default方法
这样就不需要重构原有的代码,只需要将stream方法和parallelStream方法设置为默认方法即可。

一个类可以implements多个接口,这时候如果多个接口含有相同的default methods,就会造成歧义,解决方法有两种:重载/使用ClassName.super显式声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Vehicle{
default void print() {
System.out.println("vehicle!");
}
}

public interface FourWheeler{
default void print() {
System.out.println("4wheeler!");
}
}

//解决歧义方法1 —— 在实现类中重载default方法
public class car1 implements Vehicle, FourWheeler{
public void print() {
System.out.println("car!");
}
}

//解决歧义方法2 —— 通过ClassName.super显式声明调用的哪个default方法
public class car2 implements Vehicle, FourWheeler{
default void print() {
Vehicle.super.print();
}
}

五、其他函数式编程思想

空引用(null引用)(java.util.Optional)

定义

Optional<T>是一个容器类,代表一个值存在或者不存在。
Optional<T>类中存在方法来明确处理值不存在的情况,这样就可以避免NullPointerException,防止null检查相关bug。

null检查的缺点:
NullPointerException是目前Java开发中最典型的异常。
深度嵌套的null检查会严重降低代码的可读性。
null本身没有语义。
Java一直避免程序员意识到指针的存在,唯一的例外是null指针。

相关方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建Optional对象
Optional.empty() //生成空的Optional对象
Optional.of() //生成非null的Optional对象,如果传入空值,立马抛出NullPointerException
Optional.ofNullable() //生成一个允许null值的Optional对象

isPresent() //Optional为null时返回false,非null时返回true
ifPresent(Consumer<T> block) //Optional非null时执行相应代码

T get() //存在值则返回值,不存在值返回NoSuchElement异常
T orElse(T other) //存在值时返回值,否则返回一个默认值
T orElseGet(Supplier<? extends T> other)
//orElse的延迟调用版,Supplier方法只有在Optional对象不含值时菜执行调用
T orElseThrow(Supplier<? extends X> exceptionSupplier)
//类似get,不过可以定制自己的异常类型

几种模式

(一)使用map从Optional对象中提取和转换值
1
2
Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);
(二)使用flatMap链接Optional对象
1
2
3
4
5
6
public String getCarInsuranceName(Optional<Person> person) {
return person.flatMap(Person::getCar)
.flatMap(Car::getInsurance)
.map(Insurance::getName)
.orElse("Unknown");
}
(三)两个Optional对象的组合
1
2
3
public Optional<Insurance> nullSafeFindCheapestInsurance(Optional<Person> person, Optional<Car> car) {
return person.flatMap(p -> car.map(c -> findCheapesetInsurance(p, c) ) );
}
(四)使用filter剔除特定的值
1
2
optInsurance.filter(insurence -> "CambridgeInsurance".equals(insurance.getName() ) )
.ifPresent(x -> System.out.println("ok") );

模式匹配


六、CompletableFuture:组合式异步编程

Future接口

原理

对将来某个时刻会发生的结果进行建模,它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。

优点

  1. Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不必等待耗时的操作完成。
  2. FutureThread更易用,只需要将耗时的操作封装到一个Callable对象中,然后提交给ExecutorService即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return doSomeLongComputation(); //在Future中执行耗时操作
}
});
doSomethingElse(); //异步操作进行的同时,可以做其他事情

try{
//获取异步操作的结果,如果最终被阻塞,无法得到结果,那么最多等待一秒后退出
Double rst = future.get(1, TimeUnit.SECONDS);
}catch(ExecutionException ee){
//计算抛出一个异常
}catch(InterruptedException ie){
//当前线程在等待过程中被中断
}catch(TimeoutException te){
//在Future对象完成之前超过已过期
}

局限性

Future接口很难完成以下操作:

  1. 将两个一部计算合并为一个——这两个异步计算之间相互独立,同时又依赖于第一个的结果
  2. 等待Future集合中的所有任务都完成
  3. 仅等待Future集合中最快结束的任务完成(它们试图通过不同方式计算同一个值),并返回结果
  4. 通过编程方式完成一个Future任务的执行(即手工设定异步操作结果)
  5. 当Future完成事件发生时会收到通知,并能使用Future计算的结果进行下一步操作,而不是简单地阻塞等待操作的结果

使用CompletableFuture构建异步应用

(1)创建异步操作:runAsync/supplyAsync

其中runAsync不支持返回值,supplyAsync支持返回值:

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//无返回值
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
});

future.get();
}

//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});

long time = future.get();
System.out.println("time = "+time);
}

(2)计算结果完成时的回调方法:whenComplete/whenCompleteAsync/exceptionally

当计算结果完成或抛出异常时,可以执行特点的Action。其中whenComplete是执行当前任务的线程继续执行whenComplete的任务,而whenCompleteAsync是把任务提交给线程池执行:

1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void whenComplete() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if(new Random().nextInt()%2>=0) {
int i = 12/0;
}
System.out.println("run end ...");
});

future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}

});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败!"+t.getMessage());
return null;
}
});

TimeUnit.SECONDS.sleep(2);
}

(3)两个线程的串行化:thenApply

当一个线程依赖另一个线程时,可以用thenApply将两个线程串行化。其中T是上一个任务返回结果的类型,U是当前任务的返回类型:

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void thenApply() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long result = new Random().nextInt(100);
System.out.println("result1="+result);
return result;
}
}).thenApply(new Function<Long, Long>() {
@Override
public Long apply(Long t) {
long result = t*5;
System.out.println("result2="+result);
return result;
}
});

long result = future.get();
System.out.println(result);
}

(4)执行任务完成时对结果进行处理:handle/handleAsync

handle是在任务完成后再执行,还可以处理异常的任务。thenApply只可以执行正常的任务,任务出现异常则不执行thenApply方法:

1
2
3
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void handle() throws Exception{
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {

@Override
public Integer get() {
int i= 10/0;
return new Random().nextInt(10);
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable==null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
});
System.out.println(future.get());
}

(5)消费处理结果:thenAccept/thenAcceptAsync

接收任务的处理结果,并消费处理,无返回结果:

1
2
3
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

具体使用如下:

1
2
3
4
5
6
7
8
9
10
11
public static void thenAccept() throws Exception{
CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return new Random().nextInt(10);
}
}).thenAccept(integer -> {
System.out.println(integer);
});
future.get();
}

(6)thenRun

(7)thenCombine

(8)thenAcceptBoth

(9)applyToEither

(10)acceptEither

(11)runAfterEither

(12)runAfterBoth

(13)thenCompose


七、新的日期和时间API

Java8中的Date/Time API相较于之前版本的优点:

  1. 线程安全:Java8之前的Date/Time API是非线程安全的,Java8中的Date/Time API是不可变的,所以天生线程安全。
  2. Java8中的Date/Time API提供了更多操作方法。
  3. 本地Date/Time API LocalDateTimeLocalTimeLocalDate得以简化,不需要处理时区操作;时区Date/Time APIZonedDateTime可以处理各种时区。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//LocalDateTime & LocalDate & LocalTime
LocalDateTime currentTime = LocalDateTime.now(); //获取当前本地时间
LocalDate now = currentTime.toLocalDate(); //获取当前本地日期
Month month = currentTime.getMonth(); //月
int day = currentTime.getDayOfMonth(); //日
int seconds = currentTime.getSecond(); //秒

//通过ChronoUnit修改时间
LocalDate nextWeek = now.plus(1, ChronoUnit.WEEKS); //ChronoUnit.WEEKS,一周后
LocalDate nextMonth = now.plus(1, ChronoUnit.MONTHS); //ChronoUnit.MONTHS,一月后
LocalDate nextYear = now.plus(1, ChronoUnit.YEARS); //ChronoUnit.YEARS,一年后
LocalDate nextDecade = now.plus(1, ChronoUnit.DECADES); //ChronoUnit.DECADES,十年后

//修改年(2012)、日(10)
LocalDateTime date2 = currentTime.withDayOfMonth(10).withYear(2012);
//通过LocalDate.of(year, MONTH, day)生成日期
LocalDate date3 = LocalDate.of(2014, Month.DECEMBER, 12);
//通过LocalTime.of(hour, minute)生成时间 —— 22:15
LocalTime time1 = LocalTime.of(22, 15);
//通过字符串生成时间
LocalTime time2 = LocalTime.parse("20:15:30");

//ZonedDateTime
ZonedDateTime zonedDate = ZonedDateTime.parse("2007-12-03T10:15:30+05:30[Asia/Karachi]");
ZoneId id = ZoneId.of("Europe/Paris");
ZoneId currentZone = ZoneId.systemDefault();
  1. LocalDateTime的加减操作、判断先后关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//获取当前时间
LocalDateTime now = LocalDateTime.now();
System.out.println("now: " + now);

//加/减x小时 (年、月、日、分、秒 同理)
LocalDateTime before4h = now.minusHours(4L);
System.out.println("4h前: " + before4h);
LocalDateTime after4h = now.plusHours(4L);
System.out.println("4h后: " + after4h);

// 输出:
// now: 2020-04-27T16:42:52.254
// 4h前: 2020-04-27T12:42:52.254
// 4h后: 2020-04-27T20:42:52.254

System.out.println(before4h.isAfter(now) );
System.out.println(after4h.isBefore(now) );

//输出:
//false
//false
  1. LocalDateTimeLong类型的时间戳相互转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//获取时间戳
Long timestamp = System.currentTimeMillis();
System.out.println("时间戳: " + timestamp);
//将时间戳转为LocalDateTime
LocalDateTime now = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault() );
System.out.println("LocalTimeStamp: " + now);
//将LocalDateTime转为时间戳
Long transfer = now.atZone(ZoneId.systemDefault() ).toInstant().toEpochMilli();
System.out.println("转回时间戳: " + transfer);

//输出:
// 时间戳: 1587977423910
// LocalTimeStamp: 2020-04-27T16:50:23.910
// 转回时间戳: 1587977423910
-------------本文结束感谢您的阅读-------------

本文标题:《Java 8 in Action》

文章作者:DragonBaby308

发布时间:2019年05月26日 - 19:11

最后更新:2020年04月27日 - 20:58

原始链接:http://www.dragonbaby308.com/java-8-in-action/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%