Java多线程 、并发

多线程 & 并发


(一)基础知识

进程 & 线程

  1. 进程:操作系统分配资源的基本单位
  2. 线程:CPU调度的基本单位,一个进程可以包含多个线程

    线程共享区域(进程独占):虚拟机堆、方法区
    线程独占区域:虚拟机栈、本地方法栈、程序计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//通过JMX查看普通Java程序有哪些线程
public class MultiThread {
public static void main(String[] args) {
//获取线程管理Bean
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
//不获取monitor和synchronized信息,仅打印堆栈
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
for (ThreadInfo threadInfo : threadInfos) System.out.println("id: " + threadInfo.getThreadId()
+ ", name: " + threadInfo.getThreadName());
}
// 结果:
// id: 6, name: Monitor Ctrl-Break
// id: 5, name: Attach Listener 添加事件
// id: 4, name: Signal Dispatcher 分发处理给JVM信号的线程
// id: 3, name: Finalizer 调用对象finalize方法的线程
// id: 2, name: Reference Handler 清除reference线程
// id: 1, name: main main线程,主程序入口
}
  1. 协程:协程可以看作轻量级的线程,它共享堆空间、拥有独立的栈空间,不同于线程的是,协程的调度由用户自己控制(用户态),而不是由OS控制(内核态)。一个线程上可以运行多个协程

1.进程间通信(IPC)

  1. 管道(pipeline)
  2. 信号量(semaphore)
  3. 消息队列
  4. 共享内存
  5. socket & stream

2.线程间通信

(1)synchronized + wait()/notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ShareResource {
private String name; //姓名
private String gender; //性别

//共享资源是否为空
private boolean isEmpty = true;

//Object#wait()/notify()/notifyAll()需要在同步代码块中使用,见第(二)节

//生产者:向共享资源存储数据
synchronized public void push(String name, String gender) {
try {
while(!isEmpty) {
//共享资源非空,生产者释放锁,等待唤醒,不再生产
this.wait();
}
this.name = name;
this.gender = gender;
isEmpty = false;
//唤醒消费者,进行消费
this.notify();
}catch (Exception e) {
//...
}
}

//消费者:消费共享资源
synchronized public void pop() {
try {
while(isEmpty) {
//共享资源为空,消费者释放锁,等待唤醒
this.wait();
}
//消费
System.our.println(this.name + "-" + this.gender)
isEmpty = true;
//唤醒生产者,进行生产
this.notify();
}catch (InterruptedException e) {
//...
}
}

}

(2)Lock + Condition

Lock机制不能调用Object#wait()/notify()机制,所以Java提供了用于控制Lock机制通信的Condition接口

  1. Lock#newCondition()生成一个与当前重入锁绑定的Condition实例,通过该实例就可以进行线程通信
  2. Condition#await()让线程等待通知
  3. Condition#signal()让线程得到通知,继续执行

    注意:await()/signal()也存在Lost Wake Up问题,需要在同步块(Lock#lock()/unlock())中执行!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ShareResource {
private String name; //姓名
private String gender; //性别

//共享资源是否为空
private boolean isEmpty = true;

private Lock lock = new ReenterantLock(); //可重入锁
private Condition condition = lock.newCondition(); //Condition实例

//Condition#await()/signal()/signalAll()需要在同步代码块中使用,见第(二)节

//生产者:向共享资源存储数据
public void push(String name, String gender) {
//加锁
lock.lock();
try {
while(!isEmpty) {
//共享资源非空,等待唤醒,不再生产
condition.await();
}
this.name = name;
this.gender = gender;
isEmpty = false;
//唤醒消费者,进行消费
condition.signalAll();
}catch (Exception e) {
//...
}finally{
//释放锁
lock.unlock();
}
}

//消费者:消费共享资源
public void pop() {
//加锁
lock.lock();
try {
while(isEmpty) {
//共享资源非空,等待唤醒,不再生产
condition.await();
}
//消费
System.our.println(this.name + "-" + this.gender)
isEmpty = true;
//唤醒生产者,进行生产
condition.signalAll();
}catch (Exception e) {
//...
}finally{
//释放锁
lock.unlock();
}
}

}

(3)Semaphore:允许多个线程同时访问

(七)- AQS - Semaphore


(4)Thread#join():等待子线程执行完成

(二)线程状态变迁 - join()yield()的区别


使用线程的优势

  1. 使用线程可以更好地利用CPU资源
  2. 使用线程代替进程,减少了进程间切换的巨量开销

使用线程的缺点

使用线程引入了线程上下文切换的开销。

CPU采用时间片轮转的方式进行线程调度,一个线程的时间片用完就需要让出CPU给其他线程,这时候它需要保存现场,方便下次获得时间片后从中断位置继续执行。任务的保存到再加载就是一次线程上下文切换


