線程通信的幾種方式 線程通信的三種方式



文章插圖
線程通信的幾種方式 線程通信的三種方式

文章插圖
簡介:
線程開始運行,擁有自己的??臻g,就會如同一個腳本一樣,按照既定的代碼一步步的執行,直到終止 。但是,如果每個線程之間都是孤立的,那么它們的價值就會很少;反之,如果多個線程能夠配合著完成工作,將會帶來各方面巨大的收益 。
1、volatile和synchronized關鍵字
說明:(不做過多說明,需要的話可以看我的往期)
Java支持多線程訪問一個對象或者對象的成員變量,由于每個線程都擁有這個變量的拷貝(為了執行速度更快),所以程序執行過程中讀取的數據往往不是最新的 。
關鍵字volatile可以用來修飾字段(成員變量),作用通俗來講就是告知程序任何對該變量的訪問均需要從共享內存中獲取,而對它的改變必須同步刷新到共享內存中,volatile能保證線程對變量的可見性 。
關鍵字synchronized可以修飾方法或者同步代碼塊的形式來進行使用,它主要能確保多個線程在同一時刻,只有一個線程處于方法或者同步代碼塊中,它保證了線程對變量訪問的可見性和排他性 。
通過使用javap工具查看生成class文件信息來分析下synchronized關鍵字的實現細節,如下代碼是使用了同步塊和同步方法 。
代碼示例:
package com.lizba.p3;/** * <p> *同步方法和同步代碼塊示例代碼 * </p> * * @Author: Liziba * @Date: 2021/6/15 22:13 */public class Synchronized {public static void main(String[] args) {// 同步代碼塊synchronized (Synchronized.class) {}// 靜態方法method();}public static synchronized void method() {}}在Synchronized.class同級目錄執行javap -v Synchronized.class
javap -v Synchronized.class重點關注部分輸出:
同步代碼塊使用monitorenter和monitorexit指令
同步方法使用了ACC_SYNCHRONIZED
總結:
同步代碼塊和同步方法使用了不同的方式來加鎖,其本質上都是對一個對象的監視器(monitor)獲取,而這個獲取的過程是排他的,也就是說同一時刻只會有一個線程獲取由synchronized所保護的監視器 。我們知道,任意一個對象都擁有自己的監視器鎖,當這個對象由同步代碼塊或者這個對象的同步方法調用時,執行方法的線程必須先獲取到該線程對象的監視器鎖才能進入同步塊或者同步方法,而沒有獲取到監視器(執行該方法)的線程將會被阻塞在同步代碼塊和同步方法的入口處,進入BLOCKED狀態 。
圖示對象、對象的監視器、同步隊列和執行線程之間的關系
總結上圖:
任意線程對Object(受Synchronized保護)的訪問,首先要獲取Object的監視器 。如果獲取失敗則進入同步隊列,線程變為BLOCKED 。當訪問Object的前驅(獲得了鎖的線程)釋放了鎖,則該釋放操作喚醒阻塞在同步隊列中的線程,使其重新嘗試對監視器的獲取 。
2、等待通知機制
一個線程修改了一個對象的值,另一個對象感知到其的變化,然后進行相應的操作,這種類似于生產者-消費者模式的功能,在Java線程之間是怎么實現的呢?
最簡單的做法:
// 使用while循環檢測變量的值while (flag) {// 防止一直執行,未滿足條件進行短暫睡眠Thread.sleep(1000);}doSomething();上述代碼存在問題:
不能確保及時性,通過睡眠的方式來釋放處理器資源,會導致時效性問題難以降低開銷,通過降低睡眠的時間來提升時效性又會帶來過高的處理器資源開銷
Java內置解決辦法:
以上兩個看似矛盾的問題,卻可以通過Java內置的等待/通知機制很好的得以解決,等待/通知機制是Java任意對象都具備的,因為這些方法被定義在對象的超類Object中 。
public final native void notify();public final native void notifyAll();public final void wait() throws InterruptedExceptionpublic final native void wait(long timeout) throws InterruptedException;public final void wait(long timeout, int nanos) throws InterruptedException
方法名稱描述notify()通知一個在對象上等待的線程,使其從wait()方法返回,而返回的前提是該線程獲取到了對象的鎖notifyAll()通知所有等待在該對象上的線程wait()調用方的線程進入WAITING狀態,只有等待其他線程的通知或被中斷才會返回,需要注意,調用wait()方法會釋放鎖wait(long)超時等待一段時間,如果時間到沒有通知就超時返回 。單位mswait(long, int)對于超時時間做更加細粒度的控制可以精確到納秒
等待/通知機制描述:
等待/通知機制是指一個線程A調用了對象O的wait()方法進入等待狀態,另一個線程B調用了對象O的notify()或notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而執行后續操作 。對象O上的wait()和notify()/notifyAll()就好比一個開關信號,用來完成等待方和通知方的交互工作(就好比一開始說的生產者-消費者模型)
示例代碼:
package com.lizba.p3;import com.lizba.p2.SleepUtil;import java.text.SimpleDateFormat;import java.util.Date;/** * <p> *wait()和notify()/notifyAll()示例代碼 * </p> * * @Author: Liziba * @Date: 2021/6/15 23:28 */public class WaitNotify {static boolean flag = true;static Object lock = new Object();static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");public static void main(String[] args) {Thread waitThread = new Thread(new Wait(), "waitThread");waitThread.start();SleepUtil.sleepSecond(1);Thread notifyThread = new Thread(new Notify(), "notifyThread");notifyThread.start();}/*** wait線程,當條件不滿足時wait()*/static class Wait implements Runnable{@Overridepublic void run() {// 加鎖synchronized(lock) {// 當條件不滿足時,繼續waitwhile (flag) {System.out.println(Thread.currentThread()+ " flag is true. wait at " +sdf.format(new Date()));try {// 此操作會釋放鎖lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 滿足條件是完成工作System.out.println(Thread.currentThread()+ " flag is false. finished at " + sdf.format(new Date()));}}}static class Notify implements Runnable {@Overridepublic void run() {// 加鎖synchronized (lock) {// 獲取到鎖或通知等待在鎖上的線程// 通知不會釋放鎖,直到當前線程執行完釋放lock鎖后,waitThread才能從wait方法返回System.out.println(Thread.currentThread()+ "hold lock. notify at " + sdf.format(new Date()));lock.notifyAll();flag = false;SleepUtil.sleepSecond(5);}// 再次加鎖synchronized (lock) {System.out.println(Thread.currentThread()+ "hold lock again. notify at " + sdf.format(new Date()));SleepUtil.sleepSecond(5);}}}}查看輸出:
注意上述的hold lock again 和 flag is flase這兩行代碼可能執行順序會互換 。
總結:
【線程通信的幾種方式 線程通信的三種方式】使用wait()、notify()和notifyAll()需要先對該對象加鎖調用wait()方法后線程由RUNNING狀態變為WAITING狀態,并且將當前線程放置到對象的等待隊列中notify()方法和notifyAll()調用后,等待的線程需要等到調用notify()和notifyAll()的線程釋放鎖后,等待隊列中的線程才有機會從wait()返回notify()移動一個線程從等待隊列到同步隊列,notifyAll()移動所有等待線程,過程是將線程從等待隊列移動到同步隊列中,被移動的線程由WAITING變為BLOCKED狀態從wait()方法返回的前提是獲取了對象的鎖wait()、notify()和notifyAll()機制依賴的是同步機制,其目的是為了從wait()方法返回的線程能感知到其他線程對變量作出的修改
圖示上述過程:
總結上圖:
WaitThread線程首先獲取了鎖,然后調用對象的wait()方法,從而釋放了鎖進入對象的等待隊列WaitQueue中,進入等待狀態 。由于WaitThread釋放了對象的鎖,NotifyThread隨后獲取了對象的鎖,并且調用了對象的notify()方法,將處于等待隊列WaitQueue的WaitThread移動到了SynchronizedQueue中,此時WaitThread的狀態變為阻塞狀態 。NotifyThread釋放了鎖之后,WaitThread再次獲取到鎖從wait()方法返回繼續執行 。
3、等待/通知的經典范式
等待/通知的經典范式,分為等待方和通知方,這兩者需要分別遵循如下規則 。
等待方遵循如下規則:
獲取對象的鎖如果條件不滿足,那么調用對象的wait()方法,被通知后仍要檢查條件條件滿足則執行對應的邏輯
// 示例等待方偽代碼synchronized(對象) {while(條件不滿足) {對象.wait();}// ToDo...}通知各方遵循如下規則:
獲取對象的鎖改變條件通知所有等待在對象身上的線程
// 示例通知方偽代碼synchronized(對象) {改變條件對象.notifyAll();}4、管道輸入/輸出流
管道輸入/輸出流和普通文件輸入/輸出流或者網絡輸入/輸出流的不同之處在于,管道輸出/輸出流主要用于線程之間的數據傳輸,傳輸的媒介為內存 。
管道輸入/輸出流的具體實現:
PipedInputStreamPipedOutputStreamPipedReaderPipedWriter
1、2為字節流,3、4為字符流 。
示例代碼:
package com.lizba.p3;import java.io.IOException;import java.io.PipedReader;import java.io.PipedWriter;/** * <p> *管道流 * </p> * * @Author: Liziba * @Date: 2021/6/16 21:07 */public class Piped {public static void main(String[] args) throws IOException {PipedWriter out = new PipedWriter();PipedReader in = new PipedReader();// 輸入輸出流連接(不連接會報錯)out.connect(in);Thread printThread = new Thread(new Print(in), "PrintThread");printThread.start();// 輸入int receive = 0;try {while ((receive = System.in.read()) != -1) {out.write(receive);}} finally {out.close();}}/*** 單個字符讀取并輸出**/static class Print implements Runnable {private PipedReader in;public Print(PipedReader in) {this.in = in;}@Overridepublic void run() {int receive = 0;try {while (true) {// 單個字符讀取if ((receive = in.read()) != -1){System.out.print((char)receive);}}} catch (IOException e) {e.printStackTrace();}}}}測試代碼樣例:
## 輸入hello liziba## 輸出hello liziba5、Thread.join()
Thread.join()的語義含義:當前線程A等待Thread線程終止之后才從Thread.join()處返回 。線程提供的join()方法的api如下:
public final void join() throws InterruptedException// 下面兩個具有超時等待,線程再給定的時間沒有返回,那么超時的方法會返回public final synchronized void join(long millis, int nanos)public final synchronized void join(long millis)示例代碼:
設置十個線程,分別從0-9,每個線程需要調用前一個線程的join()方法,比如線程0結束了,線程1才能從join()返回線,程1結束了,線程2才能從join()返回 。
package com.lizba.p3;import com.lizba.p2.SleepUtil;import java.util.concurrent.TimeUnit;/** * <p> *join()等待通知機制 * </p> * * @Author: Liziba * @Date: 2021/6/16 21:25 */public class Join {public static void main(String[] args) {// 前一個線程Thread previous = Thread.currentThread();for (int i = 0; i < 10; i++) {Thread t = new Thread(new Domino(previous), String.valueOf(i));t.start();previous = t;}SleepUtil.sleepSecond(5);System.out.println(Thread.currentThread().getName() + " end.");}static class Domino implements Runnable {private Thread thread;public Domino(Thread thread) {this.thread = thread;}@Overridepublic void run() {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " end.");}}}查看輸出結果:
總結上述代碼:
每個線程終止的前提是前驅線程的終止,每個線程等待前驅線程終止后,才從join()返回,這里涉及了等待/通知機制,具體原理我們可以通過看JDK的源碼來了解:
public final synchronized void join(long millis)throws InterruptedException {long base = System.currentTimeMillis();long now = 0;if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}// 超時等待時間未設置則為0,也就是join()方法if (millis == 0) {// 判斷當前線程是否終止while (isAlive()) {// 如果未終止,繼續wait()wait(0);}} else {// 判斷當前線程是否終止while (isAlive()) {long delay = millis - now;// 判斷超時等待時間是否已經到了,如果到了則返回if (delay <= 0) {break;}// 否則繼續等待,計算新的時間傳入wait(delay);now = System.currentTimeMillis() - base;}}}// 嘗試判斷當前線程時候已經執行完畢(是否還活著)public final native boolean isAlive();6、ThreadLocal的使用
本文不會詳細講述ThreadLocal的核心原理,之后簡單的介紹ThreadLocal的使用,后續會單獨分一篇文章來詳述其原理和使用 。
ThreadLocal即線程變量,它是以ThreadLocal對象為鍵、任意對象為值的存儲結構 。這個存儲結構可以附帶在線程上,我們可以通過一個ThreadLocal對象來查詢綁定在這個線程上的一個值 。
示例代碼:
如下代碼構造一個計算方法調用時間計算的類 。
package com.lizba.p3;import com.lizba.p2.SleepUtil;/** * <p> * * </p> * * @Author: Liziba * @Date: 2021/6/16 22:04 */public class Profiler {private static final ThreadLocal<Long> TIME_THREAD_LOCAL = new ThreadLocal<Long>() {@Overrideprotected Long initialValue() {return System.currentTimeMillis();}};public static final void begin() {TIME_THREAD_LOCAL.set(System.currentTimeMillis());}public static final Long end() {return System.currentTimeMillis() - TIME_THREAD_LOCAL.get();}public static void main(String[] args) {Profiler.begin();SleepUtil.sleepSecond(1);System.out.println("Cost: " + Profiler.end());}}查看執行結果: