pythonマルチスレッド-スレッド間通信(グローバル変数、キュー)



Python Multithreading Inter Thread Communication Global Variables



簡単な紹介

スレッド間通信とは何ですか?

Multiple threads to process the same resources, but different tasks

なぜコミュニケーションするのですか?



If all of the dry between threads, you do not need communication, such code is also very simple. But it is generally not possible, at least to the thread and the main Thread communication, or the results and so can not be retrieved. The actual situation is much more complex, the need to exchange data among multiple threads in order to get the correct results of the implementation.

1、共有変数(グローバル変数)

同じグローバル変数プロセス通信を共有することによるスレッド間

最も簡単なアイデアは、グローバル変数を作成することです。いくつかの協調サブスレッドグローバル変数(いくつかのスレッドは変数を書き込み、変数はいくつかのスレッドを読み取ります)



たとえば、爬虫類の場合、記事のすべての詳細でブログサイトをクロールする必要があると想定します。まず、記事リストページをクロールしてurl、次に記事に従ってurl記事の特定のコンテンツをクロールします。一般に、記事のクロールurl記事のコンテンツは比較的大量のデータであるため、速度が遅くなり、 スレッド クロール記事urlリスト、詳細 スレッドB によるとurl記事の特定のコンテンツにアクセスしてクロールします。この時点で、2つのスレッド間に相互作用があり、記事のコンテンツをクロールします スレッドB 記事リストはクロールする必要があります スレッド 特定のデータ、および スレッド (クロール記事のリスト)する必要はありません スレッドB (記事コンテンツのクロール)データ。

import threading # import thread package import time detail_url_list = [] # Crawling article details page def get_detail_html(detail_url_list, id): while True: if len (detail_url_list) == 0: # list is empty, waiting for another thread it into data continue url = detail_url_list.pop() time.sleep (2) # delay 2s, analog network request print('thread {id}: get {url} detail finished'.format(id=id,url=url)) # Crawling Articles List def get_detail_url(detail_url_list): for i in range(10000): time.sleep (1) # delay 1s, simulation than crawling faster article details detail_url_list.append('http://projectedu.com/{id}'.format(id=i)) print('get detail url {id} end'.format(id=i)) if __name__ == '__main__': # Create a reading list page thread thread = threading.Thread(target=get_detail_url, args=(detail_url_list,)) # Create a read details page thread html_thread= [] for i in range(4): thread2 = threading.Thread(target=get_detail_html, args=(detail_url_list,i)) html_thread.append(thread2) start_time = time.time() # Start two threads thread.start() for i in range(4): html_thread[i].start() # Wait for the end of all threads thread.join() for i in range(4): html_thread[i].join() print('last time: {} s'.format(time.time()-start_time))

結果は非常に完璧に見えますが、一般的に非常にゆっくりと公開されますが、いくつかのリスクがあります。

2つの問題があります:



  1. in python of Listスレッドセーフではない可能性がありますpop()別のスレッドの実行中に半分を実行する関数pop()または別のスレッドを実行するappend()、この時点でdetail_url_listデータエラーが発生し、プログラムがハングしたり、誤った結果が得られたりします。
  2. 仮説detail_url_listスレッドがリストがまだ空でないと判断した場合、1つの要素のみpop()データ、タイムスライスが別のスレッドによって削除された場合、同じリストの要素があり、同じではありません空の場合、データの成功が取り出され、リストが空になり、スレッドのタイムスライス、実行のスレッドpop()が放棄され、結果としてpop from empty list例外が発生します! in url = detail_url_list.pop()前に追加されたステートメントtime.sleep(1)この問題が発生する可能性があります。

スペースは複数のスレッドプロセスによって共有されるため、スレッド間の通信は比較的単純で、主にグローバル変数を使用します。プロセス内のすべてのスレッドのグローバル変数が表示されるため、複数のスレッドが相互に通信して、グローバル変数を操作することで効果を実現できます。しかし、これも問題であり、競争の「リソース」です。

ここで言及されているリソースは、まさにこの競合のためにグローバル変数を参照しています(複数のスレッドが同時に実行されているが、実行されているスレッドの順序を制御しない傾向があるため)、そうでない場合はマルチスレッドを使用しない可能性があります結果を確認するために、グローバル変数への各スレッドはアトミックになりたいと考えています。

スレッド同期でこの問題を解決するには、3つの相互排除メカニズム、つまりセマフォ、ミューテックス、条件変数の導入を参照してください。


2、メッセージキューモジュールを介して--queue

メッセージキューと上記のプロセスを使用しますが、queue非常に優れたパッケージがあり、その時点での値とスレッドセーフの値を入力します。

queueモジュールは、マルチプロデューサー、マルチコンシューマーキューを実装します。情報の要求を複数のスレッド間で安全に交換する必要がある場合、このモジュール スレッドプログラミング 非常に便利な場合。主にキューを実装する3。

  1. class queue.Queue(maxsize = 0):FIFOキュー構造。maxsizeはキューのサイズを制限できます。キューのサイズがキューの上限に達した場合、キューはロックされ、追加すると、キューの内容が消費されるまでブロックされます。 Maxsize値が0以下の場合、キューサイズは無制限です
  2. class queue.LifoQueue(maxsize = 0):LIFO(後入れ先出し)キューを構築します
  3. class PriorityQueue(maxsize = 0):最初の最も低い優先度のうち、最も低い優先度が一般的にソートされて使用されます(リスト(エントリ))[0]。典型的なのは、要素が追加されたタプル(優先度データ)です。

キューの前に書き直されたコードを使用する:

import threading # import thread package from queue import Queue import time # Crawling article details page def get_detail_html(detail_url_list, id): while True: url = detail_url_list.get() time.sleep (2) # delay 2s, analog network request print('thread {id}: get {url} detail finished'.format(id=id,url=url)) # Crawling Articles List def get_detail_url(queue): for i in range(10000): time.sleep (1) # delay 1s, simulation than crawling faster article details queue.put('http://projectedu.com/{id}'.format(id=i)) print('get detail url {id} end'.format(id=i)) if __name__ == '__main__': detail_url_queue = Queue(maxsize=1000) # To create two threads thread = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) html_thread= [] for i in range(3): thread2 = threading.Thread(target=get_detail_html, args=(detail_url_queue,i)) html_thread.append(thread2) start_time = time.time() # Start two threads thread.start() for i in range(3): html_thread[i].start() # Wait for the end of all threads thread.join() for i in range(3): html_thread[i].join() print('last time: {} s'.format(time.time()-start_time))
  1. スレッド間の通信では、グローバル変数を使用する必要があり、ロックする必要があります。
  2. use queue Moduleは、スレッド間で通信でき、セキュリティスレッドを確保します。