(二)线程状态变迁

  1. NEW:初始状态。线程被创建,但是没有通过.start()启动;
  2. RUNNABLE:运行状态。Java中的RUNNABLE包括OS中的RUNNABLERUNNING

    RUNNABLE:通过.start()启动线程

RUNNING:线程获得了CPU时间片,正式运行

  1. BLOCKED:阻塞状态。线程阻塞于锁

  2. WAITING:等待状态。当前线程需要等待其他线程通知(notify()/notifyAll())或中断

  3. TIME_WAITING:超时等待状态。

    WAITING是通过wait()/join()进入的,TIME_WAITING是通过wait(long timeout)/join(long timeout)/sleep(long timeout)进入的,可以在指定时间内自行返回

  4. TERMINATED:终止状态。run()方法结束,或抛出异常导致程序终止


wait()sleep()的区别

  1. wait()会释放锁,sleep()不会

    你可以如此理解,sleep()到了时间会自动醒来,所以不需要释放锁

  2. wait()不会主动苏醒,需要等待其他线程notify()/notifyAll(),但是wait(long timeout)会主动苏醒;sleep(long timeout)会主动苏醒

  3. wait()用于线程间交互/通信,sleep()用于暂停


wait()/notify()/notifyAll()强制要求写在同步代码块中

1.Object#wait()

调用Object#wait()时,等待线程会释放、并等待对象的monitor锁,直到其他线程调用其等待对象的notify()/notifyAll(),唤醒等待线程,等待线程会尝试重新获取对象的monitor锁,然后再恢复执行

  • IllegalMonitorStateException当前线程执行方法时必须持有对象的monitor锁,也就是说对象必须在同步块内,否则抛出此异常
  • InterruptedException:其他线程打断了当前线程的等待

2.错误示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//不做同步
try {
new Object().wait();
}catch (Exception e){
e.printStackTrace();
}

//不在正确对象的monitor锁上同步
Object monitoredObj = new Object(); //需要同步的对象
Object otherObj = new Object(); //其他对象
synchronized (otherObj) { //尝试获取一个对象的锁,却在其他对象上同步,等同于没锁
try {
monitoredObj.wait();
}catch (Exception e){
e.printStackTrace();
}
}

//因为wait()不在同步块内,当前线程运行时不一定有对象的monitor锁,所以都会抛出异常:
//java.lang.IllegalMonitorStateException
//at java.lang.Object.wait(Native Method)
//at java.lang.Object.wait(Object.java:502)
//……

3.Lost Wake Up问题

  • 生产者:
1
2
cnt++;
notify();
  • 消费者:
1
2
while(cnt <= 0) wait();
cnt--;
  • 初始时cnt = 0,消费者检查发现满足while条件,决定执行wait(),但是还没有执行;生产者执行cnt++,然后调用notify(),可是此时消费者还没有执行wait(),所以notify()无效;在这个时候消费者执行完了wait(),释放锁,进入等待。

  • wait()/notify()/notifyAll()不使用同步块的话,由于竞态条件造成的执行顺序不同,会导致notify()/notifyAll()唤醒不了wait()的线程,这就是大名鼎鼎的Lost Wake Up问题。

    j.u.c.locks.Condition#await()/signal()同样有这个问题。

  • 所以:Javawait()/notify()/notifyAll()强制要求写在同步代码块中

4.正确示例

1
2
3
4
5
6
7
8
9
Object monitoredObj = new Object();

synchronized (monitoredObj) { //在正确对象上进行同步
try {
monitoredObj.wait(); //可以保证不出现Lost Wake Up问题
}catch (Exception e){
e.printStackTrace();
}
}

join()yield()的区别

  1. join()会让一个RUNNING线程阻塞成WAITING状态,join(long)会让一个RUNNING线程阻塞成TIMED_WAITING状态,都是让主线程阻塞,等待子线程执行完成
  2. yield()是让一个RUNNING线程放弃时间片,变为RUNNABLE状态

(三)线程 创建 & 运行

1.继承Thread

创建线程:extends Thread
运行线程:.start()

为什么需要通过.start()启动线程?直接调用线程的.run()不行吗?
直接调用线程的.run()方法会在当前线程(比如main线程)内执行.run(),并不会新启动一个线程。调用.start()才会启动线程并使线程进入就绪状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//创建
public class MyThread extends Thread{
@Override
public void run() {
super.run();
System.out.println("MyThread");
}
}

//运行
public class Run{
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
}

2.实现Runnable接口

创建线程:implements Runnable可以通过Lambda表达式创建匿名内部类
运行线程:.start()

1
2
3
new Thread(
() -> System.out.println("Runnable in Java 8 Lambda"))
.start();

3.实现Callable接口

