16.6. multiprocessing — プロセスベースの “並列処理” インタフェース

バージョン 2.6 で追加.

16.6.1. はじめに

multiprocessing は、 threading と似た API で複数のプロセスを生成をサポートするパッケージです。 multiprocessing パッケージは、ローカルとリモート両方の並行処理を提供します。また、このパッケージはスレッドの代わりにサブプロセスを使用することにより、グローバルインタープリタロック の問題を避ける工夫が行われています。このような特徴があるため multiprocessing モジュールを使うことで、マルチプロセッサーマシンの性能を最大限に活用することができるでしょう。なお、このモジュールは Unix と Windows で動作します。

multiprocessing モジュールでは threading モジュールには似たものがない API も導入しています。その最たるものが Pool オブジェクトです。これは複数の入力データに対して、サブプロセス群に入力データを分配して並列に関数実行する (データ並列) のに便利な手段を提供します。以下の例では、モジュール内で関数を定義するのに、子プロセスがそのモジュールをつつがなくインポート出来るようにする一般的な慣例を使っています。 Pool を用いたデータ並列の基礎的な実例はこのようなものです:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

標準出力に以下が出力されます:

[1, 4, 9]

16.6.1.1. Process クラス

multiprocessing モジュールでは、プロセスは以下の手順によって生成されます。はじめに Process のオブジェクトを作成し、続いて start() メソッドを呼び出します。この Process クラスは threading.Thread クラスと同様の API を持っています。まずは、簡単な例をもとにマルチプロセスを使用したプログラムについてみていきましょう

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

実行された個々のプロセス ID を表示するために拡張したサンプルコードを以下に例を示します:

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

(Windows 環境で) if __name__ == '__main__' という記述が必要な理由については、 プログラミングガイドライン を参照してください。

16.6.1.2. プロセス間でのオブジェクト交換

multiprocessing モジュールでは、プロセス間通信の手段が2つ用意されています。それぞれ以下に詳細を示します:

キュー (Queue)

Queue クラスは Queue.Queue クラスとほとんど同じように使うことができます。以下に例を示します。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

キューはスレッドセーフであり、プロセスセーフです。

パイプ (Pipe)

Pipe() 関数はパイプで繋がれたコネクションオブジェクトのペアを返します。デフォルトでは双方向性パイプを返します。以下に例を示します:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

パイプのそれぞれの端を表す2つのコネクションオブジェクトが Pipe() 関数から返されます。各コネクションオブジェクトには、 send()recv()、その他のメソッドがあります。2つのプロセス (またはスレッド) がパイプの 同じ 端で同時に読み込みや書き込みを行うと、パイプ内のデータが破損してしまうかもしれないことに注意してください。もちろん、各プロセスがパイプの別々の端を同時に使用するならば、データが破壊される危険性はありません。

16.6.1.3. プロセス間の同期

multiprocessingthreading モジュールと等価な同期プリミティブを備えています。以下の例では、ロックを使用して、一度に1つのプロセスしか標準出力に書き込まないようにしています:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまいます。

16.6.1.4. プロセス間での状態の共有

これまでの話の流れで触れたとおり、並行プログラミングを行うときには、できるかぎり状態を共有しないのが定石です。複数のプロセスを使用するときは特にそうでしょう。

しかし、どうしてもプロセス間のデータ共有が必要な場合のために multiprocessing モジュールには2つの方法が用意されています。

共有メモリ (Shared memory)

データを共有メモリ上に保持するために Value クラス、もしくは Array クラスを使用することができます。以下のサンプルコードを使って、この機能についてみていきましょう

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

このサンプルコードを実行すると以下のように表示されます

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

numarr を生成するときに使用されている、引数 'd''i'array モジュールにより使用される種別の型コードです。ここで使用されている 'd' は倍精度浮動小数、 'i' は符号付整数を表します。これらの共有オブジェクトは、プロセスセーフでありスレッドセーフです。

共有メモリを使用して、さらに柔軟なプログラミングを行うには multiprocessing.sharedctypes モジュールを使用します。このモジュールは共有メモリから割り当てられた任意の ctypes オブジェクトの生成をサポートします。

サーバープロセス (Server process)

Manager() 関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。

Manager() 関数が返すマネージャは listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventQueueValueArray をサポートします。以下にサンプルコードを示します。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l

このサンプルコードを実行すると以下のように表示されます

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。また、1つのマネージャーオブジェクトはネットワーク経由で他のコンピューター上のプロセスによって共有することもできます。しかし、共有メモリより動作が遅いという欠点があります。

16.6.1.5. ワーカープロセスのプールを使用

Pool クラスは、ワーカープロセスをプールする機能を備えています。このクラスには、異なる方法でワーカープロセスへタスクを割り当てるいくつかのメソッドがあります。

例えば:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"

プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。

注釈

このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。以下に例を示します:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(このサンプルを試すと、3つのトレースバックすべてがほぼランダムに交互に重なって表示されます。そうなったら、なんとかしてマスタープロセスを止めましょう。)

16.6.2. リファレンス

multiprocessing パッケージは threading モジュールの API とほとんど同じです。

16.6.2.1. Process クラスと例外

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

Process オブジェクトは各プロセスの処理を表します。 Process クラスは threading.Thread クラスのすべてのメソッドと同じインタフェースを提供します。

コンストラクタは必ずキーワード引数で呼び出すべきです。引数 group には必ず None を渡してください。この引数は threading.Thread クラスとの互換性のためだけに残されています。引数 target には、呼び出し可能オブジェクト (Callable Object) を渡します。このオブジェクトは run() メソッドから呼び出されます。この引数はデフォルトで None となっており、何も呼び出されません。引数 name にはプロセス名を渡します。デフォルトでは、自動でユニークな名前が割り当てられます。命名規則は、 ‘Process-N1:N2:...:Nk‘ となります。ここで N1,N2,...,Nk は整数の数列で、 作成した プロセス数に対応します。引数 args は target で指定された呼び出し可能オブジェクトへの引数を渡します。同じく、引数 kwargs はキーワード引数を渡します。デフォルトでは、 target には引数が渡されないようになっています。

サブクラスがコンストラクターをオーバーライドする場合は、そのプロセスに対する処理を行う前に基底クラスのコンストラクター (Process.__init__()) を実行しなければなりません。

run()

プロセスが実行する処理を表すメソッドです。

このメソッドはサブクラスでオーバーライドすることができます。標準の run() メソッドは、コンストラクターの target 引数として渡された呼び出し可能オブジェクトを呼び出します。もしコンストラクターに args もしくは kwargs 引数が渡されていれば、呼び出すオブジェクトにこれらの引数を渡します。

start()

プロセスの処理を開始するためのメソッドです。

各 Process オブジェクトに対し、このメソッドが2回以上呼び出されてはいけません。各プロセスでオブジェクトの run() メソッドを呼び出す準備を行います。

join([timeout])

join() されたプロセスが terminate を呼び出すまで、もしくはオプションで指定したタイムアウトが発生するまで呼び出し側のスレッドをブロックします。

timeoutNone ならタイムアウトは設定されません。

1つのプロセスは何回も join されることができます。

プロセスは自分自身を join することはできません。それはデッドロックを引き起こすことがあるからです。プロセスが start される前に join しようとするとエラーが発生します。

name

プロセス名です。

この名前は文字列で、プロセスの識別にのみ使用されます。特別な命名規則はありません。複数のプロセスが同じ名前を持つ場合もあります。また、この名前はコンストラクタにより初期化されます。

is_alive()

プロセスが実行中かを判別します。

おおまかに言って、プロセスオブジェクトは start() メソッドを呼び出してから子プロセス終了までの期間が実行中となります。

daemon

デーモンプロセスであるかのフラグであり、ブール値です。この属性は start() が呼び出される前に設定されている必要があります。

初期値は作成するプロセスから継承します。

あるプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとします。

