18.5.4. トランスポートとプロトコル (コールバックベースの API)

18.5.4.1. トランスポート

Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.

いったん通信チャンネルが確立されると、トランスポートは常に プロトコル インスタンスとのペアを成します。プロトコルはその後さまざまな用途のためトランスポートのメソッドを呼び出します。

asyncio は現在 TCP、UDP、SSL およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。

トランスポートクラスは スレッド安全ではありません

バージョン 3.6 で変更: The socket option TCP_NODELAY is now set by default.

18.5.4.1.1. BaseTransport

class asyncio.BaseTransport

トランスポートの基底クラスです。

close(self)

トランスポートをクローズします。トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

is_closing(self)

トランスポートを閉じている最中か閉じていた場合 True を返します。

バージョン 3.5.1 で追加.

get_extra_info(name, default=None)

オプションのトランスポート情報を返します。name は取得したトランスポート固有の情報を表す文字列で、default は情報が存在しなかったときに返す値になります。

このメソッドはトランスポートの実装に容易にチャンネル固有の情報を渡すことができます。

  • ソケット:

  • SSL ソケット:

    • 'compression': 圧縮アルゴリズムで、ssl.SSLSocket.compression() の結果になります。圧縮されていないときは None になります

    • 'cipher': 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher() の結果になります

    • 'peercert': ピアの証明書で、ssl.SSLSocket.getpeercert() の結果になります

    • 'sslcontext': ssl.SSLContext のインスタンスになります

    • 'ssl_object': ssl.SSLObject または ssl.SSLSocket インスタンス

  • パイプ:

    • 'pipe': パイプオブジェクトです

  • サブプロセス:

set_protocol(protocol)

Set a new protocol. Switching protocol should only be done when both protocols are documented to support the switch.

バージョン 3.5.3 で追加.

get_protocol()

Return the current protocol.

バージョン 3.5.3 で追加.

バージョン 3.5.1 で変更: 'ssl_object' 情報が SSL ソケットに追加されました。

18.5.4.1.2. ReadTransport

class asyncio.ReadTransport

読み込み専用トランスポートのインターフェースです。

pause_reading()

トランスポートの受信側を一時停止します。resume_reading() が呼び出されるまでそのプロトコルの data_received() メソッドにデータは渡されません。

resume_reading()

受信を再開します。読み込み可能データが存在した場合そのプロトコルの data_received() メソッドが一度呼び出されます。

18.5.4.1.3. WriteTransport

class asyncio.WriteTransport

書き込み専用トランスポートのインターフェースです。

abort()

トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

can_write_eof()

トランスポートが write_eof() をサポートしている場合 True を、サポートしていない場合は False を返します。

get_write_buffer_size()

トランスポートで使用されている出力バッファーの現在のサイズを返します。

get_write_buffer_limits()

書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を取得します。(low, high) のタプルを返します。low および high は整数のバイト列になります。

水位点の設定は set_write_buffer_limits() で行います。

バージョン 3.4.2 で追加.

set_write_buffer_limits(high=None, low=None)

書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を設定します。

これら 2 個の値はプロトコルの pause_writing() および resume_writing() メソッドが呼ばれたときの振る舞いを制御します。指定する場合、lowhigh 以下でなければなりません。highlow も負数は指定できません。

The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes pause_writing() to be called whenever the buffer becomes non-empty. Setting low to zero causes resume_writing() to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.

水位点の取得には get_write_buffer_limits() を使用します。

write(data)

トランスポートにバイト列 data を書き込みます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

writelines(list_of_data)

バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素で write() を呼び出すことと等価ですが、より効率的な実装となる場合があります。

write_eof()

バッファーされたデータをフラッシュした後トランスポートの送信側をクローズします。データは受信されます。

このメソッドはトランスポート (例えば SSL) がハーフクローズをサポートしていない場合 NotImplementedError を送出します。

18.5.4.1.4. DatagramTransport

DatagramTransport.sendto(data, addr=None)

リモートピア addr (トランスポート依存の対象アドレス) にバイト列 data を送信します。addrNone の場合、データはトランスポートの作成時に指定された送信先に送られます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

DatagramTransport.abort()

トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

18.5.4.1.5. BaseSubprocessTransport

class asyncio.BaseSubprocessTransport
get_pid()

サブプロセスのプロセス ID (整数) を返します。

get_pipe_transport(fd)