4.线程池

优点

  1. 线程的创建和销毁需要消耗大量的系统资源,使用线程池可以复用线程,减少线程创建和销毁的开销
  2. 提高响应速度

任务提交

  • 提交到线程池的任务需要实现Runnable/Callable接口,区别是Callable可以返回结果,而Runnable无返回结果
  • execute()用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功;submit()用于提交需要返回值的任务,线程池会返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过get()获取返回值

创建线程池:ThreadPoolExecutor

不推荐通过Executors的工厂方法(比如ExecutorService threadPool = Executors.newFixedThreadPool(5);)创建线程池。
newFixedThreadPoolnewSingleThreadPool工厂方法创建的线程池工作队列允许Integer.MAX_VALUE,可能造成请求大量堆积而OOM;而newCachedThreadPoolnewScheduledThreadPool工厂方法创建的线程池最大线程为Integer.MAX_VALUE,可能创建大量线程造成OOM
使用ThreadPoolExecutor的方式可以让人更加明确线程池的运行规则,避免资源耗尽的风险

1
2
3
4
5
6
7
8
9
10
public ThreadPoolExecutor(
int corePoolSize, //核心线程数,工作队列没满时,线程最大并发数
int maximumPoolSize, //最大线程数,工作队列满后,线程最大并发数
long keepAliveTime, //空闲线程过多久被回收
TimeUnit unit,
BlockingQueue<Runnable> workQueue, //工作队列,分为有界队列和无界队列
//无界队列可能导致OOM,极不推荐
ThreadFactory ThreadFactory, //用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,方便排查问题
RejectExecutionHandler hadler //线程池满后,任务的拒绝策略
){...}

corePoolSize就像是公司正式员工,maximumPoolSize是外包员工,workQueue则是需求池。
如果有需求,正式员工先接;正式员工手里有需求,产品就将需求放到需求池;如果需求池是有界的,满了以后,会将需求交给外包员工;如果正式员工和外包员工手里都有需求,需求池还满了,产品就不得不放弃部分需求。
外包员工做完需求,经过一段空闲时间(keepAliveTime),就可以滚蛋了;经济不景气的时候,正式员工空闲一段时间也可以滚蛋。

线程池最佳大小公式:
最佳线程数量 = (线程等待时间 + 线程CPU时间) / 线程CPU时间 * CPU数量
由此可以看到:
1.线程等待时间越长的任务(比如I/O密集型任务),最佳线程数量就越多
2.线程等待时间越短的任务(比如CPU密集型任务),最佳线程数就越少,因为CPU的频繁调度会影响任务的执行

当新任务加入线程池时:

  1. 如果线程池中线程数小于corePoolSize,新提交任务将创建一个新线程执行,即使线程池中存在空闲线程

  2. 如果线程池中线程数达到了corePoolSize,新提交的任务将被加入workQueue,等待线程池中任务调度执行

    线程池有哪几种工作队列?

  3. ArrayBlockingQueue:数组实现的FIFO有界阻塞队列;

  4. LinkedBlockingQueue:链表实现的FIFO阻塞队列,容量可以自行设置,设置了容量是有界队列,不设置则是一个无界队列,最大容量Integer.MAX_VALUEnewFixedThreadPool使用了这个队列;

  5. DelayQueue:延迟队列。newScheduledThreadPool使用了这个队列;

  6. PriorityBlockingQueue:具有优先级的无界阻塞队列;

  7. SynchronousQueue:同步队列。newCachedThreadPool使用了这个队列

  8. workQueue已满,且maximumPoolSize > corePoolSize,新提交任务会创建新线程执行

  9. workQueue已满,且提交任务数超过了maximumPoolSize,任务交给RejectExecutionHandler处理

    4种拒绝策略:

  10. AbortPolicy默认,抛出异常

  11. DiscardPolicy直接丢弃任务,不抛出异常

  12. DiscardOldestPolicy丢弃队列中最老的任务,当前任务继续提交

  13. CallerRunsPolicy交给线程池调用所在的线程进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//拒绝策略接口
public interface RejectedExecutionHandler{
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

//AbortPolicy:中止策略
public static class AbortPolicy implements RejectedExecutionHandler{
public AbortPolicy() {}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//直接抛出拒绝执行的异常
throw new RejectedExecutionException("Task "
+ r.toString + " rejected from "
+ e.toString);
}
}

//DiscardPolicy:丢弃策略
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {}

@Override
//直接悄悄丢弃这个任务,不触发任何动作
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

//DiscardOldestPolicy:弃老策略
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//如果线程池未关闭,弹出队首任务,尝试执行
if(!e.isShutdown()){
e.getQueue().poll();
e.execute(r);
}
}
}