デーモンプロセスは子プロセスを作成できないことに注意してください。もし作成できてしまうと、そのデーモンプロセスの親プロセスが終了したときにデーモンプロセスの子プロセスが孤児になってしまう場合があるからです。さらに言えば、デーモンプロセスはUnix デーモンやサービスでは なく 通常のプロセスであり、非デーモンプロセスが終了すると終了されます (そして join されません)。

threading.Thread クラスの API に加えて Process クラスのオブジェクトには以下の属性およびメソッドがあります:

pid

プロセスIDを返します。プロセスの生成前は None が設定されています。

exitcode

子プロセスの終了コードです。子プロセスがまだ終了していない場合は None が返されます。負の値 -N は子プロセスがシグナル N で終了したことを表します。

authkey

プロセスの認証キーです (バイト文字列です)。

multiprocessing モジュールがメインプロセスにより初期化される場合には、 os.urandom() 関数を使用してランダムな値が設定されます。

Process クラスのオブジェクトの作成時にその親プロセスから認証キーを継承します。もしくは authkey に別のバイト文字列を設定することもできます。

詳細は 認証キー を参照してください。

terminate()

プロセスを終了します。Unix 環境では SIGTERM シグナルを、 Windows 環境では TerminateProcess() を使用して終了させます。終了ハンドラーや finally 節などは、実行されないことに注意してください。

このメソッドにより終了するプロセスの子孫プロセスは、終了 しません 。そういった子孫プロセスは単純に孤児になります。

警告

このメソッドの使用時に、関連付けられたプロセスがパイプやキューを使用している場合には、使用中のパイプやキューが破損して他のプロセスから使用できなくなる可能性があります。同様に、プロセスがロックやセマフォなどを取得している場合には、このプロセスが終了してしまうと他のプロセスのデッドロックの原因になるでしょう。

プロセスオブジェクトが作成したプロセスのみが start(), join(), is_alive(), terminate()exitcode のメソッドを呼び出すべきです。

以下の例では Process のメソッドの使い方を示しています:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.BufferTooShort

この例外は Connection.recv_bytes_into() によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。

eBufferTooShort のインスタンスとすると、 e.args[0] はバイト文字列でそのメッセージを取得できます。

16.6.2.2. パイプ (Pipe) とキュー (Queue)

複数のプロセスを使う場合、一般的にはメッセージパッシングをプロセス間通信に使用し、ロックのような同期プリミティブを使用しないようにします。

メッセージのやりとりのために Pipe() (2つのプロセス間の通信用)、もしくはキュー (複数のメッセージ生成プロセス (producer)、消費プロセス (consumer) の実現用) を使うことができます。

Queue, multiprocessing.queues.SimpleQueue, JoinableQueue は複数プロセスから生成/消費を行う FIFO キューです。これらのキューは標準ライブラリの Queue.Queue を模倣しています。 Queue には Python 2.5 の Queue.Queue クラスで導入された task_done()join() メソッドがないことが違う点です。

もし JoinableQueue を使用するなら、キューから削除される各タスクのために JoinableQueue.task_done() を呼び出さなければ なりません 。さもないと、いつか完了していないタスクを数えるためのセマフォがオーバーフローし、例外を発生させるでしょう。

管理オブジェクトを使用することで共有キューを作成できることも覚えておいてください。詳細は マネージャー を参照してください。

注釈

multiprocessing は通常の Queue.Empty と、タイムアウトのシグナルを送るために Queue.Full 例外を使用します。それらは Queue からインポートする必要があるので multiprocessing の名前空間では利用できません。

注釈

オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。そのため、バックグラウンドのスレッドが後になって下位層のパイプに pickle 化されたデータをフラッシュすることがあります。これにより、少し驚くような結果になりますが、実際に問題になることはないはずです。これが問題になるような状況では、かわりに manager を使ってキューを作成することができるからです。

  1. 空のキューの中にオブジェクトを追加した後、キューの empty() メソッドが False を返すまでの間にごくわずかな遅延が起きることがあり、get_nowait()Queue.Empty を発生させることなく制御が呼び出し元に返ってしまうことがあります。

  2. 複数のプロセスがオブジェクトをキューに詰めている場合、キューの反対側ではオブジェクトが詰められたのとは違う順序で取得される可能性があります。ただし、同一のプロセスから詰め込まれたオブジェクトは、それらのオブジェクト間では、必ず期待どおりの順序になります。

警告

Queue を利用しようとしている最中にプロセスを Process.terminate()os.kill() で終了させる場合、キューにあるデータは破損し易くなります。終了した後で他のプロセスがキューを利用しようとすると、例外を発生させる可能性があります。

警告

上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。

これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。

マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。

プロセス間通信におけるキューの使用例を知りたいなら を参照してください。

multiprocessing.Pipe([duplex])

パイプの両端を表す Connection オブジェクトのペア (conn1, conn2) を返します。

duplexTrue (デフォルト) ならパイプは双方向性です。duplexFalse ならパイプは一方向性で、conn1 はメッセージの受信専用、conn2 はメッセージの送信専用になります。

class multiprocessing.Queue([maxsize])

パイプや2~3個のロック/セマフォを使用して実装されたプロセス共有キューを返します。あるプロセスが最初に要素をキューへ追加するとき、バッファーからパイプの中へオブジェクトを転送する供給スレッドが開始されます。

標準ライブラリの Queue モジュールからの通常の Queue.EmptyQueue.Full 例外はタイムアウトのシグナルを送るために発生します。

Queuetask_done()join() を除く Queue.Queue の全てのメソッドを実装します。

qsize()

おおよそのキューのサイズを返します。マルチスレッディング/マルチプロセスの特性上、この数値は信用できません。

これは sem_getvalue() が実装されていない Mac OS X のような Unix プラットホーム上で NotImplementedError を発生させる可能性があることを覚えておいてください。

empty()

キューが空っぽなら True を、そうでなければ False を返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。

full()

キューがいっぱいなら True を、そうでなければ False を返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。

put(obj[, block[, timeout]])

キューの中へ要素を追加します。オプションの引数 blockTrue (デフォルト) 且つ timeoutNone (デフォルト) なら、空きスロットが利用可能になるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に空きスロットが利用できなかったら Queue.Full 例外を発生させます。それ以外 ( blockFalse ) で、空きスロットがすぐに利用可能な場合はキューに要素を追加します。そうでなければ Queue.Full 例外が発生します(その場合 timeout は無視されます)。

put_nowait(obj)

put(obj, False) と等価です。

get([block[, timeout]])

キューから要素を取り出して削除します。オプションの引数 blockTrue (デフォルト) 且つ timeoutNone (デフォルト) なら、要素が取り出せるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に要素が取り出せなかったら Queue.Empty 例外を発生させます。それ以外 ( blockFalse ) で、要素がすぐに取り出せる場合は要素を返します。そうでなければ Queue.Empty 例外が発生します(その場合 timeout は無視されます)。

get_nowait()

get(False) と等価です。

QueueQueue.Queue にはない追加メソッドがあります。これらのメソッドは通常、ほとんどのコードに必要ありません。

close()

カレントプロセスからこのキューへそれ以上データが追加されないことを表します。バックグラウンドスレッドはパイプへバッファーされたすべてのデータをフラッシュするとすぐに終了します。これはキューがガベージコレクトされるときに自動的に呼び出されます。

join_thread()

バックグラウンドスレッドを join します。このメソッドは close() が呼び出された後でのみ使用されます。バッファーされたすべてのデータがパイプへフラッシュされるのを保証するため、バックグラウンドスレッドが終了するまでブロックします。

デフォルトでは、あるプロセスがキューを作成していない場合、終了時にキューのバックグラウンドスレッドを join しようとします。そのプロセスは join_thread() が何もしないように cancel_join_thread() を呼び出すことができます。

cancel_join_thread()

join_thread() がブロッキングするのを防ぎます。特にこれはバックグラウンドスレッドがそのプロセスの終了時に自動的に join されるのを防ぎます。詳細は join_thread() を参照してください。

このメソッドは allow_exit_without_flush() という名前のほうがよかったかもしれません。キューに追加されたデータが失われてしまいがちなため、このメソッドを使う必要はほぼ確実にないでしょう。本当にこれが必要になるのは、キューに追加されたデータを下位層のパイプにフラッシュすることなくカレントプロセスを直ちに終了する必要があり、かつ失われるデータに関心がない場合です。

注釈

このクラスに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォ (shared semaphore) を使用しているものがあります。これが使用できない場合には、このクラスが無効になり、 Queue をインスタンス化する時に ImportError が発生します。詳細は bpo-3770 を参照してください。同様のことが、以下に列挙されている特殊なキューでも成り立ちます。

class multiprocessing.queues.SimpleQueue

単純化された Queue 型です。ロックされた Pipe に非常に似ています。

empty()

もしキューが空ならば True を、そうでなければ False を返します。

get()

キューからアイテムを取り除いて返します。

put(item)

item をキューに追加します。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue のサブクラスであり、 task_done()join() メソッドが追加されているキューです。

task_done()

以前にキューへ追加されたタスクが完了したことを表します。キュー消費スレッドによって使用されます。タスクをフェッチするために使用されるそれぞれの get() では、次に task_done() を呼び出してタスクの処理が完了したことをキューへ伝えます。

もし join() がブロッキング状態なら、全ての要素が処理されたときに復帰します( task_done() 呼び出しが全ての要素からキュー内へ put() されたと受け取ったことを意味します)。

キューにある要素より多く呼び出された場合 ValueError が発生します。

join()

キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。

キューに要素が追加されると未終了タスク数が増えます。キューの要素が取り出されて全て処理が完了したことを表す task_done() を消費スレッドが呼び出すと数が減ります。未終了タスク数がゼロになると join() はブロッキングを解除します。

16.6.2.3. その他の関数

multiprocessing.active_children()

カレントプロセスのアクティブな子プロセスのすべてのリストを返します。

これを呼び出すと “join” してすでに終了しているプロセスには副作用があります。

multiprocessing.cpu_count()

システムの CPU 数を返します。 NotImplementedError が送出される場合があります。

multiprocessing.current_process()

カレントプロセスに対応する Process オブジェクトを返します。

threading.current_thread() とよく似た関数です。

multiprocessing.freeze_support()

multiprocessing を使用するプログラムが Windows の実行可能形式を生成しようとして固まったときのサポートを追加します。(py2exe , PyInstallercx_Freeze でテストされています。)

メインモジュールの if __name__ == '__main__' の直後にこの関数を呼び出す必要があります。以下に例を示します:

from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

もし freeze_support() の行がない場合、固まった実行可能形式を実行しようとすると RuntimeError を発生させます。

freeze_support() の呼び出しは Windows 以外の OS では効果がありません。さらに、もしモジュールが Windows の通常の Python インタプリタによって実行されているならば(プログラムがフリーズされていなければ) freeze_support() は効果がありません。

multiprocessing.set_executable()

子プロセスを開始するときに、使用する Python インタープリターのパスを設定します。(デフォルトでは sys.executable が使用されます)。コードに組み込むときは、おそらく次のようにする必要があります

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

子プロセスを生成する前に行なってください。 (Windows only)

16.6.2.4. Connection オブジェクト

Connection オブジェクトは pickle でシリアライズ可能なオブジェクトか文字列を送ったり、受け取ったりします。そういったオブジェクトはメッセージ指向の接続ソケットと考えられます。

Connection オブジェクトは通常は Pipe() を使用して作成されます。詳細は リスナーとクライアント も参照してください。

class multiprocessing.Connection
send(obj)

コネクションの相手側へ recv() を使用して読み込むオブジェクトを送ります。

オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MB+) と、 ValueError 例外が送出されることがあります。

recv()

コネクションの相手側から send() を使用して送られたオブジェクトを返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合 EOFError が発生します。

fileno()

コネクションが使用するハンドラーか、ファイル記述子を返します。

close()

コネクションをクローズします。

コネクションがガベージコレクトされるときに自動的に呼び出されます。

poll([timeout])

読み込み可能なデータがあるかどうかを返します。

timeout が指定されていなければすぐに返します。 timeout に数値を指定すると、最大指定した秒数をブロッキングします。 timeoutNone を指定するとタイムアウトせずにずっとブロッキングします。

send_bytes(buffer[, offset[, size]])

バッファインタフェースをサポートするオブジェクトから完全なメッセージとしてバイトデータを送ります。

offset が指定されると buffer のその位置からデータが読み込まれます。 size が指定されるとバッファーからその量のデータが読み込まれます。非常に大きなバッファー (OS に依存しますが、およそ 32MB+) を指定すると、 ValueError 例外が発生するかもしれません。

recv_bytes([maxlength])

コネクションの相手側から送られたバイトデータの完全なメッセージを文字列として返します。何か受け取るまでブロックします。受け取るデータが何も残っておらず、相手側がコネクションを閉じていた場合、 EOFError が送出されます。

maxlength を指定して、且つ maxlength よりメッセージが長い場合、 IOError を発生させて、それ以上はコネクションから読み込めなくなります。

recv_bytes_into(buffer[, offset])

コネクションの相手側から送られたバイトデータを buffer に読み込み、メッセージのバイト数を返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合 EOFError が発生します。

buffer は書き込み可能なバッファインタフェースを備えたオブジェクトでなければなりません。 offset が与えられたら、その位置からバッファへメッセージが書き込まれます。オフセットは buffer バイトよりも小さい正の数でなければなりません。

バッファーがあまりに小さいと BufferTooShort 例外が発生します。 e が例外インスタンスとすると完全なメッセージは e.args[0] で確認できます。

例えば:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() メソッドは受信したデータを自動的に unpickle 化します。それはメッセージを送ったプロセスが信頼できる場合を除いてセキュリティリスクになります。

そのため Pipe() を使用してコネクションオブジェクトを生成する場合を除いて、何らかの認証処理を実行した後で recv()send() メソッドのみを使用すべきです。詳細は 認証キー を参照してください。

警告

もしプロセスがパイプの読み込みまたは書き込み中に kill されると、メッセージの境界がどこなのか分からなくなってしまうので、そのパイプ内のデータは破損してしまいがちです。

16.6.2.5. 同期プリミティブ

一般的にマルチプロセスプログラムは、マルチスレッドプログラムほどは同期プリミティブを必要としません。詳細は threading モジュールのドキュメントを参照してください。

マネージャーオブジェクトを使用して同期プリミティブを作成できることも覚えておいてください。詳細は マネージャー を参照してください。

class multiprocessing.BoundedSemaphore([value])

有限セマフォオブジェクト: threading.BoundedSemaphore のクローンです。

インターフェイスがスレッドのものと一つだけ違います: acquire メソッドの最初の引数は block 、オプション引数として timeout を取ります。これは Lock.acquire() でのものと同じです。

注釈

Mac OS X では sem_getvalue() が実装されていないので Semaphore と区別がつきません。

class multiprocessing.Condition([lock])

状態変数: threading.Condition のクローンです。

lock を指定するなら multiprocessingLockRLock オブジェクトにすべきです。

class multiprocessing.Event

threading.Event のクローンです。このメソッドは、終了時の内部セマフォの状態を返すので、タイムアウトが与えられ、実際にオペレーションがタイムアウトしたのでなければ、必ず True を返します。

バージョン 2.7 で変更: 以前はこのメソッドは常に None を返していました。

class multiprocessing.Lock

再帰しないロックオブジェクトで、 threading.Lock 相当のものです。プロセスやスレッドがロックをいったん獲得 (acquire) すると、それに続くほかのプロセスやスレッドが獲得しようとする際、それが解放 (release) されるまではブロックされます。解放はどのプロセス、スレッドからも行えます。スレッドに対して適用される threading.Lock のコンセプトと振る舞いは、特筆すべきものがない限り、プロセスとスレッドに適用される multiprocessing.Lock に引き継がれています。

