Javaマルチスレッド研究ノート(プログラミングの洞窟-Java Muliti-スレッド)



Java Multithreading Study Notes



1.開始スレッドスレッドを開始する2つの方法a。スレッドクラスを拡張し、実行メソッドをオーバーライドしますb。 Runnableインターフェースを実装し、それをスレッドコンストラクターのパラメーターとして渡しますclass RunnerThread extends Thread{ public void run(){ //to do some action } } class RunnerRunnable implements Runnable{ public void run(){ //to do some action } } public class BasicApp { public static void main(String[] args) { RunnerThread t1 = new RunnerThread() t1.start() RunnerRunnable runnerRunnable = new RunnerRunnable() Thread t2 = new Thread(runnerRunnable) Thread t3 = new Thread(new Runnable(){ public void run(){ //to do some action } }) t2.start() t3.start() } }
2. Basic-Thread-Synchronization volatileキーワードは、キャッシュ内の値が更新されることを保証しますimport java.util.Scanner class Runner implements Runnable{ volatile boolean runFlag = true public void run() { while(runFlag){ System.out.println('Running') try { Thread.sleep(500) } catch (InterruptedException e) { e.printStackTrace() } } } public void setRunFlag(boolean runFlag){ this.runFlag = runFlag } } public class App2 { public static void main(String[] args) { Scanner scan = new Scanner(System.in) Runner runner = new Runner() Thread t1 = new Thread(runner) t1.start() scan.nextLine() runner.setRunFlag(false) scan.close() } }

3.同期されたキーワード

public class App3 { private int count = 0 synchronized private void increment(){ count++ } public static void main(String[] args) throws InterruptedException { App3 app3 = new App3() app3.doWork() } private void doWork() throws InterruptedException{ Thread t1 = new Thread(new Runnable() {public void run() {for(int i=0 i<1000 i++) increment()}}) Thread t2 = new Thread(new Runnable() {public void run() {for(int i=0 i<1000 i++) increment()}}) t1.start() t2.start() t1.join() t2.join() System.out.println('Count='+ count) } }

4.同期されたコードブロックを使用した複数のロック



class Process implements Runnable{ private ArrayList list1 = new ArrayList() private ArrayList list2 = new ArrayList() private Random random = new Random() private Object obj1 = new Object() private Object obj2 = new Object() private void addList1() throws InterruptedException{ synchronized(obj1){ list1.add(random.nextInt(100)) } Thread.sleep(1) } private void addList2() throws InterruptedException{ synchronized(obj2){ list2.add(random.nextInt(100)) } Thread.sleep(1) } public void run(){ for(int i=0 i<1000 i++){ try { addList1() addList2() } catch (InterruptedException e) { } } } public void showSize(){ System.out.println('List1:'+ list1.size() +',List2:'+ list2.size()) } } public class App4 { public static void main(String[] args) throws InterruptedException { StopWatch stopWatch = new StopWatch('App4') stopWatch.start() Process proc = new Process() Thread t1 = new Thread(proc) Thread t2 = new Thread(proc) t1.start() t2.start() t1.join() t2.join() stopWatch.stop() proc.showSize() System.out.println(String.format('App4 takes %d miliseconds', stopWatch.getTotalTimeMillis())) } }演算結果:リスト1:2000、リスト2:2000 App4は2016ミリ秒かかります
同期キーワードなしで結果を実行します。 リスト1:1981、リスト2:1988 App4は2018ミリ秒かかります

同期キーワードをメソッドに追加した結果:リスト1:2000、リスト2:2000

App4はかかります4093ミリ秒

private synchronized void addList1() throws InterruptedException{ list1.add(random.nextInt(100)) Thread.sleep(1) } private synchronized void addList2() throws InterruptedException{ list2.add(random.nextInt(100)) Thread.sleep(1) }

5.スレッドプール



import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit class Runner1 implements Runnable{ private int id public Runner1(final int i){ this.id = i } public void run(){ System.out.println('Starting:'+ id) try { Thread.sleep(2000) } catch (InterruptedException e) { } System.out.println('Complete:'+ id) } } public class App5 { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(2) //Set up 2 thread pools and reuse them to reduce the overhead of creating threads for(int i=0 i<5 i++){ executor.submit(new Runner1(i)) //Submit task } executor.shutdown() //Submit the task ends, wait for the thread to end, and then no more submissions System.out.println('All tasks submitted.') executor.awaitTermination(1, TimeUnit.MINUTES) //If the thread runs for too long, it will end after this time System.out.println('All tasks completed.') } }

6.カウントダウンラッチ

class Process implements Runnable{ private CountDownLatch latch public Process(final CountDownLatch latch){ this.latch = latch } public void run(){ System.out.println('Start, count='+ latch.getCount()) latch.countDown() } } public class App6 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(9) //CountDownLatch class is thread-safe ExecutorService executor = Executors.newFixedThreadPool(3) for(int i=0 i<20 i++){ executor.submit(new Process(latch)) } executor.shutdown() latch.await() //thread will wait until the count of latch become 0 System.out.println('Complete.') } }

演算結果:

開始、カウント= 9 開始、カウント= 8 開始、カウント= 7 開始、カウント= 6 開始、カウント= 5 開始、カウント= 4 開始、カウント= 3 開始、カウント= 2 開始、カウント= 1 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 開始、カウント= 0 コンプリート。 //常にcount = 0の後 開始、カウント= 0

7.生産者/消費者



BlockingQueueはスレッドセーフであり、最大サイズに達した後に要素が追加されることはなく、サイズが0の場合はこれ以上取得できません。

public class App7 { private static BlockingQueue queue = new ArrayBlockingQueue(10) private static void producer() throws InterruptedException{ Random random = new Random() while(true){ Thread.sleep(500) queue.put(random.nextInt(100)) } } private static void consumer() throws InterruptedException{ Random random = new Random() while(true){ Thread.sleep(100) if(random.nextInt(10) == 0){ int value = queue.take() System.out.println('Taken value='+ value + ', Queue Size='+ queue.size()) } } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new Runnable() { public void run() { try { producer() } catch (InterruptedException e) { } } }) Thread t2 = new Thread(new Runnable() { public void run() { try { consumer() } catch (InterruptedException e) { } } }) t1.start() t2.start() t1.join() t2.join() } }

8.待って通知します

package demo1 import java.util.Scanner class Processor8{ public void producer() throws InterruptedException{ System.out.println('Thread start...') synchronized(this){ wait() } System.out.println('Resumed.') } public void consumer() throws InterruptedException{ Thread.sleep(1000) Scanner scan = new Scanner(System.in) System.out.println('Press enter to be continue...') scan.nextLine() synchronized(this){ notify() //Wake up the thread waiting for this object, and the awakened thread can only continue to run after the object is released Thread.sleep(2500) } if(scan!=null)scan.close() } } public class App8 { public static void main(String[] args) throws InterruptedException { final Processor8 proc = new Processor8() Thread t1 = new Thread(new Runnable() { public void run() { try {proc.producer()} catch (InterruptedException e) {} } }) Thread t2 = new Thread(new Runnable() { public void run() { try {proc.consumer()} catch (InterruptedException e) {} } }) t1.start() t2.start() t1.join() t2.join() } }

9.9。低レベルの同期を使用した実例

class Processor9 { private LinkedList list = new LinkedList() private final int LIMIT = 10 private Object lock = new Object() private Random random = new Random() public void producer() throws InterruptedException { int value = 0 while (true) { synchronized (lock) { System.out.println('producer') while (list.size() == LIMIT) { lock.wait() } list.add(value++) lock.notify() } } } public void consumer() throws InterruptedException { while (true) { synchronized (lock) { System.out.println('consumer') while (list.size() == 0) { lock.wait() } System.out.print('List size:' + list.size()) int value = list.removeFirst() System.out.println(', taken value:' + value) lock.notify() } Thread.sleep(random.nextInt(1000)) } } } public class App9 { public static void main(String[] args) throws InterruptedException { final Processor9 proc = new Processor9() Thread t1 = new Thread(new Runnable() { public void run() { try { proc.producer() } catch (InterruptedException e) { } } }) Thread t2 = new Thread(new Runnable() { public void run() { try { proc.consumer() } catch (InterruptedException e) { } } }) t1.start() t2.start() t1.join() t2.join() } }

10.リエントラントロック

import java.util.Scanner import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock class Processor10 { private ReentrantLock lock = new ReentrantLock() Condition cond = lock.newCondition() private int count = 0 public void increment(){ for(int i=0 i<100000000 i++){ count++ } } public void firstThread() { lock.lock() //Analog synchronized keyword System.out.println('Waiting...') try { cond.await() //analog object.wait() System.out.println('Wake up...') increment() } catch (InterruptedException e) { System.out.println('Error1:'+ e.getMessage()) }finally{ lock.unlock() } } public void secondThread() { Scanner scan = null try{ Thread.sleep(1000) lock.lock() //Analog synchronized keyword System.out.println('Press enter to continue...') scan = new Scanner(System.in) scan.nextLine() cond.signal() //analog object.notify() increment() }catch(Exception e){ System.out.println('Error2:'+ e.getMessage()) }finally{ lock.unlock() if(scan!=null)scan.close() } } public void finish(){ System.out.println('Count is:'+ count) } } public class App10 { public static void main(String[] args) throws InterruptedException { final Processor10 proc = new Processor10() Thread t1 = new Thread(new Runnable() { public void run() { proc.firstThread() } }) Thread t2 = new Thread(new Runnable() { public void run() { proc.secondThread() } }) t1.start() t2.start() t1.join() t2.join() proc.finish() } }

11.デッドロック

class Account{ private int deposit = 10000 private void deposit(final int amount){ this.deposit += amount } private void withdraw(final int amount){ this.deposit -= amount } public static void transfer(final Account acc1, final Account acc2, final int amount){ acc1.withdraw(amount) acc2.deposit(amount) } public int getDeposit(){ return this.deposit } } class Runner11{ Random random = new Random() Account acc1 = new Account() Account acc2 = new Account() Lock lock1 = new ReentrantLock() Lock lock2 = new ReentrantLock() public void firstThread(){ for(int i=0 i<1000000 i++){ try{ accquireLock(lock1, lock2) Account.transfer(acc1, acc2, random.nextInt(1000)) }catch(Exception e){ }finally{ lock1.unlock() lock2.unlock() } } } public void secondThread(){ for(int i=0 i<1000000 i++){ try{ accquireLock(lock2, lock1) Account.transfer(acc2, acc1, random.nextInt(1000)) }catch(Exception e){ }finally{ lock1.unlock() lock2.unlock() } } } public void finish(){ System.out.println('Account1 Amount:'+ acc1.getDeposit()) System.out.println('Account2 Amount:'+ acc2.getDeposit()) System.out.println('Total Amount:'+ (acc1.getDeposit() + acc2.getDeposit())) } public void accquireLock(Lock lock1, Lock lock2) throws InterruptedException{ //2 locks are acquired at the same time before returning, otherwise the single lock acquired will be released for other threads boolean lock1Status = false boolean lock2Status = false while(true){ try{ lock1Status = lock1.tryLock() lock2Status = lock2.tryLock() }finally{ if(lock1Status && lock2Status){ return } if(lock1Status){ lock1.unlock() } if(lock2Status){ lock2.unlock() } } Thread.sleep(10) } } } public class App11 { public static void main(String[] args) throws InterruptedException { final Runner11 runner = new Runner11() Thread t1 = new Thread(new Runnable() { public void run() { runner.firstThread() } }) Thread t2 = new Thread(new Runnable() { public void run() { runner.secondThread() } }) t1.start() t2.start() t1.join() t2.join() runner.finish() } } 12.セマフォセマフォ(セマフォ)は、特定のリソースに同時にアクセスするスレッドの数を制御するために使用されます。さまざまなスレッドを調整して、共通のリソースを適切に使用できるようにしますclass Runner12{ private int connCnt = 0 private Semaphore sem = new Semaphore(10) //Limited to only connect 10 at the same time public void getConnection(){ try{ sem.acquire() synchronized (this) { connCnt++ } System.out.println('Current connection is :'+ connCnt) Thread.sleep(50) synchronized (this) { connCnt-- } }catch(Exception e){ } finally{ sem.release() } } } public class App12 { public static void main(String[] args) throws InterruptedException { final Runner12 runner = new Runner12() ExecutorService executor = Executors.newFixedThreadPool(10) for(int i=0 i<1000 i++){ executor.submit(new Runnable() { public void run() { runner.getConnection() } }) } executor.shutdown() executor.awaitTermination(100, TimeUnit.SECONDS) } }

13.呼び出し可能で未来

public class App13 { public static void main(String[] args) throws InterruptedException, ExecutionException { final Random random = new Random() ExecutorService executor = Executors.newCachedThreadPool() //Callable is different from Runnable, it can have a return value and throw an exception Future future = executor.submit(new Callable(){ public Integer call() throws InterruptedException{ int duration = random.nextInt(4000) System.out.println('Start...') Thread.sleep(duration) System.out.println('Done.') return duration } }) executor.shutdown() executor.awaitTermination(1, TimeUnit.MINUTES) //Future is used to obtain thread return value, status and other information System.out.println('Takes '+ future.get() + ' miliseconds.') } }

14.スレッドの中断

a。割り込み方式を使用する

public class App14 { public static void main(String[] args) throws InterruptedException { System.out.println('Start...') Thread t = new Thread(new Runnable() { public void run() { Random random = new Random() for(int i=0 i<1E8 i++){ if(Thread.interrupted()){ System.out.println('Interrupted...') break } Math.sin(random.nextDouble()) } } }) t.start() t.interrupt() t.join() System.out.println('End.') } }

b。 future.cancel()を使用する

public class App14 { public static void main(String[] args) throws InterruptedException { System.out.println('Start...') ExecutorService exec = Executors.newCachedThreadPool() Future fu = exec.submit(new Callable(){ public Void call(){ Random random = new Random() for(int i=0 i<1E8 i++){ if(Thread.interrupted()){ System.out.println('Interrupted...') break } Math.sin(random.nextDouble()) } return null } }) exec.shutdown() fu.cancel(true) //1. true means that the started thread also ends 2. false means that only the unstarted thread ends exec.awaitTermination(1, TimeUnit.MINUTES) System.out.println('End.') } }

c。 ExecutorServiceのshutdownNow()を使用します

public class App14 { public static void main(String[] args) throws InterruptedException { System.out.println('Start...') ExecutorService exec = Executors.newCachedThreadPool() exec.submit(new Callable(){ public Void call(){ Random random = new Random() for(int i=0 i<1E8 i++){ if(Thread.interrupted()){ System.out.println('Interrupted...') break } Math.sin(random.nextDouble()) } return null } }) exec.shutdown() exec.shutdownNow() exec.awaitTermination(1, TimeUnit.MINUTES) System.out.println('End.') } }