//CallerRunsPolicy:调用者运行策略
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//只要线程池没有关闭,就由提交任务的当前线程处理
if(!e.isShutdown()){
r.run();
}
}
}
  1. 当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime,回收这些线程;如果设置了allowCoreThreadTimeOut(true)corePoolSize中的线程空闲时间到达keepAliveTime也会被回收

种类

  1. FixedThreadPool:固定大小线程池,使用的是无界阻塞队列【LinkedBlockingQueuecorePoolSize == maximumPoolSizekeepAliveTime == 0。适用于处理CPU密集型的长期任务,即确保CPU长期被工作线程处理的情况下,尽可能少地分配线程

    1
    2
    3
    4
    5
    6
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
  2. SingleThreadPool:单线程池【,使用的是无界阻塞队列LinkedBlockingQueuecorePoolSize == maximumPoolSize == 1keepAliveTime == 0。适用于串行执行任务的场景

    1
    2
    3
    4
    5
    6
    7
    8
    public static ExecutorService newSingleThreadPool(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(
    1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }
  3. CachedThreadPool:可变大小线程池【,使用的是SynchronousQueuecorePoolSize == 0maximumPoolSize == Integer.MAX_VALUEkeepAliveTime为60s。适用于并发执行大量短期的小任务

    1
    2
    3
    4
    5
    6
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    threadFactory);
    }
  4. ScheduledThreadPool:定时任务线程池【,使用的是DelayQueuemaximumPoolSize == Integer.MAX_VALUEkeepAliveTime == 0。适用于周期性执行任务的场景

    1
    2
    3
    4
    public static ExecutorService newScheduledThreadPool(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue());
    }

异常处理

在使用线程池处理任务时,任务代码可能抛出RuntimeException,抛出异常后,线程池可能捕获它,也可能创建一个新的线程来代替异常的线程,我们无法感知任务出现了异常,所以需要考虑异常处理。
比如下面这段代码,明显会有NullPointerException,但是使用线程池执行却并不会进行抛出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName());
Object obj = null;
System.out.println(obj.toString()); //NullPointerException
});
}
//输出(不会抛出异常):
//pool-1-thread-2
//pool-1-thread-1
//pool-1-thread-3
//pool-1-thread-4
//pool-1-thread-5
  1. 直接在任务代码中try-catch捕获异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Object obj = null;
System.out.println(obj.toString()); //NullPointerException
}catch (NullPointerException e) {
System.out.println("NullPointerException!");
}
});
}
//pool-1-thread-1
//pool-1-thread-4
//NullPointerException!
//NullPointerException!
//pool-1-thread-3
//NullPointerException!
//pool-1-thread-2
//NullPointerException!
//pool-1-thread-5
//NullPointerException!
  1. 通过Future.get()获取处理结果,包括异常,在Future中进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
//构建Future对象,接收submit()结果
Future future = threadPool.submit(() -> {
System.out.println(Thread.currentThread().etName());
Object obj = null;
System.out.println(obj.toString()); //NullPointerException
});
//通过Future.get()接收抛出的异常
try{
future.get();
}catch (Exception e){
System.out.println("Exception!");
}
}
//pool-1-thread-1
//Exception!
//pool-1-thread-2
//Exception!
//pool-1-thread-3
//Exception!
//pool-1-thread-4
//Exception!
//pool-1-thread-5
//Exception!
  1. 为工作者线程设置UncaughtExceptionHandler,在UncaughtExceptionHandler方法中处理异常
1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((t1, e) -> {
System.out.println(t1.getName() + " 线程抛出的异常: " + e);
});
return t;
});
threadPool.execute(() -> {
Object obj = null;
System.out.println(obj.toString());
});
//Thread-0 线程抛出的异常: java.lang.NullPointerException
  1. 重写ThreadPoolExecutor.afterExecute()方法,处理传递的异常引用

关闭线程池

  1. shutdown():平缓关闭线程池。不再接收新任务,但是原有任务会执行完成
  2. shutdownNow():强制关闭线程池。不再接收新任务,原有任务也会强制中止

(四)线程优先级

设置线程优先级可能会导致部分线程由于长时间无法获得时间片而“饿死”,所以一般情况下不要设置线程优先级,除非是严重依赖线程优先级别的任务,比如权重。

  • 优先级从0到10,默认都是5

(五)用户线程 & 守护线程

  1. 用户线程(User Thread):运行于前台,执行具体的任务,比如main线程
  2. 守护线程(Daemon Thread):运行于后台,为用户线程服务,比如GC线程

区别

当JVM中没有运行的用户线程,那么JVM就会退出,守护线程会随着JVM一起结束工作——也就是说,守护线程不影响JVM的退出


(六)线程死锁