Lock は実際にはファクトリ関数で、デフォルトコンテキストで初期化された multiprocessing.synchronize.Lock のインスタンスを返すことに注意してください。

Lockcontext manager プロトコルをサポートしています。つまり with 文で使うことができます。

acquire(block=True, timeout=None)

ブロックあり、またはブロックなしでロックを獲得します。

引数 blockTrue (デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。ブロックから抜けるとそれをロック状態にしてから True を返します。 threading.Lock.acquire() の最初の引数とは名前が違っているので注意してください。

引数 block の値を False にして呼び出すとブロックしません。 現在ロック状態であれば、直ちに False を返します。それ以外の場合には、ロックをロック状態にして True を返します。

timeout として正の浮動小数点数を与えて呼び出すと、ロックが獲得出来ないあいだ最大でこれで指定した秒数だけブロックします。 timeout 値の負数はゼロと同じです。 timeout 値の None (デフォルト) は無限にブロックすることを意味します。 blockFalse の場合には timeout には実際的な意味はないので無視されます。ロックを獲得すると True 、タイムアウトした場合は False で戻ります。 timeout 引数は類似品の threading.Lock.acquire() にはないことに注意してください。

release()

ロックを解放します。これはロックを獲得したプロセスやスレッドだけでなく、任意のプロセスやスレッドから呼ぶことができます。

threading.Lock.release() と同じように振舞いますが、ロックされていない場合に呼び出すと ValueError となる点だけが違います。

class multiprocessing.RLock

再帰ロックオブジェクトで、 threading.RLock 相当のものです。再帰ロックオブジェクトはそれを獲得 (acquire) したプロセスやスレッドが解放 (release) しなければなりません。プロセスやスレッドがロックをいったん獲得すると、同じプロセスやスレッドはブロックされずに再度獲得出来ます。そのプロセスやスレッドは獲得した回数ぶん解放しなければなりません。

RLock は実際にはファクトリ関数で、デフォルトコンテキストで初期化された multiprocessing.synchronize.Lock のインスタンスを返すことに注意してください。

RLockcontext manager プロトコルをサポートしています。つまり with 文で使うことができます。

acquire(block=True, timeout=None)

ブロックあり、またはブロックなしでロックを獲得します。

block 引数を True にして呼び出した場合、ロックが既にカレントプロセスもしくはカレントスレッドが既に所有していない限りは、アンロック状態 (どのプロセス、スレッドも所有していない状態) になるまでブロックします。ブロックから抜けるとカレントプロセスもしくはカレントスレッドが (既に持っていなければ) 所有権を得て、再帰レベルをインクリメントし、 True で戻ります。 threading.RLock.acquire() の実装とはこの最初の引数の振る舞いが、その名前自身を始めとしていくつか違うので注意してください。

block 引数を False にして呼び出した場合、ブロックしません。ロックが他のプロセスもしくはスレッドにより獲得済み (つまり所有されている) であれば、カレントプロセスまたはカレントスレッドは所有権を得ず、再帰レベルも変更せずに、 False で戻ります。ロックがアンロック状態の場合、カレントプロセスもしくはカレントスレッドは所有権を得て再帰レベルがインクリメントされ、 True で戻ります。(—訳注: block の True/False 関係なくここでの説明では「所有権を持っている場合の2度目以降の aquire」の説明が欠けています。2度目以降の acquire では再帰レベルがインクリメントされて即座に返ります。全体読めばわかるとは思いますが一応。—)

timeout 引数の使い方と振る舞いは Lock.acquire() と同じです。 timeout 引数は類似品の threading.RLock.acquire() にはないことに注意してください。

release()

再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるプロセス、いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているプロセスもしくはスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のプロセスもしくはスレッドに所有されたままになります。

このメソッドは呼び出しプロセスあるいはスレッドがロックを所有している場合に限り呼び出してください。所有者でないプロセスもしくはスレッドによって呼ばれるか、あるいはアンロック (未所有) 状態で呼ばれた場合、 AssertionError が送出されます。同じ状況での threading.RLock.release() 実装とは例外の型が異なるので注意してください。

class multiprocessing.Semaphore([value])

セマフォオブジェクト: threading.Semaphore のクローンです。

インターフェイスがスレッドのものと一つだけ違います: acquire メソッドの最初の引数は block 、オプション引数として timeout を取ります。これは Lock.acquire() でのものと同じです。

注釈

BoundedSemaphore, Lock, RLockSemaphoreacquire() メソッドは threading ではサポートされていないタイムアウトパラメータを取ります。その引数はキーワード引数で受け取れる acquire(block=True, timeout=None) です。 blockTrue 且つ timeoutNone ではないなら、タイムアウトが秒単位で設定されます。 blockFalse なら timeout は無視されます。

Mac OS X では sem_timedwait がサポートされていないので、acquire() にタイムアウトを与えて呼ぶと、ループ内でスリープすることでこの関数がエミュレートされます。

注釈

メインスレッドが BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 又は Condition.wait() を呼び出してブロッキング状態のときに Ctrl-C で生成される SIGINT シグナルを受け取ると、その呼び出しはすぐに中断されて KeyboardInterrupt が発生します。

これは同等のブロッキング呼び出しが実行中のときに SIGINT が無視される threading の振る舞いとは違っています。

注釈

このパッケージに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォを使用しているものがあります。これが使用できない場合には、multiprocessing.synchronize モジュールが無効になり、このモジュールのインポート時に ImportError が発生します。詳細は bpo-3770 を参照してください。

16.6.2.6. 共有 ctypes オブジェクト

子プロセスにより継承される共有メモリを使用する共有オブジェクトを作成することができます。

multiprocessing.Value(typecode_or_type, *args[, lock])

共有メモリから割り当てられた ctypes オブジェクトを返します。デフォルトでは、返り値は実際のオブジェクトの同期ラッパーです。

typecode_or_type は返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 *args は型のコンストラクターへ渡されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たに再帰的なロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返されたオブジェクトへのアクセスはロックにより自動的に保護されません。そのため、必ずしも “プロセスセーフ” ではありません。

+= のような演算は、読み込みと書き込みを含むためアトミックでありません。このため、たとえば自動的に共有の値を増加させたい場合、以下のようにするのでは不十分です

counter.value += 1

関連するロックが再帰的 (それがデフォルトです) なら、かわりに次のようにします

with counter.get_lock():
    counter.value += 1

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

共有メモリから割り当てられた ctypes 配列を返します。デフォルトでは、返り値は実際の配列の同期ラッパーです。

typecode_or_type は返される配列の要素の型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 size_or_initializer が整数なら、配列の長さを決定し、その配列はゼロで初期化されます。別の使用方法として size_or_initializer は配列の初期化に使用されるシーケンスになり、そのシーケンス長が配列の長さを決定します。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たなロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返されたオブジェクトへのアクセスはロックにより自動的に保護されません。そのため、必ずしも “プロセスセーフ” ではありません。

lock はキーワード引数としてのみ利用可能なことに注意してください。

ctypes.c_char の配列は文字列を格納して取り出せる valueraw 属性を持っていることを覚えておいてください。

16.6.2.6.1. multiprocessing.sharedctypes モジュール

multiprocessing.sharedctypes モジュールは子プロセスに継承される共有メモリの ctypes オブジェクトを割り当てる関数を提供します。

注釈

共有メモリのポインターを格納することは可能ではありますが、特定プロセスのアドレス空間の位置を参照するということを覚えておいてください。しかし、そのポインターは別のプロセスのコンテキストにおいて無効になる確率が高いです。そして、別のプロセスからそのポインターを逆参照しようとするとクラッシュを引き起こす可能性があります。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

共有メモリから割り当てられた ctypes 配列を返します。

typecode_or_type は返される配列の要素の型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードのどちらか一方です。 size_or_initializer が整数なら、それが配列の長さになり、その配列はゼロで初期化されます。別の使用方法として size_or_initializer には配列の初期化に使用されるシーケンスを設定することもでき、その場合はシーケンスの長さが配列の長さになります。

要素を取得したり設定したりすることは潜在的に非アトミックであることに注意してください。ロックを使用して自動的に同期化されたアクセスを保証するには Array() を使用してください。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

共有メモリから割り当てられた ctypes オブジェクトを返します。

typecode_or_type は返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 *args は型のコンストラクターへ渡されます。

値を取得したり設定したりすることは潜在的に非アトミックであることに注意してください。ロックを使用して自動的に同期化されたアクセスを保証するには Value() を使用してください。

ctypes.c_char の配列は文字列を格納して取り出せる valueraw 属性を持っていることを覚えておいてください。詳細は ctypes を参照してください。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

RawArray() と同様ですが、 lock の値によっては ctypes 配列をそのまま返す代わりに、プロセスセーフな同期ラッパーが返されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たな ロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返された オブジェクトへのアクセスはロックにより自動的に保護されません。 そのため、必ずしも “プロセスセーフ” ではありません。

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.sharedctypes.Value(typecode_or_type, *args[, lock])

RawValue() と同様ですが、 lock の値によっては ctypes オブジェクトをそのまま返す代わりに、プロセスセーフな同期ラッパーが返されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たな ロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返された オブジェクトへのアクセスはロックにより自動的に保護されません。 そのため、必ずしも “プロセスセーフ” ではありません。

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.sharedctypes.copy(obj)

共有メモリから割り当てられた ctypes オブジェクト obj をコピーしたオブジェクトを返します。

multiprocessing.sharedctypes.synchronized(obj[, lock])

同期アクセスに lock を使用する ctypes オブジェクトのためにプロセスセーフなラッパーオブジェクトを返します。 lockNone (デフォルト) なら、 multiprocessing.RLock オブジェクトが自動的に作成されます。

同期ラッパーがラップするオブジェクトに加えて2つのメソッドがあります。 get_obj() はラップされたオブジェクトを返します。 get_lock() は同期のために使用されるロックオブジェクトを返します。

ラッパー経由で ctypes オブジェクトにアクセスすることは raw ctypes オブジェクトへアクセスするよりずっと遅くなることに注意してください。

次の表は通常の ctypes 構文で共有メモリから共有 ctypes オブジェクトを作成するための構文を比較します。 (MyStruct テーブル内には ctypes.Structure のサブクラスがあります。)

ctypes

type を使用する sharedctypes

typecode を使用する sharedctypes

c_double(2.4) RawValue(c_double, 2.4) RawValue(‘d’, 2.4)
MyStruct(4, 6) RawValue(MyStruct, 4, 6)  
(c_short * 7)() RawArray(c_short, 7) RawArray(‘h’, 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray(‘i’, (9, 2, 8))

以下に子プロセスが多くの ctypes オブジェクトを変更する例を紹介します:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]

結果は以下のように表示されます

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

16.6.2.7. マネージャー

Manager は別のプロセス間で共有されるデータの作成方法を提供します。マネージャオブジェクトは 共有オブジェクト を管理するサーバプロセスを制御します。他のプロセスはプロキシ経由で共有オブジェクトへアクセスすることができます。

multiprocessing.Manager()

プロセス間でオブジェクトを共有するために使用される SyncManager オブジェクトを返します。返されたマネージャーオブジェクトは生成される子プロセスに対応付けられ、共有オブジェクトを作成するメソッドや、共有オブジェクトに対応するプロキシを返すメソッドを持ちます。

マネージャープロセスは親プロセスが終了するか、ガベージコレクトされると停止します。マネージャークラスは multiprocessing.managers モジュールで定義されています:

class multiprocessing.managers.BaseManager([address[, authkey]])

BaseManager オブジェクトを作成します。

作成後、start()get_server().serve_forever() を呼び出して、マネージャーオブジェクトが確実に開始されたマネージャープロセスを参照するようにしてください。

address はマネージャープロセスが新たなコネクションを待ち受けるアドレスです。addressNone の場合、任意のアドレスが設定されます。

authkey はサーバプロセスへ接続しようとするコネクションの正当性を検証するために使用される認証キーです。 authkeyNone の場合 current_process().authkey が使用されます。 authkey を使用する場合は文字列でなければなりません。

start([initializer[, initargs]])

マネージャーを開始するためにサブプロセスを開始します。initializerNone でなければ、サブプロセスは開始時に initializer(*initargs) を呼び出します。

get_server()

マネージャーの制御下にある実際のサーバーを表す Server オブジェクトを返します。 Server オブジェクトは serve_forever() メソッドをサポートします:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server はさらに address 属性も持っています。

connect()

ローカルからリモートのマネージャーオブジェクトへ接続します:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()
shutdown()

マネージャーが使用するプロセスを停止します。これはサーバープロセスを開始するために start() が使用された場合のみ有効です。

これは複数回呼び出すことができます。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

マネージャークラスで呼び出し可能オブジェクト(callable)や型を登録するために使用されるクラスメソッドです。

typeid は特に共有オブジェクトの型を識別するために使用される “型識別子” です。これは文字列でなければなりません。

callable はこの型識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。マネージャインスタンスが from_address() クラスメソッドを使用して作成されるか、 create_method 引数が False の場合は None でも構いません。

proxytype はこの typeid で共有オブジェクトのプロキシを作成するために使用される BaseProxy のサブクラスです。 None の場合、プロキシクラスは自動的に作成されます。

exposedBaseProxy._callmethod() を使用したアクセスが許されるべき typeid をプロキシするメソッド名のシーケンスを指定するために使用されます (exposedNone の場合 proxytype._exposed_ が存在すればそれが代わりに使用されます)。exposed リストが指定されない場合、共有オブジェクトのすべての “パブリックメソッド” がアクセス可能になります。 (ここでいう “パブリックメソッド” とは __call__() メソッドを持つものと名前が '_' で始まらないあらゆる属性を意味します。)

method_to_typeid はプロキシが返す exposed メソッドの返り値の型を指定するために使用されるマッピングで、メソッド名を typeid 文字列にマップします。 (method_to_typeidNone の場合 proxytype._method_to_typeid_ が存在すれば、それが代わりに使用されます。) メソッド名がこのマッピングのキーではないか、マッピングが None の場合、そのメソッドによって返されるオブジェクトが値として (by value) コピーされます。

create_method は、共有オブジェクトを作成し、それに対するプロキシを返すようサーバープロセスに伝える、名前 typeid のメソッドを作成するかを決定します。デフォルトでは True です。

BaseManager インスタンスも読み取り専用属性を1つ持っています:

address

マネージャーが使用するアドレスです。

class multiprocessing.managers.SyncManager

プロセス間の同期のために使用される BaseManager のサブクラスです。 multiprocessing.Manager() はこの型のオブジェクトを返します。

また共有のリストやディクショナリの作成もサポートします。

BoundedSemaphore([value])

共有 threading.BoundedSemaphore オブジェクトを作成して、そのプロキシを返します。

Condition([lock])

共有 threading.Condition オブジェクトを作成して、そのプロキシを返します。

lock が提供される場合 threading.Lockthreading.RLock オブジェクトのためのプロキシになります。

Event()

共有 threading.Event オブジェクトを作成して、そのプロキシを返します。

Lock()

共有 threading.Lock オブジェクトを作成して、そのプロキシを返します。

Namespace()

共有 Namespace オブジェクトを作成して、そのプロキシを返します。

Queue([maxsize])

共有 Queue.Queue オブジェクトを作成して、そのプロキシを返します。

RLock()

共有 threading.RLock オブジェクトを作成して、そのプロキシを返します。

Semaphore([value])

共有 threading.Semaphore オブジェクトを作成して、そのプロキシを返します。

Array(typecode, sequence)

配列を作成して、そのプロキシを返します。

Value(typecode, value)

書き込み可能な value 属性を作成して、そのプロキシを返します。

dict()
dict(mapping)
dict(sequence)

共有 dict オブジェクトを作成して、そのプロキシを返します。

list()
list(sequence)

共有 list オブジェクトを作成して、そのプロキシを返します。

注釈

プロキシには、自身の持つ辞書とリストのミュータブルな値や項目がいつ変更されたのかを知る方法がないため、これらの値や項目の変更はマネージャーを通して伝播しません。このような要素を変更するには、コンテナーのプロキシに変更されたオブジェクトを再代入してください:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
class multiprocessing.managers.Namespace

SyncManager に登録することのできる型です。

Namespace オブジェクトにはパブリックなメソッドはありませんが、書き込み可能な属性を持ちます。そのオブジェクト表現はその属性の値を表示します。

しかし、Namespace オブジェクトのためにプロキシを使用するとき '_' が先頭に付く属性はプロキシの属性になり、参照対象の属性にはなりません:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')

16.6.2.7.1. カスタマイズされたマネージャー

独自のマネージャーを作成するには、BaseManager のサブクラスを作成して、 マネージャークラスで呼び出し可能なオブジェクトか新たな型を登録するために register() クラスメソッドを使用します。例えば:

from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

16.6.2.7.2. リモートマネージャーを使用する

あるマシン上でマネージャーサーバーを実行して、他のマシンからそのサーバーを使用するクライアントを持つことができます(ファイアウォールを通過できることが前提)。

次のコマンドを実行することでリモートクライアントからアクセスを受け付ける1つの共有キューのためにサーバーを作成します:

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

あるクライアントからサーバーへのアクセスは次のようになります:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

別のクライアントもそれを使用することができます:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

ローカルプロセスもそのキューへアクセスすることができます。クライアント上で上述のコードを使用してアクセスします:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

16.6.2.8. Proxy オブジェクト

プロキシは別のプロセスで(おそらく)有効な共有オブジェクトを 参照する オブジェクトです。共有オブジェクトはプロキシの 参照対象 になるということができます。複数のプロキシオブジェクトが同じ参照対象を持つ可能性もあります。

プロキシオブジェクトはその参照対象が持つ対応メソッドを実行するメソッドを持ちます。(そうは言っても、参照対象のすべてのメソッドが必ずしもプロキシ経由で利用可能ではありません) プロキシは通常その参照対象ができることと同じ方法で使用されます:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

プロキシに str() を適用すると参照対象のオブジェクト表現を返すのに対して、 repr() を適用するとプロキシのオブジェクト表現を返すことに注意してください。

プロキシオブジェクトの重要な機能はプロセス間で受け渡し可能な pickle 化ができることです。しかし、プロキシが対応するマネージャープロセスに対して送信される場合、そのプロキシを unpickle するとその参照対象を生成することを覚えておいてください。例えば、これはある共有オブジェクトに別の共有オブジェクトが含められることを意味します:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']

注釈

multiprocessing のプロキシ型は値による比較に対して何もサポートしません。そのため、例えば以下のようになります:

>>> manager.list([1,2,3]) == [1,2,3]
False

比較を行いたいときは参照対象のコピーを使用してください。

class multiprocessing.managers.BaseProxy

プロキシオブジェクトは BaseProxy のサブクラスのインスタンスです。

_callmethod(methodname[, args[, kwds]])

プロキシの参照対象のメソッドの実行結果を返します。

proxy がプロキシで、プロキシ内の参照対象が obj ならこの式

proxy._callmethod(methodname, args, kwds)

はこの式を評価します

getattr(obj, methodname)(*args, **kwds)

(マネージャープロセス内の)。

返される値はその呼び出し結果のコピーか、新たな共有オブジェクトに対するプロキシになります。詳細は BaseManager.register()method_to_typeid 引数のドキュメントを参照してください。

その呼び出しによって例外が発生した場合、_callmethod() によってその例外は再送出されます。他の例外がマネージャープロセスで発生したなら、RemoteError 例外に変換されたものが _callmethod() によって送出されます。

特に methodname公開 されていない場合は例外が発生することに注意してください。

_callmethod() の使用例になります:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7))   # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))     # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

