チュートリアル:基本的なチャットサーバーの構築#

このチュートリアルでは、非常に基本的なチャットサーバーを構築します。これにより、誰でもサーバーに現在接続されている他のユーザー全員にメッセージを送信できます。

このチュートリアルは、QuartにおけるWebSocketsの紹介として役立つことを目的としています。最後まで飛ばしたい場合は、コードはGithubにあります。

1:プロジェクトの作成#

チャットサーバー用のプロジェクトを作成する必要があります。Poetryを使用することをお勧めします。Poetryはpip(またはBrew)を介してインストールされます。

pip install poetry

Poetryを使用して新しいチャットプロジェクトを作成できます。

poetry new --src chat

これで、プロジェクトはchatディレクトリで開発でき、以降のコマンドはすべてchatディレクトリで実行する必要があります。

2:依存関係の追加#

このシンプルなチャットサーバーを構築するには、Quartのみが必要です。これは、次のコマンドを実行してプロジェクトの依存関係としてインストールできます。

poetry add quart

Poetryは、次のコマンドを実行することで、この依存関係が存在し、パスが正しいことを確認します。

poetry install

3:アプリの作成#

WebサーバーとしてQuartアプリが必要です。これは、src/chat/__init__.pyに以下を追加することで作成されます。

src/chat/__init__.py#
from quart import Quart

app = Quart(__name__)

def run() -> None:
    app.run()

アプリを簡単に実行するために、pyproject.tomlに以下を追加して、poetryスクリプトからrunメソッドを呼び出すことができます。

pyproject.toml#
[tool.poetry.scripts]
start = "chat:run"

これにより、次のコマンドでアプリを起動できます。

poetry run start

4:UIの配信#

ユーザーがチャットウェブサイトにアクセスしたときに、メッセージの入力と受信に使用できるUIを表示する必要があります。次のHTMLテンプレートをsrc/chat/templates/index.htmlに追加する必要があります。

src/chat/templates/index.html#
<script type="text/javascript">
  const ws = new WebSocket(`ws://${location.host}/ws`);

  ws.addEventListener('message', function (event) {
    const li = document.createElement("li");
    li.appendChild(document.createTextNode(event.data));
    document.getElementById("messages").appendChild(li);
  });

  function send(event) {
    const message = (new FormData(event.target)).get("message");
    if (message) {
      ws.send(message);
    }
    event.target.reset();
    return false;
  }
</script>

<div style="display: flex; height: 100%; flex-direction: column">
  <ul id="messages" style="flex-grow: 1; list-style-type: none"></ul>

  <form onsubmit="return send(event)">
    <input type="text" name="message" minlength="1" />
    <button type="submit">Send</button>
  </form>
</div>

これは、スタイルの面だけでなく、WebSocketのエラー処理がないため、非常に基本的なUIです。

これで、ルートパス(/)に対してこのテンプレートを提供できます。そのためには、src/chat/__init__.pyに以下を追加します。

from quart import render_template

@app.get("/")
async def index():
    return await render_template("index.html")

5:ブローカーの構築#

WebSocketルートを追加する前に、接続されたクライアント間でメッセージを渡すことができる必要があります。そのためには、メッセージブローカーが必要です。まず、src/chat/broker.pyに以下を追加して、独自のインメモリブローカーを構築します。

src/chat/broker.py#
import asyncio
from typing import AsyncGenerator

from quart import Quart

class Broker:
    def __init__(self) -> None:
        self.connections = set()

    async def publish(self, message: str) -> None:
        for connection in self.connections:
            await connection.put(message)

    async def subscribe(self) -> AsyncGenerator[str, None]:
        connection = asyncio.Queue()
        self.connections.add(connection)
        try:
            while True:
                yield await connection.get()
        finally:
            self.connections.remove(connection)

このBrokerは、パブリッシュ・サブスクライブパターンに基づいたインターフェースを持ち、クライアントはメッセージを送信する他のクライアントにパブリッシュし、送信されたメッセージをサブスクライブすることが期待されます。

6:WebSocketの実装#

これで、src/chat/__init__.pyに以下を追加して、WebSocketルートを実装できます。

src/chat/__init__.py#
import asyncio

from quart import websocket

from chat.broker import Broker

broker = Broker()

async def _receive() -> None:
    while True:
        message = await websocket.receive()
        await broker.publish(message)

@app.websocket("/ws")
async def ws() -> None:
    try:
        task = asyncio.ensure_future(_receive())
        async for message in broker.subscribe():
            await websocket.send(message)
    finally:
        task.cancel()
        await task

_receiveコルーチンは、送受信が同時に実行されるように、別々のタスクとして実行する必要があります。さらに、このタスクは適切にキャンセルされ、クリーンアップされる必要があります。

ユーザーが切断すると、CancelledErrorが発生し、whileループが中断され、finallyブロックがトリガーされます。

7:テスト#

アプリをテストするには、WebSocketルートを介して送信されたメッセージがエコーバックされることを確認する必要があります。これを行うには、tests/test_chat.pyに以下を追加します。

tests/test_chat.py#
import asyncio

from quart.testing.connections import TestWebsocketConnection as _TestWebsocketConnection

from chat import app

async def _receive(test_websocket: _TestWebsocketConnection) -> str:
    return await test_websocket.receive()

async def test_websocket() -> None:
    test_client = app.test_client()
    async with test_client.websocket("/ws") as test_websocket:
        task = asyncio.ensure_future(_receive(test_websocket))
        await test_websocket.send("message")
        result = await task
        assert result == "message"

テストは非同期関数であるため、次のコマンドを実行してpytest-asyncioをインストールする必要があります。

poetry add --dev pytest-asyncio

インストールしたら、pyproject.tomlに以下を追加して構成する必要があります。

[tool.pytest.ini_options]
asyncio_mode = "auto"

最後に、このコマンドでテストを実行できます。

poetry run pytest tests/

Quartのサンプルフォルダーでこれを実行している場合は、pytestがQuartのpytest設定を使用しないようにするために、-c pyproject.tomlオプションを追加する必要があります。

8:サマリー#

これまでに構築したメッセージブローカーはインメモリでのみ機能するため、メッセージは同じサーバーインスタンスに接続しているユーザーとの間でのみ共有されます。サーバーインスタンス間でメッセージを共有するには、pub/subインターフェースをサポートするaioredisライブラリなど、redisのようなサードパーティのブローカーを使用する必要があります。