条件

  1. 互斥条件
  2. 请求与保持:持有资源不放弃,然后申请别人持有的资源
  3. 不剥夺:线程已获得的资源不会被强行剥夺
  4. 循环等待

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//持有lock1,等待lock2
new Thread(() -> {
synchronized (lock1) {
System.out.println(Thread.currentThread() + " get lock1.");
try{
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " waits lock2.");
synchronized (lock2) {
System.out.println(Thread.currentThread() + " get lock2.");
}
}
}).start();

//持有lock2,等待lock1
new Thread(() -> {
synchronized (lock2) {
System.out.println(Thread.currentThread() + " get lock2.");
try{
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + " waits lock1.");
synchronized (lock1) {
System.out.println(Thread.currentThread() + " get lock1.");
}
}
}).start();

避免死锁

  1. 一次性申请所有资源

    类似于MySQL中避免死锁采用的在一个事务中申请所有需要用到的锁

  2. 如果申请不到需要的资源,就主动放弃已经持有的资源

    参考事务的回滚

  3. 按照一定的顺序获取资源

    比如说:

  4. 通过比较两个锁的hashCode()来决定获取锁的顺序

  5. 如果两个锁的hashCode()恰巧一致,那么就引入第三者的“加时赛锁(tie lock)”,代码如下

1
2
3
4
5
6
//lock1.hashCode() == lock2.hashCode()
synchronized(tieLock) { //加时赛锁
synchronized(lock1) {
synchronized(lock2) {...}
}
}

(七)并发 & 线程安全

1.synchronized

synchronizedJVM级别的锁同步机制,可以保证被它修饰的方法/代码块在任意时刻只能有一个线程执行。

原理:monitor

synchronized的原理是获取对象的监视器锁,也称为monitor

在Java中,每一个对象都有对应的监视器锁(monitor锁),存储在对象头中,同一时刻只能有一个线程获取对象的monitor锁。执行monitorenter字节码指令就是试图获取对象monitor锁,如果获取失败,线程进入BLOCKED状态,阻塞等待锁;通过执行monitorexit指令释放锁

  1. synchronized修饰实例方法:锁住当前对象实例
  2. synchronized修饰static方法:由于static方法属于类,所以synchronized修饰static方法,锁住的是类的Class对象
  3. synchronized修饰代码块:指定加锁对象,对给定对象加锁

synchronized vs ReenterantLock

  1. synchronizedReentrantLock都是可重入锁

    可重入锁:线程可以获取自己已经获取了没释放的锁

  2. synchronized依赖于JVM,锁优化不会直接暴露;而ReentrantLock依赖于API,可以直接查看源代码

  3. ReentrantLock提供了一些高级功能,包括:①等待可中断;②公平锁;③锁可以绑定多个条件,实现选择性通知

  • Lock.lockInterruptibly()可以实现可中断等待锁——线程可以主动放弃等待锁,转而处理其他事务
  • ReentrantLock(boolean fair)可以实现公平锁

    非公平锁:多个线程竞争锁时,锁空闲时哪个线程是就绪的就将锁给哪个线程
    公平锁:多个线程竞争锁时,严格按照线程申请锁的顺序来分配锁。

公平锁的坏处:
1.线程的挂起和恢复需要消耗很多资源
2.等待线程的恢复会增加时延
所以Java一般都是非公平锁

  • ReentrantLock配合Condition可以实现条件锁
  1. 性能已不再是选择标准。

    JDK6对synchronized加入了大量的优化机制——自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁,所以现在synchronized的执行速度并不会比Lock慢,相反的,由于它是JVM级别的锁机制,所以应该优先考虑,除非需要定时锁、主动释放锁、公平锁、条件锁这种无法通过synchronized实现的功能。


锁优化

synchronized锁存在4种状态——无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,它们会随着竞争的激烈而升级。

注意:锁可以升级,不可以降级。这是为了提高获取锁和释放锁的效率

1.偏向锁

锁会偏向于第一次获取它的那个线程——如果在接下来的执行过程中,该锁没有被其他线程获取,那么持有偏向锁的线程就不需要同步!

试想你追到了女朋友,你是她的初恋,在接下来的一段时间里,如果没有你没有被挖墙脚,那么你就不需要重新追她。
就是俗话说的,第一次是珍贵的,记忆弥深嘛(●ˇ∀ˇ●)

2.轻量级锁

关闭偏向锁、或者多个线程竞争偏向锁,会导致偏向锁升级为轻量级锁。

原理:CAS

轻量级锁被获取后会膨胀为重量级锁,阻塞其他线程。