参照対象のコピーを返します。

参照対象が unpickle 化できるなら例外を発生します。

__repr__()

プロキシオブジェクトのオブジェクト表現を返します。

__str__()

参照対象のオブジェクト表現を返します。

16.6.2.8.1. クリーンアップ

プロキシオブジェクトは弱参照(weakref)コールバックを使用します。プロキシオブジェクトがガベージコレクトされるときにその参照対象が所有するマネージャーからその登録を取り消せるようにするためです。

共有オブジェクトはプロキシが参照しなくなったときにマネージャープロセスから削除されます。

16.6.2.9. プロセスプール

Pool クラスでタスクを実行するプロセスのプールを作成することができます。

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

プロセスプールオブジェクトは、ジョブを送り込めるワーカープロセスのプールを制御します。タイムアウトやコールバックのある非同期の実行をサポートし、並列 map 実装を持ちます。

processes は使用するワーカープロセスの数です。 processesNone の場合 cpu_count() が返す数を使用します。 initializerNone でない場合、各ワーカープロセスが開始時に initializer(*initargs) を呼び出します。

プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。

バージョン 2.7 で追加: maxtasksperchild は、ワーカープロセスが exit して新たなワーカープロセスと置き替えられるまでの間に、ワーカープロセスが完了することのできるタスクの数です。この設定により未利用のリソースが解放されるようなります。デフォルトの maxtasksperchildNone で、これはワーカープロセスがプールと同じ期間だけ生き続けるということを意味します。

