voice consumer: cap live connections at 6 per room (defense-in-depth) — TDD
The deposit gate already caps PRESENCE at 5 visitors (+owner = 6), so voice membership is bounded — but a present member opening multiple tabs could still oversubscribe the mesh. RoomVoiceConsumer now claims a slot on connect via an atomic cache counter (cache.incr — atomic on Redis + LocMem) and refuses past VOICE_MAX_MEMBERS=6; the slot frees on disconnect (26h TTL backstop so a leaked slot self-clears). Room-agnostic, so epic rooms inherit it. +1 channels test: 6 connections fit, the 7th is refused, a disconnect reopens a slot. 9 voice consumer channels tests green. Code architected by Disco DeDisco <discodedisco@outlook.com> Git commit message Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -22,6 +22,13 @@ import uuid as uuidlib
|
|||||||
from channels.db import database_sync_to_async
|
from channels.db import database_sync_to_async
|
||||||
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
||||||
|
|
||||||
|
# Max live connections per room — the owner (seat 1C) + up to 5 visitors
|
||||||
|
# (2C–6C), mirroring `gameboard.models.MY_SEA_MAX_VISITORS`. The deposit gate
|
||||||
|
# already caps *presence* at 5 visitors, so this is defense-in-depth: it also
|
||||||
|
# refuses extra connections from multi-tab / stale-presence edge cases.
|
||||||
|
VOICE_MAX_MEMBERS = 6
|
||||||
|
_SLOT_TTL = 26 * 3600 # > the 24h voice window, so a leaked slot self-clears
|
||||||
|
|
||||||
|
|
||||||
class RoomVoiceConsumer(AsyncJsonWebsocketConsumer):
|
class RoomVoiceConsumer(AsyncJsonWebsocketConsumer):
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
@@ -30,12 +37,18 @@ class RoomVoiceConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
self.room_group = None
|
self.room_group = None
|
||||||
self.peer_group = None
|
self.peer_group = None
|
||||||
self.peer_id = None
|
self.peer_id = None
|
||||||
|
self._claimed = False
|
||||||
if not (self.user and getattr(self.user, "is_authenticated", False)):
|
if not (self.user and getattr(self.user, "is_authenticated", False)):
|
||||||
await self.close()
|
await self.close()
|
||||||
return
|
return
|
||||||
if not await self._can_join(self.user, self.room_id):
|
if not await self._can_join(self.user, self.room_id):
|
||||||
await self.close()
|
await self.close()
|
||||||
return
|
return
|
||||||
|
# Member cap — claim a slot or refuse (the room is full).
|
||||||
|
if not await self._claim_slot():
|
||||||
|
await self.close()
|
||||||
|
return
|
||||||
|
self._claimed = True
|
||||||
self.peer_id = uuidlib.uuid4().hex
|
self.peer_id = uuidlib.uuid4().hex
|
||||||
self.room_group = f"voice.{self.room_id}"
|
self.room_group = f"voice.{self.room_id}"
|
||||||
self.peer_group = f"peer.{self.peer_id}"
|
self.peer_group = f"peer.{self.peer_id}"
|
||||||
@@ -53,6 +66,9 @@ class RoomVoiceConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
await self.channel_layer.group_discard(self.room_group, self.channel_name)
|
await self.channel_layer.group_discard(self.room_group, self.channel_name)
|
||||||
if self.peer_group:
|
if self.peer_group:
|
||||||
await self.channel_layer.group_discard(self.peer_group, self.channel_name)
|
await self.channel_layer.group_discard(self.peer_group, self.channel_name)
|
||||||
|
if self._claimed:
|
||||||
|
await self._release_slot()
|
||||||
|
self._claimed = False
|
||||||
|
|
||||||
async def receive_json(self, content):
|
async def receive_json(self, content):
|
||||||
mtype = content.get("type")
|
mtype = content.get("type")
|
||||||
@@ -110,6 +126,39 @@ class RoomVoiceConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
"payload": event["payload"],
|
"payload": event["payload"],
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# ── member-cap slot counter (atomic, cache-backed) ──────────────────
|
||||||
|
def _slot_key(self):
|
||||||
|
return f"voice_slot:{self.room_id}"
|
||||||
|
|
||||||
|
@database_sync_to_async
|
||||||
|
def _claim_slot(self):
|
||||||
|
"""Atomically take a connection slot; False if the room is already
|
||||||
|
full (VOICE_MAX_MEMBERS). `cache.incr` is atomic on Redis + LocMem, so
|
||||||
|
concurrent connects can't oversubscribe past the cap."""
|
||||||
|
from django.core.cache import cache
|
||||||
|
key = self._slot_key()
|
||||||
|
cache.add(key, 0, _SLOT_TTL)
|
||||||
|
try:
|
||||||
|
n = cache.incr(key)
|
||||||
|
except ValueError: # key expired between add + incr
|
||||||
|
cache.set(key, 1, _SLOT_TTL)
|
||||||
|
n = 1
|
||||||
|
if n > VOICE_MAX_MEMBERS:
|
||||||
|
try:
|
||||||
|
cache.decr(key)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
@database_sync_to_async
|
||||||
|
def _release_slot(self):
|
||||||
|
from django.core.cache import cache
|
||||||
|
try:
|
||||||
|
cache.decr(self._slot_key())
|
||||||
|
except ValueError: # already 0 / expired
|
||||||
|
pass
|
||||||
|
|
||||||
# ── membership gate ─────────────────────────────────────────────────
|
# ── membership gate ─────────────────────────────────────────────────
|
||||||
@database_sync_to_async
|
@database_sync_to_async
|
||||||
def _can_join(self, user, room_id):
|
def _can_join(self, user, room_id):
|
||||||
|
|||||||
@@ -81,6 +81,40 @@ class RoomVoiceConsumerGateTest(TransactionTestCase):
|
|||||||
self.assertFalse(connected)
|
self.assertFalse(connected)
|
||||||
|
|
||||||
|
|
||||||
|
@tag("channels")
|
||||||
|
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
||||||
|
class RoomVoiceConsumerCapacityTest(TransactionTestCase):
|
||||||
|
"""Defense-in-depth member cap (2026-05-29): at most VOICE_MAX_MEMBERS live
|
||||||
|
connections per room, even from multi-tab. The slot is freed on disconnect
|
||||||
|
so the room reopens. The owner can hold many connections here (the counter
|
||||||
|
is per-connection), which keeps the test independent of the deposit gate."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.owner = User.objects.create(email="owner@test.io", username="discoman")
|
||||||
|
self.room = f"mysea-{self.owner.id}" # fresh UUID → fresh slot counter
|
||||||
|
|
||||||
|
async def test_refuses_a_connection_past_the_member_cap(self):
|
||||||
|
from apps.voice.consumers import VOICE_MAX_MEMBERS
|
||||||
|
comms = []
|
||||||
|
for i in range(VOICE_MAX_MEMBERS):
|
||||||
|
c = _comm(self.owner, self.room)
|
||||||
|
connected, _ = await c.connect()
|
||||||
|
self.assertTrue(connected, f"member {i} should fit")
|
||||||
|
comms.append(c)
|
||||||
|
# One past the cap is refused.
|
||||||
|
over = _comm(self.owner, self.room)
|
||||||
|
connected, _ = await over.connect()
|
||||||
|
self.assertFalse(connected)
|
||||||
|
# A disconnect frees a slot → the room reopens.
|
||||||
|
await comms.pop().disconnect()
|
||||||
|
reopened = _comm(self.owner, self.room)
|
||||||
|
connected, _ = await reopened.connect()
|
||||||
|
self.assertTrue(connected)
|
||||||
|
comms.append(reopened)
|
||||||
|
for c in comms:
|
||||||
|
await c.disconnect()
|
||||||
|
|
||||||
|
|
||||||
@tag("channels")
|
@tag("channels")
|
||||||
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
||||||
class RoomVoiceConsumerSignalingTest(TransactionTestCase):
|
class RoomVoiceConsumerSignalingTest(TransactionTestCase):
|
||||||
|
|||||||
Reference in New Issue
Block a user