3.自旋锁 & 自适应自旋锁
  • 一般锁的等待时间都不会太长,为了这一点时间去挂起线程/恢复线程是得不偿失的,所以JVM引入了自旋锁,让线程执行忙自选,等待其他线程释放锁,而不是直接挂起。
  • 自旋次数一般默认10,可以通过-XX:PreBlockSpin来修改
  • 自适应自旋锁的自旋时间不再是固定的,而是综合前一次同一个锁上的自旋时间以及当前持有锁的线程状态来决定的
4.锁消除

对于不可能出现竞争的共享数据,直接将锁消除,避免无意义的申请锁。

5.锁粗化

JVM将多个锁合并为一个锁来获取。


2.volatile

原理

  • 在Java内存模型中,线程将变量保存在本地的工作内存中进行读写,然后才同步更新到主内存
  • 数据不一致:一个线程在主内存中修改了一个变量的值,但是另一个线程还继续使用它在工作内存中的变量副本
  • volatile是作用是:通知JVM变量是不稳定的,所有使用该共享变量的线程每次使用时都要在共享内存(即:主内存)中读取最新值,对其修改必须同步刷新回共享内存,从而保证所有线程的可见性

作用

  1. 可见性(volatile变量的修改会在所有用到它的线程种看到
  2. 防止指令重排序

    volatile不具备原子性,因此不要将volatile用在getAndOperate场合,仅仅set或者get的场景是适合volatile

应用

  1. boolean状态变量要求改变后立即可见
  2. double-check

3.ThreadLocal(线程隔离)

ThreadLocal变量在每个线程内创建一个副本,这样一个线程的更改不会影响到另一个线程的变量值

使用

  1. 创建ThreadLocal变量:ThreadLocal<T> tl = new ThreadLocal<T>();
  2. 获取ThreadLocal变量值:T tl.get()
  3. 设置ThreadLocal变量值:tl.set(T t)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ThreadLocalTest {
//定义ThreadLocal变量
private static ThreadLocal<Long> longThreadLocal = new ThreadLocal<Long>(){};
private static ThreadLocal<String> stringThreadLocal = new ThreadLocal<String>(){};

//getter
private Long getLong(){
return longThreadLocal.get();
}

private String getString(){
return stringThreadLocal.get();
}

//setter
private void setThreadLocals(){
longThreadLocal.set(Thread.currentThread().getId());
stringThreadLocal.set(Thread.currentThread().getName());
}

public static void main(String[] args) throws InterruptedException{
ThreadLocalTest test = new ThreadLocalTest();
test.setThreadLocals();
System.out.println("主线程ID: " + test.getLong());
System.out.println("主线程名称: " + test.getString());

Thread subThread = new Thread(() -> {
test.setThreadLocals();
System.out.println("子线程ID: " + test.getLong());
System.out.println("子线程ID: " + test.getString());
});
subThread.start();
subThread.join();

System.out.println("主线程ID: " + test.getLong());
System.out.println("主线程名称: " + test.getString());
//输出:
//主线程ID: 1
//主线程名称: main
//子线程ID: 9
//子线程ID: Thread-0
//主线程ID: 1
//主线程名称: main
}

}

原理

首先来看Thread类的源码:

1
2
3
4
5
6
public class Thread implements Runnable{
...

//与此线程有关的ThreadLocal值,由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;
}

可以看到每个线程对象内有一个threadLocals变量,是ThreadLocalMap类型,可以理解为是给ThreadLocal类定制的HashMap。默认情况下这个变量是空,只有调用ThreadLocalget()/set()才会创建。

1
2
3
4
5
6
7
8
9
10
11
//ThreadLocal.set()
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map != null) map.set(this, value);
else createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

可以看到,最终变量没有存在ThreadLocal上,而是存在了线程对象对应的ThreadLocalMap中,ThreadLocal相当于存储了一个从ThreadLocal到线程中实际变量的映射也就是说,ThreadLocal本身不存储值,而是作为Thread.ThreadLocalMap.Entry<K, V>中实际存储内容的一个key弱引用


【阿里-政务钉钉-一面】的时候被问到。


内存泄漏问题

ThreadLocalMap中的key是一个指向ThreadLocal实例的弱引用(下一次GC时就会被回收),而value则是一个指向实际存储对象obj的强引用。所以,如果ThreadLocal没有被外部强引用的情况下,GC时key会被清理掉,而value不会。这样一来,ThreadLocalMap中就会出现key为null的Entry。假如我们不做任何措施的话,value永远无法被GC 回收,这个时候就可能会产生内存泄露

强引用:只有对象被判断死亡才会回收;
弱引用:下一次GC就被回收


解决方法
  1. 每次get()/set()/remove()都会清除Entry[]key = null的记录
  2. 每次使用完ThreadLocal后手动调用remove(),会清理掉key = null的记录