注釈

Pool 中のワーカープロセスは、典型的にはプールのワークキューの存続期間とちょうど同じだけ生き続けます。ワーカーに確保されたリソースを解放するために (Apache, mod_wsgi, などのような) 他のシステムによく見られるパターンは、プール内のワーカーが設定された量だけの仕事を完了したら exit とクリーンアップを行い、古いプロセスを置き換えるために新しいプロセスを生成するというものです。 Poolmaxtasksperchild 引数は、この能力をエンドユーザーに提供します。

apply(func[, args[, kwds]])

組み込み関数 apply() の同等品で、結果が取得出来るようになるまでブロックします。ですから作業を並列に実行するのにより相応しいのは apply_async() です。加えて func はプールのワーカーのうち一つの中で実行されるに過ぎません。

apply_async(func[, args[, kwds[, callback]]])

apply() メソッドの派生版で結果オブジェクトを返します。

callback を指定する場合は 1 つの引数を受け取る呼び出し可能オブジェクトでなければなりません。その結果を返せるようになったときに callback が結果オブジェクトに対して (その呼び出しが失敗しない限り)適用されます。コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。

map(func, iterable[, chunksize])

map() 組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはありますが)。結果が出るまでブロックします。

このメソッドはイテラブルをいくつものチャンクに分割し、プロセスプールにそれぞれ独立したタスクとして送ります。(概算の) チャンクサイズは chunksize を正の整数に設定することで指定できます。

map_async(func, iterable[, chunksize[, callback]])

map() メソッドの派生版で結果オブジェクトを返します。

callback を指定する場合は 1 つの引数を受け取る呼び出し可能オブジェクトでなければなりません。その結果を返せるようになったときに callback が結果オブジェクトに対して (その呼び出しが失敗しない限り)適用されます。コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。

imap(func, iterable[, chunksize])

itertools.imap() と同じです。

chunksize 引数は map() メソッドで使用されるものと同じです。 引数 iterable がとても長いなら chunksize に大きな値を指定して使用する方がデフォルト値の 1 を使用するよりもジョブの完了が かなり 速くなります。

また chunksize1 の場合 imap() メソッドが返すイテレーターの next() メソッドはオプションで timeout パラメーターを持ちます。 next(timeout) は、その結果が timeout 秒以内に返されないときに multiprocessing.TimeoutError を発生させます。

