8.10. Queue — 同期キュークラス

注釈

Queue モジュールは、Python 3 では queue にリネームされました。 2to3 ツールが自動的にソースコードの import を修正します。

ソースコード: Lib/Queue.py


Queue モジュールは、複数プロデューサ-複数コンシューマ(multi-producer, multi-consumer)キューを実装します。これは、複数のスレッドの間で情報を安全に交換しなければならないときのマルチスレッドプログラミングで特に有益です。このモジュールの Queue クラスは、必要なすべてのロックセマンティクスを実装しています。これはPythonのスレッドサポートの状況に依存します。 threading モジュールを参照してください。

このモジュールでは3種類のキューが実装されています。それらはキューから取り出されるエントリの順番だけが違います。 FIFOキューでは、最初に追加されたエントリが最初に取り出されます。 LIFOキューでは、最後に追加されたエントリが最初に取り出されます(スタックのように振る舞います)。 優先順位付きキュー(priority queue)では、エントリは(heapq モジュールを利用して)ソートされ、 最も低い値のエントリが最初に取り出されます。

Queue モジュールは以下のクラスと例外を定義します:

class Queue.Queue(maxsize=0)

FIFOキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。

class Queue.LifoQueue(maxsize=0)

LIFOキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。

バージョン 2.6 で追加.

class Queue.PriorityQueue(maxsize=0)

優先順位付きキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。

最小の値を持つ要素が最初に検索されます (最小の値を持つ値は、sorted(list(entries))[0] によって返されるものです)。典型的な要素のパターンは、(priority_number, data) 形式のタプルです。

バージョン 2.6 で追加.

exception Queue.Empty

空な Queue オブジェクトで、非ブロックメソッドとして get() (または get_nowait()) が呼ばれたとき、送出される例外です。

exception Queue.Full

満杯な Queue オブジェクトで、非ブロックメソッドとして put() (または put_nowait()) が呼ばれたとき、送出される例外です。

参考

collections.deque は、ロックなしで popleft()append() といったアトミック操作が可能なキューの実装です。

8.10.1. キューオブジェクト

キューオブジェクト(Queue, LifoQueue, PriorityQueue)は、以下のpublicメソッドを提供しています。

Queue.qsize()

キューの近似サイズを返します。ここで、qsize() > 0 は後続の get() がブロックしないことを保証しないこと、また qsize() < maxsize が put() がブロックしないことを保証しないことに注意してください。

Queue.empty()

キューが空の場合は True を返し、そうでなければ False を返します。empty() が True を返しても、後続の put() の呼び出しがブロックしないことは保証されません。同様に、empty() が False を返しても、後続の get() の呼び出しがブロックしないことは保証されません。

Queue.full()

キューが一杯の場合は True を返し、そうでなければ False を返します。full() が True を返しても、後続の get() の呼び出しがブロックしないことは保証されません。同様に、full() が False を返しても、後続の put() の呼び出しがブロックしないことは保証されません。

Queue.put(item[, block[, timeout]])

item をキューに入れます。もしオプション引数 block がTrueで timeout がNone(デフォルト)ならば、フリースロットが利用可能になるまでブロックします。 timeout が正の値の場合、最大で timeout 秒間ブロックし、その時間内に空きスロットが利用可能にならなければ、例外 Full を送出します。他方(block がFalse)、直ちにフリースロットが利用できるならば、キューにアイテムを置きます。できないならば、例外 Full を送出します (この場合 timeout は無視されます)。

バージョン 2.3 で追加: timeout パラメータが追加されました。

Queue.put_nowait(item)

put(item, False) と等価です。

Queue.get([block[, timeout]])

キューからアイテムを取り除き、それを返します。もしオプション引数 block がTrueで timeout がNone(デフォルト)ならば、アイテムが利用可能になるまでブロックします。もし timeout が正の値の場合、最大で timeout 秒間ブロックし、その時間内でアイテムが利用可能にならなければ、例外 Empty を送出します。他方(block がFalse)、直ちにアイテムが利用できるならば、それを返します。できないならば、例外 Empty を送出します (この場合 timeout は無視されます)。

バージョン 2.3 で追加: timeout パラメータが追加されました。

Queue.get_nowait()

get(False) と等価です。

キューに入れられたタスクが全てコンシューマスレッドに処理されたかどうかを追跡するために 2つのメソッドが提供されます。

Queue.task_done()

過去にキューに入れられたタスクが完了した事を示します。キューのコンシューマスレッドに利用されます。タスクの取り出しに使われた、各 get() に対して、それに続く task_done() の呼び出しは、取り出したタスクに対する処理が完了した事をキューに教えます。

join() がブロックされていた場合、全itemが処理された (キューに put() された全てのitemに対して task_done() が呼び出されたことを意味します) 時に復帰します。

キューにある要素より多く呼び出された場合 ValueError が発生します。

バージョン 2.5 で追加.

Queue.join()

キューの中の全アイテムが処理される間でブロックします。

キューにitemが追加される度に、未完了タスクカウントが増やされます。コンシューマスレッドが task_done() を呼び出して、itemを受け取ってそれに対する処理が完了した事を知らせる度に、未完了タスクカウントが減らされます。未完了タスクカウントが0になったときに、 join() のブロックが解除されます。

バージョン 2.5 で追加.

キューに入れたタスクが完了するのを待つ例:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done