既然弱引用key会造成内存泄漏,为何不用强引用?

  1. 我们来看上图,栈中的ThreadLocalRef实例(我们称其为tl吧)指向ThreadLocal的是一个强引用,而线程中的ThreadLocalMap.Entry.key指向ThreadLocal的只是一个弱引用。如果我们想要回收ThreadLocal,只需要通过tl = null;释放掉tl指向ThreadLocal的强引用即可,这样下一次GCThreadLocal就会被回收了
  2. 但是如果将ThreadLocalMap.Entry.key设置为强引用,当通过tl = null;释放掉tl指向ThreadLocal的强引用后,key指向ThreadLocal的强引用会导致ThreadLocal无法被GC。除非线程生命周期结束了,ThreadLocalMap随之回收,ThreadLocal才能被释放。但是项目中往往会用线程池复用线程,这样ThreadLocal就永远不会被回收了,造成无法解决的内存泄漏

4.Atomic原子类

j.u.c.atomic

AtomicInteger的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//获取当前值
public final int get();
//获取当前值,并设置为新值
public final int getAndSet(int newVal);
//获取当前值,并自增
public final int getAndIncrement();
//获取当前值,并自减
public final int getAndDecrement();
//获取当前值,加上预期值
public final int getAndAdd(int delta);
//CAS:如果当前值为预期值,则以原子方式设置为输入值
boolean compareAndSet(int expect, int update);
//延迟设置
public final void lazySet(int newVal);

原理

CAS + volatile

CAS存在的ABA问题解决措施:为数据加上版本号version,每次修改成功后version += 1,这样可以保证A->B->A的版本号不同于最开始,从而得知A被修改了
j.u.c.atomic中的AtomicStampedReference就是解决这个问题的,通过AtomicStampedReference#getStamp()即可获取版本号


5.AQS:抽象队列同步器

AQSjava.util.concurrent.locks.AbstractQueuedSynchronizer,是一个用于构建锁和同步器的框架,比如ReentrantLock/Semaphore/ReentrantReadWriteLock/SynchronousQueue/FutureTask等都是基于AQS的。

原理

  • 总结起来就是:FIFO队列 + volatile state变量

    需要通过volatile保证共享资源的可见性,让其他线程知道状态的更改。

  • 如果被请求的共享资源(volatile state)空闲,那么将当前请求资源的线程设置为工作线程,并将共享资源设置为锁定状态;如果当前请求资源被锁定,就将请求线程加入FIFO等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private volatile int state; //共享变量,通过volatile保证线程可见性

//返回同步状态的当前值
protected final int getState() {
return state;
}

// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}

//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS资源共享方式

1.独占(Exclusive)

只能有一个线程获取资源。
ReentrantLock

公平:严格按照FIFO队列顺序获取资源;
非公平:谁抢到算谁的

1
2
boolean tryAcquire(int)     //独占方式获取资源
boolean tryRelease(int) //独占方式释放资源
2.共享(Share)

多个线程可同时获取资源。
Semaphore/CountDownLatch/CyclicBarrier/ReentrantReadWriteLock

1
2
3
4
5
6
int tryAcquireShared(int)     //共享方式获取资源
//负数表示失败;
//0表示成功,但没有剩余可用资源;
//正数表示成功,有剩余可用资源

boolean tryReleaseShared(int) //共享方式释放资源
3.自定义

自定义AQS就是继承AQS,然后重载对应的方法,决定是独占还是共享地获取资源,当然也可以有组合式的,比如ReadWriteLock


AQS组件

1.信号量Semaphore

创建Semaphore时给定一个资源值,可以同时有这么多个线程访问资源,默认是非公平的

API
  • new Semaphore(int permit)初始化信号量,同一时刻允许permit个线程访问资源
  • new Semaphore(int permits, boolean fair):初始化信号量,同一时刻允许permit个线程访问资源,并指定是公平还是非公平
  • Semaphore # acquire()尝试获取资源拿走一个许可证。若无法获取,则线程会等待,直到有线程释放一个许可证或者当前线程被中断
  • Semaphore # acquireUninterruptibly():和acquire()类似,但是不响应中断,也就是说它会一直等待直到有线程释放许可证
  • Semaphore # release()释放资源,增加一个许可证
  • Semaphore # tryAcquire()尝试获取资源,如果没有立刻返回false,不会进行等待

    抛出InterruptedException

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//Semaphore:同时允许多个线程访问资源,Share型,默认非公平
public class SemaporeTest {

//请求数量
private static final int threadCount = 550;

//抛出InterruptedException
public static void main(String[] args) {
//线程池——这种方法不推荐
ExecutorService threadPool = Executors.newFixedThreadPool(300);
//同时允许20个线程访问
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try{
semaphore.acquire();
test(threadnum);
semaphore.release();
}catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown(); //平缓关闭
System.out.println("Finish!");
}

//模拟线程获取资源后进行耗时的工作
private static void test(int threadnum) throws InterruptedException{
Thread.sleep(1000);
System.out.println("threadnum:"+threadnum);
Thread.sleep(1000);
}
}