imap_unordered(func, iterable[, chunksize])

イテレーターが返す結果の順番が任意の順番で良いと見なされることを除けば imap() と同じです。 (ワーカープロセスが1つしかない場合のみ “正しい” 順番になることが保証されます。)

close()

これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。

terminate()

実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに terminate() が呼び出されます。

join()

ワーカープロセスが終了するのを待ちます。 join() を使用する前に close()terminate() を呼び出さなければなりません。

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() で返される結果のクラスです。

get([timeout])

結果を受け取ったときに返します。 timeoutNone ではなくて、その結果が timeout 秒以内に受け取れない場合 multiprocessing.TimeoutError が発生します。リモートの呼び出しが例外を発生させる場合、その例外は get() が再発生させます。

wait([timeout])

その結果が有効になるか timeout 秒経つまで待ちます。

ready()

その呼び出しが完了しているかどうかを返します。

successful()

その呼び出しが例外を発生させることなく完了したかどうかを返します。その結果が返せる状態でない場合 AssertionError が発生します。

次の例はプールの使用例を紹介します:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

16.6.2.10. リスナーとクライアント

通常、プロセス間でメッセージを渡すにはキューを使用するか Pipe() が返す Connection オブジェクトを使用します。

しかし multiprocessing.connection モジュールはさらに柔軟な仕組みがあります。基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供して hmac モジュールを使用して ダイジェスト認証 もサポートします。

multiprocessing.connection.deliver_challenge(connection, authkey)

ランダム生成したメッセージをコネクションの相手側へ送信して応答を待ちます。

その応答がキーとして authkey を使用するメッセージのダイジェストと一致する場合、コネクションの相手側へ歓迎メッセージを送信します。そうでなければ AuthenticationError を発生させます。

multiprocessing.connection.answer_challenge(connection, authkey)

メッセージを受信して、そのキーとして authkey を使用するメッセージのダイジェストを計算し、ダイジェストを送り返します。

歓迎メッセージを受け取れない場合 AuthenticationError が発生します。

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

address で渡したアドレスを使用するリスナーに対してコネクションを確立しようとして Connection を返します。

コネクション種別は family 引数で決定しますが、一般的には address のフォーマットから推測できるので、これは指定されません。 (アドレスフォーマット を参照してください)

authenticateTrueauthkey が文字列の場合、ダイジェスト認証が使用されます。認証に使用されるキーは authkey 、又は authkeyNone の場合は current_process().authkey のどちらかです。認証が失敗した場合 AuthenticationError が発生します。 認証キー を参照してください。

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

コネクションを ‘待ち受ける’ 束縛されたソケットか Windows の名前付きパイプのラッパーです。

address はリスナーオブジェクトの束縛されたソケットか名前付きパイプが使用するアドレスです。

注釈

‘0.0.0.0’ のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は ‘127.0.0.1’ を使用すべきです。

family は使用するソケット(名前付きパイプ)の種別です。これは 'AF_INET' (TCP ソケット), 'AF_UNIX' (Unix ドメインソケット) または 'AF_PIPE' (Windows 名前付きパイプ) という文字列のどれか1つになります。これらのうち 'AF_INET' のみが利用可能であることが保証されています。 familyNone の場合 address のフォーマットから推測されたものが使用されます。 addressNone の場合はデフォルトが選択されます。詳細は アドレスフォーマット を参照してください。 family'AF_UNIX'addressNone の場合 tempfile.mkstemp() を使用して作成されたプライベートな一時ディレクトリにソケットが作成されます。

リスナーオブジェクトがソケットを使用する場合、ソケットに束縛されるときに backlog (デフォルトでは1つ) がソケットの listen() メソッドに対して渡されます。

authenticateTrue (デフォルトでは False) か authkeyNone ではない場合、ダイジェスト認証が使用されます。

authkey が文字列の場合、認証キーとして使用されます。そうでない場合は None でなければいけません。

authkeyNone 且つ authenticateTrue の場合 current_process().authkey が認証キーとして使用されます。 authkeyNone 且つ authenticationFalse の場合、認証は行われません。もし認証が失敗した場合 AuthenticationError が発生します。詳細 認証キー を参照してください。

accept()

リスナーオブジェクトの名前付きパイプか束縛されたソケット上でコネクションを受け付けて Connection オブジェクトを返します。認証が失敗した場合 AuthenticationError が発生します。

close()

リスナーオブジェクトの名前付きパイプか束縛されたソケットをクローズします。これはリスナーがガベージコレクトされるときに自動的に呼ばれます。そうは言っても、明示的に close() を呼び出す方が望ましいです。

リスナーオブジェクトは次の読み取り専用属性を持っています:

address

リスナーオブジェクトが使用中のアドレスです。

last_accepted

最後にコネクションを受け付けたアドレスです。有効なアドレスがない場合は None になります。

このモジュールは2つの例外を定義します。

exception multiprocessing.connection.AuthenticationError

認証エラーが起こったときに例外が発生します。

次のサーバーコードは認証キーとして 'secret password' を使用するリスナーを作成します。このサーバーはコネクションを待ってクライアントへデータを送信します:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')

conn = listener.accept()
print 'connection accepted from', listener.last_accepted

conn.send([2.25, None, 'junk', float])

conn.send_bytes('hello')

conn.send_bytes(array('i', [42, 1729]))

conn.close()
listener.close()

次のコードはサーバーへ接続して、サーバーからデータを受信します:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)
conn = Client(address, authkey='secret password')

print conn.recv()                 # => [2.25, None, 'junk', float]

print conn.recv_bytes()            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr)     # => 8
print arr                         # => array('i', [42, 1729, 0, 0, 0])

conn.close()

16.6.2.10.1. アドレスフォーマット

  • 'AF_INET' アドレスは (hostname, port) のタプルになります。 hostname は文字列で port は整数です。

  • 'AF_UNIX' アドレスはファイルシステム上のファイル名の文字列です。

  • 'AF_PIPE' アドレスは、次の形式を持つ文字列です

    r'\\.\pipe\PipeName'ServerName という名前のリモートコンピューター上の名前付きパイプに接続するために Client() を使用するには、代わりに r'\\ServerName\pipe\PipeName' 形式のアドレスを使用する必要があります。

デフォルトでは、2つのバックスラッシュで始まる文字列は 'AF_UNIX' よりも 'AF_PIPE' として推測されることに注意してください。

16.6.2.11. 認証キー

Connection.recv を使用するとき、データは自動的に unpickle されて受信します。信頼できない接続元からのデータを unpickle することはセキュリティリスクがあります。そのため ListenerClient() はダイジェスト認証を提供するために hmac モジュールを使用します。

認証キーはパスワードとしてみなされる文字列です。コネクションが確立すると、双方の終点で正しい接続先であることを証明するために知っているお互いの認証キーを要求します。 (双方の終点が同じキーを使用して通信しようとしても、コネクション上でそのキーを送信することは できません 。)

認証が要求されているにもかかわらず認証キーが指定されていない場合 current_process().authkey の返す値が使用されます。 (詳細は Process を参照してください。) この値はカレントプロセスを作成する Process オブジェクトによって自動的に継承されます。 これは(デフォルトでは)複数プロセスのプログラムの全プロセスが相互にコネクションを 確立するときに使用される1つの認証キーを共有することを意味します。

適当な認証キーを os.urandom() を使用して生成することもできます。

16.6.2.12. ログ記録

ロギングのためにいくつかの機能が利用可能です。しかし logging パッケージは、 (ハンドラー種別に依存して)違うプロセスからのメッセージがごちゃ混ぜになるので、プロセスの共有ロックを使用しないことに注意してください。

multiprocessing.get_logger()

multiprocessing が使用するロガーを返します。必要に応じて新たなロガーを作成します。