整数のファイル記述子 fd に該当する通信パイプのトランスポートを返します:

  • 0: 標準入力 (stdin) の読み込み可能ストリーミングトランスポート。サブプロセスが stdin=PIPE で作成されていない場合は None

  • 1: 標準出力 (stdout) の書き込み可能ストリーミングトランスポート。サブプロセスが stdout=PIPE で作成されていない場合は None

  • 2: 標準エラー出力 (stderr) の書き込み可能ストリーミングトランスポート。サブプロセスが stderr=PIPE で作成されていない場合は None

  • その他の fd: None

get_returncode()

サブプロセスのリターンコード (整数) を返します。リターンコードを持たない場合 None を返します。subprocess.Popen.returncode 属性と同じです。

kill(self)

Kill the subprocess, as in subprocess.Popen.kill().

POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドは terminate() の別名です。

send_signal(signal)

サブプロセスにシグナル signal を送信します。subprocess.Popen.send_signal() と同じです。

terminate()

サブプロセスに停止を要求します。subprocess.Popen.terminate() と同じです。このメソッドは close() メソッドの別名です。

POSIX システムでは、このメソッドはサブプロセスに SIGTERM を送信します。Windows では、Windows API 関数 TerminateProcess() が呼び出されます。

close()

サブプロセスがまだ返していない場合、terminate() メソッドの呼び出しによってサブプロセスに停止を要求し、全パイプ (stdinstdout および stderr) のトランスポートをクローズします。

18.5.4.2. プロトコル

asyncio はネットワークプロトコルの実装をサブクラス化する基底クラスを提供します。これらクラスは トランスポート と連動して使用されます: プロトコルは入力データの解析および出力データの書き込みのための問い合わせを行い、トランスポートは実際の I/O とバッファリングに責任を持ちます。

プロトコルクラスをサブクラス化するとき、いくつかのメソッドをオーバーライドすることを推奨します。これらメソッドはコールバックです: いくつかのイベントが発生したとき (例えばデータの受信など) に呼び出されます; あなたがトランスポートを実装する場合を除き、これらを直接呼び出すべきではありません。

注釈

すべてのコールバックはデフォルトで空の実装を持ちます。したがって、あなたが興味を持ったイベント用のコールバックのみ実装が必要になります。

18.5.4.2.1. プロトコルクラス群

class asyncio.Protocol

(例えば TCP や SSL トランスポートとともに使用する) ストリーミングプロトコルを実装する基底クラスです。

class asyncio.DatagramProtocol

(例えば UDP トランスポートともに使用する) データグラムプロトコルを実装する基底クラスです。

class asyncio.SubprocessProtocol

子プロセスと (一方向パイプを使用して) 通信するプロトコルを実装する基底クラスです。

18.5.4.2.2. コネクションコールバック

これらコールバックは ProtocolDatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:

BaseProtocol.connection_made(transport)

コネクションが作成されたときに呼び出されます。

引数 transport はコネクションを表すトランスポートです。必要であれば、それをどこに格納するか (例えば属性へ) を決めるのはあなたです。

BaseProtocol.connection_lost(exc)

コネクションが失われた、あるいはクローズされたときに呼び出されます。

引数は例外オブジェクトまたは None になります。None のとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。

connection_made() および connection_lost() は接続が成功するたびに厳密に 1 回呼び出されます。その他のすべてのコールバックはこれら 2 つのメソッドの間に呼び出され、あなたのプロトコルの実装内のリソース管理を容易に行えます。

以下のコールバックを呼び出すのは SubprocessProtocol インスタンスのみかもしれません:

SubprocessProtocol.pipe_data_received(fd, data)

子プロセスが自身の標準出力や標準エラー出力のパイプにデータを書き込んだときに呼び出されます。fd はパイプのファイル記述子 (整数) になります。data はデータを含む空ではないバイト列になります。

SubprocessProtocol.pipe_connection_lost(fd, exc)

子プロセスと通信するパイプの一つがクローズされると呼び出されます。fd はクローズされたファイル記述子 (整数) になります。

SubprocessProtocol.process_exited()

子プロセスが終了したときに呼び出されます。

18.5.4.2.3. ストリーミングプロトコル

以下のコールバックは Protocol インスタンス上で呼び出されます:

Protocol.data_received(data)

データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。

注釈

トランスポートによって、データのバッファー、チャンクあるいは再構築のどれかが行われます。一般に、固有のセマンティックを信頼すべきではなく、代わりに全体的かつ十分に柔軟な解析を行うべきです。ただし、データは常に正しい順序で受信されます。

Protocol.eof_received()

相手方が送信するデータがないことを伝えてきたとき (例えば相手方が asyncio を使用しており write_eof() を呼び出したなど) 呼び出されます。

This method may return a false value (including None), in which case the transport will close itself. Conversely, if this method returns a true value, closing the transport is up to the protocol. Since the default implementation returns None, it implicitly closes the connection.