2.倒计时器CountDownLatch

让某个线程等待倒计时结束后执行

一般来说,死循环while(true){ if(...) break; }的代码,用CountDownLatch进行取代会更加优雅

API
  • new CountDownLatch(int count)初始化倒计时器,倒计时count下后允许等待线程执行
  • CountDownLatch.countDown()倒计时-1
  • CountDownLatch.await()等待倒计时器执行完毕的线程,当倒计时器到0会唤醒等待线程

    抛出InterruptedException

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//CountDownLatch:倒计时器——Share型,默认非公平
public class CountDownLatchExample {
//请求数量
private final static int threadCount = 550;

public static void main(String[] args) throws InterruptedException{
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(300);
//创建倒计时器
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try{
test(threadnum);
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
//倒计时器-1
countDownLatch.countDown();
}
});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish!");
}

//模拟线程获取倒计时器后的耗时操作
private static void test(int threadnum) throws InterruptedException{
Thread.sleep(1000);
System.out.println("threadnum:"+threadnum);
Thread.sleep(1000);
}
}

3.循环栅栏CyclicBarrier

让一组线程到达一个同步点时被阻塞,直到最后一个线程到达同步点,栅栏才会开启。

应用

多线程计算数据,然后合并结果

API
  • new CyclicBarrier(int count)这一组线程的个数
  • new CyclicBarrier(int parties, Runnable barrierAction)当一组线程到达后,优先执行barrierAction方法
  • CyclicBarrier.await(long timeout, TimeUnit tu)在每个线程内调用,阻塞等待一组线程全部执行完

    抛出InterruptedExceptionBrokenBarrierException

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CyclicBarrierExample {
private final static int threadCount = 550;
//循环栅栏,一组5个线程
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
() -> {
System.out.println("一组线程执行完,优先调用");
});

public static void main(String[] args) {
//线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try{
test(threadnum);
}catch (Exception e) {
e.printStackTrace();
}
});
}
threadPool.shutdown(); //关闭
}

private static void test(int threadnum) {
System.out.println("threadnum:"+threadnum+" is ready");
try{
cyclicBarrier.await(60, TimeUnit.SECONDS);
}catch (Exception e) {
e.printStackTrace();
}
System.out.println("threadnum:"+threadnum+" is finish");
}
}

6.并发容器

(1)ConcurrentHashMap

线程安全的HashMap

  1. 读操作几乎不需要加锁
  2. 写操作是分段锁

(2)CopyOnWriteArrayList

线程安全的List,在读多写少的场景效率远远高于Vector,比如白名单

原理

对原有数据进行一次复制,直接在副本中进行修改,修改后用副本替代原有数据

  1. 由于存储元素的array数组是不会被修改的,所以读操作可以不加锁
  2. 写操作在复制新数组时加锁,避免多线程创建多个副本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    //读
    private transient volatile Object[] array;
    public E get(int index) {
    return get(getArray(), index);
    }
    private E get(Object[] a, int index) {
    return (E) a[index];
    }
    final Object[] getArray() {
    return array;
    }

    //写
    public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); //加锁
    try{
    Object[] elements = getArray();
    int len = elements.length;
    Object[] newElements = Arrays.copyOf(elements, len + 1); //拷贝副本
    newElements[len] = e; //在副本中添加
    setArray(newElements);
    return true;
    }finally{
    lock.unlock(); //解锁
    }
    }

(3)ConcurrentLinkedQueue:非阻塞队列

  • 并发队列,可以看作是线程安全的LinkedList,是非阻塞队列
  • 使用CAS非阻塞算法实现线程安全
  • 适用场景:需要线程安全的队列,但是对队列加锁的成本计较高

(4)BlockingQueue:阻塞队列接口

应用:
“生产者-消费者” —— 当队列已满,生产者线程会被阻塞,直到队列未满;当队列为空,消费者线程会被阻塞,直到队列非空

实现类:

  1. ArrayBlockingQueue有界队列,底层是数组,默认是非公平模式
  2. LinkedBlockingQueue可以有界也可以无界,底层是单向链表
  3. PriorityBlockingQueue支持优先级的无界阻塞队列,如果空间不够会自动扩容

(5)ConcurrentSkipListMap

跳表,便于快速查找,以空间换时间,时间复杂度是O(lgN)

-------------本文结束感谢您的阅读-------------

本文标题:Java多线程 、并发

文章作者:DragonBaby308

发布时间:2019年08月21日 - 22:51

最后更新:2020年02月08日 - 17:17

原始链接:http://www.dragonbaby308.com/multithread/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%