最初に作成するとき、ロガーはレベルに logging.NOTSET が設定されていてデフォルトハンドラーがありません。このロガーへ送られるメッセージはデフォルトではルートロガーへ伝播されません。

Windows 上では子プロセスが親プロセスのロガーレベルを継承しないことに注意してください。さらにその他のロガーのカスタマイズ内容もすべて継承されません。

multiprocessing.log_to_stderr()

この関数は get_logger() に対する呼び出しを実行しますが、 get_logger によって作成されるロガーを返すことに加えて、 '[%(levelname)s/%(processName)s] %(message)s' のフォーマットを使用して sys.stderr へ出力を送るハンドラーを追加します。

以下にロギングを有効にした例を紹介します:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

これらの2つのロギング関数があることに加えて、 multiprocessing モジュールも2つの追加ロギングレベル属性を提供します。それは SUBWARNINGSUBDEBUG です。次の表は通常のレベル階層にうまく適合していることを表します。

レベル

Numeric value
SUBWARNING 25
SUBDEBUG 5

完全なロギングレベルの表については logging モジュールを参照してください。

こういった追加のロギングレベルは主に multiprocessing モジュールの信頼できるデバッグメッセージのために使用されます。以下に上述の例に SUBDEBUG を有効にしたものを紹介します:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0

16.6.2.13. multiprocessing.dummy モジュール

multiprocessing.dummymultiprocessing の API を複製しますが threading モジュールのラッパーでしかありません。

16.6.3. プログラミングガイドライン

multiprocessing を使用するときに守るべき一定のガイドラインとイディオムを挙げます。

16.6.3.1. 全てのプラットホーム

共有状態を避ける

できるだけプロセス間で巨大なデータを移動することは避けるようにすべきです。

プロセス間の通信には、 threading モジュールの低レベルな同期プリミティブを使うのではなく、キューやパイプを使うのが良いでしょう。

pickle 化の可能性

プロキシのメソッドへの引数は、 pickle 化できるものにしてください。

プロキシのスレッドセーフ性

1 つのプロキシオブジェクトは、ロックで保護しないかぎり、2 つ以上のスレッドから使用してはいけません。

(異なるプロセスで 同じ プロキシを使用することは問題ではありません。)

ゾンビプロセスを join する

Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または active_children() が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスの Process.is_alive はそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。

pickle/unpickle より継承する方が良い

Windows 上では multiprocessing の多くの型は子プロセスが使用できるようにするため pickle 化できなければなりません。しかし、パイプやキューを使用して他のプロセスへ共有オブジェクトを送ることは一般的に避けるべきです。その代わり、どこかで作成された共有リソースへのアクセスが必要なプロセスは、祖先のプロセスからそれを継承するようにプログラムを変更すべきです。

プロセスの強制終了を避ける

あるプロセスを停止するために Process.terminate メソッドを使用すると、そのプロセスが現在使用されている (ロック、セマフォ、パイプやキューのような) 共有リソースを破壊したり他のプロセスから利用できない状態を引き起こし易いです。

そのため、共有リソースを使用しないプロセスでのみ Process.terminate を使用することを考慮することがおそらく最善の方法です。

キューを使用するプロセスを join する

キューに要素を追加するプロセスは、全てのバッファされた要素が “feeder” スレッドによって下位層のパイプに対してフィードされるまで終了を待つということを覚えておいてください。 (子プロセスはこの動作を避けるためにキューの cancel_join_thread() メソッドを呼ぶことができます。)

これはキューを使用するときに、キューに追加されたすべての要素が最終的にそのプロセスが join される前に削除されていることを確認する必要があることを意味します。そうしないと、そのキューに要素が追加したプロセスの終了を保証できません。デーモンではないプロセスは自動的に join されることも覚えておいてください。

次の例はデッドロックを引き起こします:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

修正するには最後の2行を入れ替えます(または単純に p.join() の行を削除します)。

明示的に子プロセスへリソースを渡す

Unix 上では子プロセスはグローバルなリソースを使用する親プロセスが作成した共有リソースを使用することができます。しかし、引数としてそのオブジェクトを子プロセスのコンストラクタへ渡す方が良いです。

これにより、コードが (潜在的に) Windows 互換になるだけでなく、子プロセスが生き続ける限り、そのオブジェクトは親プロセスでガベージコレクトされないことも保証されます。これは親プロセスでそのオブジェクトがガベージコレクトされるときにリソースが解放される場合に重要になるでしょう。

そのため、例えば

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

は、次のように書き直すべきです

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

sys.stdin を file-like オブジェクトに置き換えることに注意する

multiprocessing は元々無条件に:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() メソッドの中で呼び出していました — これはプロセス内プロセス (processes-in-processes) で問題が起こしてしまいます。そこで、これは以下のように変更されました:

sys.stdin.close()
sys.stdin = open(os.devnull)

これによってプロセス同士が衝突して bad file descripter エラーを起こすという根本的な問題は解決しましたが、アプリケーションの出力バッファーを sys.stdin() から “file-like オブジェクト” に置き換えるという潜在的危険を持ち込んでしまいました。危険というのは、複数のプロセスが file-like オブジェクトの close() を呼び出すと、オブジェクトに同じデータが何度もフラッシュされ、破損してしまう可能性がある、というものです。

もし file-like オブジェクトを書いて独自のキャッシュを実装するなら、キャッシュするときに常に pid を記録しておき、pid が変わったらキュッシュを捨てることで、フォークセーフにできます。例:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

より詳しい情報は bpo-5155bpo-5313bpo-5331 を見てください

16.6.3.2. Windows

Windows には os.fork() がないのでいくつか追加制限があります:

さらなる pickle 化の可能性

Process.__init__() へ渡す全ての引数は、pickle化できるものにしてください。これはとりわけ、Windows 上で、束縛メソッドや非束縛メソッドを直接 target 引数として使ってはならない、ということを意味します。メソッドではなく、関数を使うしかありません。

また Process をサブクラス化する場合、そのインスタンスが Process.start メソッドが呼ばれたときに pickle 化できるようにしてください。

グローバル変数

子プロセスで実行されるコードがグローバル変数にアクセスしようとする場合、子プロセスが見るその値は Process.start が呼ばれたときの親プロセスの値と同じではない可能性があります。

しかし、単にモジュールレベルの定数であるグローバル変数なら問題にはなりません。

メインモジュールの安全なインポート

新たな Python インタプリタによるメインモジュールのインポートが、意図しない副作用 (新たなプロセスを開始する等) を起こさずできるようにしてください。

例えば Windows で次のモジュールを実行しようとすると RuntimeError で失敗します:

from multiprocessing import Process

def foo():
    print 'hello'

p = Process(target=foo)
p.start()

代わりに、次のように if __name__ == '__main__': を使用してプログラムの “エントリポイント” を保護すべきです:

from multiprocessing import Process, freeze_support

def foo():
    print 'hello'

if __name__ == '__main__':
    freeze_support()
    p = Process(target=foo)
    p.start()

(freeze_support() 行はプログラムが固まらずに通常どおり実行されるなら取り除けます。)

これは新たに生成された Python インタープリターがそのモジュールを安全にインポートして、モジュールの foo() 関数を実行します。

プールまたはマネージャーがメインモジュールで作成される場合に似たような制限が適用されます。

16.6.4. 例

カスタマイズされたマネージャーやプロキシの作成方法と使用方法を紹介します:

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()

Pool を使用する例です:

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()

ロック、コンディションやキューのような同期の例を紹介します:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)

ワーカープロセスのコレクションに対してタスクをフィードしてその結果をまとめるキューの使い方の例を紹介します:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

ワーカープロセスのプールが1つのソケットを共有してそれぞれの SimpleHTTPServer.HttpServer インスタンスを実行する方法の例を紹介します。

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets pickable/inheritable


def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass


def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)


def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


if __name__ == '__main__':
    freeze_support()
    test()

multiprocessingthreading を比較した簡単なベンチマークです:

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in',elapsed,'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()