注釈

SSL のような一部のトランスポートはハーフクローズ接続をサポートしていません。その場合このメソッドが真値を返すとコネクションのクローズを回避できません。

接続中、data_received() は複数回呼び出されえます。eof_received() が呼び出されるのは 1 回で、1 度呼び出されると、その後 data_received() が呼び出されることはありません。

State machine:

18.5.4.2.4. データグラムプロトコル

以下のコールバックは DatagramProtocol インスタンス上で呼び出されます。

DatagramProtocol.datagram_received(data, addr)

データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。

DatagramProtocol.error_received(exc)

直前の送信あるいは受信が OSError を送出したときに呼び出されます。excOSError のインスタンスになります。

このメソッドが呼ばれるのは、トランスポート (UDP など) がデータグラムを受信側に配信できなかったことが検出されたなどの、まれな場合においてのみです。ほとんどの場合、データグラムが配信できなければそのまま通知されることなく破棄されます。

18.5.4.2.5. フロー制御コールバック

これらコールバックは ProtocolDatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:

BaseProtocol.pause_writing()

トランスポートのバッファーサイズが最高水位点 (High-Water Mark) を超えたときに呼び出されます。

BaseProtocol.resume_writing()

トランスポートのバッファーサイズが最低水位点 (Low-Water Mark) に達したきに呼び出されます。

pause_writing() および resume_writing() の呼び出しは対になります。– pause_writing() はバッファーが完全に最高水位点を超えたとき (後続の書き込みがさらにバッファーサイズを増やすとしても) 1 度呼び出され、バッファーサイズが最終的に最低水位点に達したときに resume_writing() が 1 度呼び出されます。

注釈

バッファーサイズが最高水位点と等しくなった時点では pause_writing() は呼び出されません – 完全に超えなければなりません。対して、バッファーサイズが最低水位点と等しくなったときは resume_writing() は呼び出されます。これら端末条件は各点がゼロになったとき予定通りに動作するかどうか確認するために重要です。

注釈

BSD システム (OS X、FreeBSD など) では、DatagramProtocol でのフロー制御は、大量のパケット送信による送信失敗を検知するのが容易ではないためサポートされていません。ソケットは常に ‘待機状態’ のように見え、超過分のパケットは破棄されます; エラー番号 errno.ENOBUFS が設定された OSError が送出されるときもあればされないときもあります; 送出された場合、DatagramProtocol.error_received() に通知されますが、送出されないと無視されます。

18.5.4.2.6. コルーチンとプロトコル

Coroutines can be scheduled in a protocol method using ensure_future(), but there is no guarantee made about the execution order. Protocols are not aware of coroutines created in protocol methods and so will not wait for them.

信頼できる実行順を持つには、コルーチンの yield fromストリームオブジェクト を使用します。例えば、StreamWriter.drain() コルーチンは書き込みバッファーがフラッシュされるまで待機することができます。

18.5.4.3. プロトコルの例

18.5.4.3.1. TCP Echo クライアントプロトコル

TCP echo client using the AbstractEventLoop.create_connection() method, send data and wait until the connection is closed:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

The event loop is running twice. The run_until_complete() method is preferred in this short example to raise an exception if the server is not listening, instead of having to write a short coroutine to handle the exception and stop the running loop. At run_until_complete() exit, the loop is no longer running, so there is no need to stop the loop in case of an error.

参考

ストリームを使った TCP Echo クライアント の例では asyncio.open_connection() 関数を使用しています。

18.5.4.3.2. TCP Echo サーバープロトコル

TCP echo server using the AbstractEventLoop.create_server() method, send back received data and close the connection:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close() は、データがまだソケットに送信されていなくても、WriteTransport.write() の直後に呼び出されます: それぞれのメソッドは非同期です。これらトランスポートメソッドはコルーチンではないため、yield from は必要ありません。

参考

ストリームを使った TCP Echo サーバー の例では asyncio.start_server() 関数を使用しています。

18.5.4.3.3. UDP Echo クライアントプロトコル

UDP echo client using the AbstractEventLoop.create_datagram_endpoint() method, send data and close the transport when we received the answer:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4. UDP Echo サーバープロトコル

UDP echo server using the AbstractEventLoop.create_datagram_endpoint() method, send back received data:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5. プロトコルを使ってデータを待つオープンソケットの登録

Wait until a socket receives data using the AbstractEventLoop.create_connection() method with a protocol, and then close the event loop

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

参考

読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の AbstractEventLoop.add_reader() メソッドを使用しています。

ストリームを使ってデータを待つオープンソケットの登録 の例ではコルーチンの open_connection() 関数によって作成された高水準ストリームを使用しています。