Files
python-tdd/src/apps/epic/tests/integrated/test_consumers.py

38 lines
1.2 KiB
Python
Raw Normal View History

from channels.testing.websocket import WebsocketCommunicator
from channels.layers import get_channel_layer
from django.test import SimpleTestCase, override_settings
from core.asgi import application
TEST_CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
}
}
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class RoomConsumerTest(SimpleTestCase):
async def test_can_connect_and_disconnect(self):
communicator = WebsocketCommunicator(application, "/ws/room/test-room/")
connected, _ = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
async def test_receives_gate_update_broadcast(self):
communicator = WebsocketCommunicator(application, "/ws/room/test-room/")
await communicator.connect()
channel_layer = get_channel_layer()
await channel_layer.group_send(
"room_test-room",
{"type": "gate_update", "gate_state": "some_state"},
)
response = await communicator.receive_json_from()
self.assertEqual(response["type"], "gate_update")
self.assertEqual(response["gate_state"], "some_state")
await communicator.disconnect()