Below is a clean, production-grade way to convert your current gateway into a Redis-backed message channel, without breaking your existing auth / role / flair logic.
This keeps Socket.IO for transport and moves message fan-out + history ownership to Redis.
What You Are Changing (Conceptually)
Before
client → gateway → in-memory ChatService → socket.emit()

After
client → gateway → Redis PUBLISH
Redis SUBSCRIBE → gateway → socket.emit()

Key rules:
- WebSocket gateway becomes stateless
- Redis is the source of truth for messages
- Any number of WS nodes can scale horizontally
Redis Primitives to Use
| Purpose | Redis primitive |
|---|---|
| Live fan-out | PUB/SUB |
| Message history | XADD (Streams) or LPUSH |
| Presence | SET / ZSET (optional later) |
We’ll start with:
- PUB/SUB for realtime
- LIST for recent history (simple)
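For reference, this is the ChatMessage shape assumed throughout; the fields are taken from the gateway send handler later in this doc, and the roles/flair types are guesses, so match your real type:

// chat-message.ts: sketch; adjust to your actual type
export type ChatMessage = {
  id: string;        // server-generated uuid
  userId: string;
  author: string;    // display name
  roles: string[];   // assumed string[]
  flair?: string;    // assumed optional
  channel: string;
  text: string;
  timestamp: number; // ms since epoch
};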
Step 1 — Add Redis Service
// redis.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import Redis from "ioredis";

@Injectable()
export class RedisService implements OnModuleInit, OnModuleDestroy {
  pub!: Redis;
  sub!: Redis;

  onModuleInit() {
    // Two connections on purpose: a subscribed ioredis client cannot issue regular commands.
    const url = process.env.REDIS_URL ?? "redis://localhost:6379"; // fallback avoids a TS undefined error
    this.pub = new Redis(url);
    this.sub = new Redis(url);
  }

  async onModuleDestroy() {
    await Promise.all([this.pub.quit(), this.sub.quit()]);
  }
}

Step 2 — Define Channel Naming (Critical)
const redisChannel = (room: string) => `chat:${room}`;
const redisHistory = (room: string) => `chat:${room}:history`;

Never hardcode raw names inline.
Step 3 — Subscribe Once per Gateway Instance
constructor(
private readonly chat: ChatService,
private readonly emotes: EmotesService,
private readonly redis: RedisService,
) {}
async afterInit() {
for (const channel of ALLOWED_CHANNELS) {
const key = redisChannel(channel);
await this.redis.sub.subscribe(key);
this.redis.sub.on("message", (chan, payload) => {
if (chan !== key) return;
const msg: ChatMessage = JSON.parse(payload);
this.server.to(channel).emit("chat:message", msg);
});
}
}

Important
- Do not resubscribe per socket
- One subscription per gateway process
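The loop above attaches one "message" listener per channel and filters inside each; that works, but listeners stack up as channels grow. A sketch of an equivalent single-listener variant (same ALLOWED_CHANNELS, redisChannel(), and ChatMessage as above):

// Variant: one "message" listener total; route by channel name.
async afterInit() {
  await this.redis.sub.subscribe(...ALLOWED_CHANNELS.map(redisChannel));
  this.redis.sub.on("message", (chan, payload) => {
    const room = chan.slice("chat:".length); // inverse of redisChannel()
    const msg: ChatMessage = JSON.parse(payload);
    this.server.to(room).emit("chat:message", msg);
  });
}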
Step 4 — Publish Instead of Emitting
Replace this:
this.chat.addMessage(channel, msg);
this.server.to(channel).emit("chat:message", msg);

With this:
await this.redis.pub.publish(
redisChannel(channel),
JSON.stringify(msg),
);
// Persist history (trimmed)
await this.redis.pub.lpush(
redisHistory(channel),
JSON.stringify(msg),
);
await this.redis.pub.ltrim(redisHistory(channel), 0, 199);

Now:
- Gateway does zero fan-out logic
- Redis handles distribution
Step 5 — Load History from Redis on Join
Replace:
client.emit("chat:history", this.chat.getHistory(channel));With:
const historyRaw = await this.redis.pub.lrange(
redisHistory(channel),
0,
49,
);
const history = historyRaw
  .map((x) => JSON.parse(x))
  .reverse(); // LPUSH stores newest first; reverse into chronological order
client.emit("chat:history", history);

Redis now owns chat history.
Step 6 — Remove In-Memory Chat Message Storage
You can now:
- Delete chat.addMessage
- Delete in-memory message buffers
- Keep chat.join / chat.leave for presence (for now)
Later, presence can also move to Redis.
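If it helps to see the end state, a rough sketch of the slimmed-down ChatService after this step (names and shapes are illustrative; keep your existing signatures):

// chat.service.ts: sketch of what remains (presence + policy only)
import { Injectable } from "@nestjs/common";

@Injectable()
export class ChatService {
  // room -> socketIds; in-memory is fine until presence moves to Redis
  private readonly members = new Map<string, Set<string>>();

  join(room: string, socketId: string) {
    if (!this.members.has(room)) this.members.set(room, new Set());
    this.members.get(room)!.add(socketId);
  }

  leave(room: string, socketId: string) {
    const set = this.members.get(room);
    set?.delete(socketId);
    if (set?.size === 0) this.members.delete(room);
  }
}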
Step 7 — Failure & Ordering Guarantees
Ordering
- Redis PUB/SUB preserves publish order per channel (per publishing connection)
- LIST preserves message order
Node restarts
- PUB/SUB is fire-and-forget: a node that is down misses live messages
- History survives (the LIST persists), so clients backfill on reconnect
Horizontal scaling
- Every gateway node receives all messages
- Socket.IO room routing is local only
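One consequence worth spelling out: a node that was down never sees the PUB/SUB messages published in the interim, so clients should re-pull history after a reconnect. A client-side sketch, assuming a chat:join event that triggers the Step 5 history emit (the event name is an assumption; match your protocol):

// client-side sketch: backfill after reconnect
import { io } from "socket.io-client";

const socket = io("/chat"); // namespace/URL are assumptions
const currentChannel = "global";

socket.io.on("reconnect", () => {
  // re-joining makes the server replay recent history from the Redis LIST (Step 5)
  socket.emit("chat:join", { channel: currentChannel });
});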
Optional (But Strongly Recommended Next)
Upgrade History → Redis Streams
If you want:
- Replay
- Cursor-based history
- Durable offsets
Use:
XADD chat:global:stream * payload "{...}"

Then:
- XRANGE for history
- XREAD for backfill
This is what Discord-scale systems do.
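A sketch of that flow with ioredis, keeping the payload-as-one-JSON-field convention used elsewhere in this doc. The per-room stream key and the MAXLEN bound are choices, not requirements:

import Redis from "ioredis";

const streamKey = (room: string) => `chat:${room}:stream`;

// append, keeping the stream bounded (~ means approximate trim, cheaper than exact)
async function appendMessage(redis: Redis, room: string, msg: object) {
  await redis.xadd(streamKey(room), "MAXLEN", "~", 10_000, "*", "payload", JSON.stringify(msg));
}

// last `count` messages, oldest-first
async function recentHistory(redis: Redis, room: string, count = 50) {
  const entries = await redis.xrevrange(streamKey(room), "+", "-", "COUNT", count);
  return entries
    .map(([, fields]) => JSON.parse(fields[fields.indexOf("payload") + 1]))
    .reverse();
}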
What You Did Not Touch (Correctly)
- Auth resolution
- Role + flair logic
- Emotes sync
- Client protocol
- Socket events
This keeps blast radius minimal.
Mental Model Going Forward
- Gateway = protocol adapter
- Redis = message bus
- ChatService = presence + policy only
If you want next:
- Redis-backed presence (ZSET)
- Message ACKs + moderation hooks
- Multi-region Redis fan-out strategy
- Or converting this to EventSub-style internal events
Say which direction.
1) Redis-backed presence with ZSET

Goals
- Presence survives multi-node WS + restarts.
- "Online users" queryable per room.
- Stale connections cleaned automatically.

Keys
- Room presence (ZSET): presence:{room}
  - member: userId (or userId|socketId if you allow multiple sessions)
  - score: lastSeenMs
- User sockets (SET): presence:user:{userId}:sockets (optional)
- Socket -> user mapping (STRING w/ TTL): presence:socket:{socketId} (optional)

RedisService additions
// presence.redis.ts
const presenceKey = (room: string) => `presence:${room}`;
const userSocketsKey = (userId: string) => `presence:user:${userId}:sockets`;
const socketKey = (socketId: string) => `presence:socket:${socketId}`;
const PRESENCE_TTL_MS = 45_000; // consider 30-90s
const PRESENCE_SWEEP_MS = 60_000; // sweep cadence

On connect/join
You should “heartbeat” on join and periodically while connected.
// in gateway
private async presenceTouch(room: string, userId: string, socketId: string) {
const now = Date.now();
// Single session per user: member=userId
await this.redis.pub.zadd(presenceKey(room), now, userId);
// Optional: track sockets for a user
await this.redis.pub.sadd(userSocketsKey(userId), socketId);
// Optional: map socket -> user for cleanup, with TTL
await this.redis.pub.set(socketKey(socketId), userId, "PX", PRESENCE_TTL_MS);
}

Heartbeat (client → server)
Have the client emit presence:ping every ~15s; the server touches Redis.
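A minimal client-side sketch (interval and namespace are choices, not requirements):

// client-side: heartbeat while connected
import { io } from "socket.io-client";

const socket = io("/chat");
setInterval(() => {
  if (socket.connected) socket.emit("presence:ping");
}, 15_000);

The server-side handler: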
@SubscribeMessage("presence:ping")
async handlePresencePing(@ConnectedSocket() client: Socket) {
const room = this.normalizeChannel(client);
const user: ChatUser | undefined = client.data.user;
if (!user) return;
await this.presenceTouch(room, user.id, client.id);
}

On disconnect
You can’t safely ZREM a userId if the user has multiple sockets. If single-session-per-user is enforced, you can remove immediately. If multi-session, remove the socket from the set and only remove the user from the ZSET when no sockets remain.
private async presenceLeave(room: string, userId: string, socketId: string) {
await this.redis.pub.srem(userSocketsKey(userId), socketId);
const remaining = await this.redis.pub.scard(userSocketsKey(userId));
if (remaining <= 0) {
await this.redis.pub.del(userSocketsKey(userId));
await this.redis.pub.zrem(presenceKey(room), userId);
}
await this.redis.pub.del(socketKey(socketId));
}

Stale cleanup (important)
Run in each node, but use a Redis lock so only one sweeper is active.
private presenceSweeperStarted = false;
private async startPresenceSweeper() {
if (this.presenceSweeperStarted) return;
this.presenceSweeperStarted = true;
setInterval(async () => {
const lockKey = "presence:sweep:lock";
const lock = await this.redis.pub.set(lockKey, "1", "PX", 10_000, "NX"); // expiry before NX matches ioredis typings
if (!lock) return;
const cutoff = Date.now() - PRESENCE_TTL_MS;
for (const room of ALLOWED_CHANNELS) {
// remove stale users by score
await this.redis.pub.zremrangebyscore(presenceKey(room), 0, cutoff);
}
}, PRESENCE_SWEEP_MS).unref?.();
}

Call startPresenceSweeper() from afterInit().
Query presence snapshot
private async getPresence(room: string) {
const cutoff = Date.now() - PRESENCE_TTL_MS;
const ids = await this.redis.pub.zrangebyscore(presenceKey(room), cutoff, "+inf");
return ids;
}

Then emit presence:update based on that list (and optionally enrich it with cached user display info).
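A sketch of that emit, reusing getPresence() (the payload shape is an assumption; enrich it as needed):

// in gateway: call after presenceTouch()/presenceLeave(), or on a timer
private async broadcastPresence(room: string) {
  const userIds = await this.getPresence(room);
  this.server.to(room).emit("presence:update", { room, userIds, count: userIds.length });
}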
2) Message ACKs + moderation hooks
Targets
- Client knows if message was accepted / rejected.
- Moderation pipeline can block, redact, timeout, shadowban.
- Works across multiple WS nodes.
Protocol changes
- Client sends chat:send with a clientMsgId
- Server immediately returns chat:ack (accepted/rejected + reason)
- Server publishes the message only if accepted
DTO
export type SendMessageDto = {
text: string;
clientMsgId: string; // uuid from client
};

ACK shape
type ChatAck =
| { clientMsgId: string; status: "accepted"; serverMsgId: string; ts: number }
| { clientMsgId: string; status: "rejected"; reason: string; ts: number };Moderation hook interface
Keep it synchronous (fast) with optional async follow-up actions.
export type ModerationDecision =
| { action: "allow" }
| { action: "deny"; reason: string }
| { action: "shadow"; reason?: string }
| { action: "timeout"; seconds: number; reason: string }
| { action: "redact"; replacement: string; reason?: string };
export interface ModerationHook {
decide(args: {
room: string;
user: ChatUser;
text: string;
ip?: string;
}): Promise<ModerationDecision>;
}

Example: cheap policy hook (rate limit + basic rules)
Use Redis for rate limiting (per user, per room). A token bucket is overkill; start with a fixed window. Note that INCR + PEXPIRE below is not atomic: if the process dies between the two calls, the key never expires. Good enough to start; wrap it in a Lua script if you need it airtight.
private async rateLimit(room: string, userId: string) {
const key = `rl:${room}:${userId}`;
const n = await this.redis.pub.incr(key);
if (n === 1) await this.redis.pub.pexpire(key, 10_000); // 10s window
return n <= 8; // 8 msgs / 10s
}

Gateway send handler with ACK + moderation
@SubscribeMessage("chat:send")
async handleMessage(
@MessageBody() dto: SendMessageDto,
@ConnectedSocket() client: Socket,
) {
const room = this.normalizeChannel(client);
const user: ChatUser | undefined = client.data.user;
// auth-only send
if (!user || user.id.startsWith("guest:")) {
const ack: ChatAck = {
clientMsgId: dto.clientMsgId,
status: "rejected",
reason: "auth_required",
ts: Date.now(),
};
client.emit("chat:ack", ack);
client.disconnect(true);
return;
}
// basic validation
const text = (dto.text ?? "").trim();
if (!text || text.length > 500) {
const ack: ChatAck = {
clientMsgId: dto.clientMsgId,
status: "rejected",
reason: "invalid_message",
ts: Date.now(),
};
client.emit("chat:ack", ack);
return;
}
// rate limit
if (!(await this.rateLimit(room, user.id))) {
const ack: ChatAck = {
clientMsgId: dto.clientMsgId,
status: "rejected",
reason: "rate_limited",
ts: Date.now(),
};
client.emit("chat:ack", ack);
return;
}
// moderation decision (plug your hooks)
const decision = await this.moderation.decide({
room,
user,
text,
ip: client.handshake.address,
});
if (decision.action === "deny") {
const ack: ChatAck = {
clientMsgId: dto.clientMsgId,
status: "rejected",
reason: decision.reason,
ts: Date.now(),
};
client.emit("chat:ack", ack);
return;
}
// shadowban: ack accepted but only echo to sender
const serverMsgId = crypto.randomUUID(); // global crypto in modern Node; otherwise import { randomUUID } from "node:crypto"
const ts = Date.now();
let finalText = text;
if (decision.action === "redact") finalText = decision.replacement;
const msg: ChatMessage = {
id: serverMsgId,
userId: user.id,
author: user.name,
roles: user.roles,
flair: user.flair,
channel: room,
text: finalText,
timestamp: ts,
};
const ack: ChatAck = {
clientMsgId: dto.clientMsgId,
status: "accepted",
serverMsgId,
ts,
};
client.emit("chat:ack", ack);
if (decision.action === "shadow") {
client.emit("chat:message", msg);
return;
}
if (decision.action === "timeout") {
// publish a moderation event (see EventSub section) + optionally disconnect
await this.publishInternalEvent("chat.moderation.timeout", {
room,
userId: user.id,
seconds: decision.seconds,
reason: decision.reason,
});
// you can also client.disconnect(true) or set a redis "muted" key checked on send
}
// publish (redis bus) + persist history
// use the Step 2 helpers; never hardcode raw names inline
await this.redis.pub.publish(redisChannel(room), JSON.stringify(msg));
await this.redis.pub.lpush(redisHistory(room), JSON.stringify(msg));
await this.redis.pub.ltrim(redisHistory(room), 0, 199);
}

Moderation “delete message” / “purge user” should be separate events:
- chat.moderation.delete_message
- chat.moderation.purge_user

Clients handle them (remove from UI); a sketch follows.
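A sketch of both halves. The dedicated chat:{room}:moderation channel, the payload shape, and the client-side helpers are all illustrative, not part of the existing code:

// server-side sketch: fan the event out through Redis so every node can emit it
private async publishModeration(
  room: string,
  event: { type: string; messageId?: string; userId?: string },
) {
  await this.redis.pub.publish(`chat:${room}:moderation`, JSON.stringify(event));
}

// Each gateway also subscribes to chat:{room}:moderation in afterInit (mirroring Step 3)
// and re-emits: this.server.to(room).emit("chat:moderation", ev);

// client-side sketch: removeMessageFromUi / purgeUserFromUi are hypothetical helpers
socket.on("chat:moderation", (ev: { type: string; messageId?: string; userId?: string }) => {
  if (ev.type === "chat.moderation.delete_message" && ev.messageId) removeMessageFromUi(ev.messageId);
  if (ev.type === "chat.moderation.purge_user" && ev.userId) purgeUserFromUi(ev.userId);
});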
3) Convert to EventSub-style internal events (recommended)
Why
You’re already evolving into multiple producers/consumers:
- Twitch ingestion
- YouTube ingestion
- Donations (Stripe)
- Chat messages
- Moderation actions
- Admin panel actions
EventSub-style means:
- Everything becomes an event envelope
- You can add new downstream services without rewriting producers
Minimal event envelope
export type InternalEvent<T = any> = {
id: string; // uuid
type: string; // e.g. "chat.message.created"
source: string; // "ws-gateway", "twitch-ingestor"
ts: number; // ms
room?: string; // "global"
userId?: string;
data: T;
};

Transport options in Redis
- Best: Redis Streams (XADD) for durability + replay
- Okay: Pub/Sub for realtime only
Do Streams. Pub/Sub is lossy.
Streams naming
events:global (single stream) or events:{domain}, like events:chat, events:twitch, events:billing
Publisher
private async publishInternalEvent(type: string, data: any, meta?: Partial<InternalEvent>) {
const ev: InternalEvent = {
id: crypto.randomUUID(),
type,
source: "ws-gateway",
ts: Date.now(),
...meta,
data,
};
// XADD fields must be flat; store JSON as one field
await this.redis.pub.xadd("events:global", "*", "event", JSON.stringify(ev));
}Consumer group (worker/service)
Create a dedicated Nest service (or BullMQ worker) that reads events.
// events.consumer.ts
import Redis from "ioredis";
// InternalEvent comes from the envelope definition above

const STREAM = "events:global";
const GROUP = "chat-services";
const CONSUMER = `c-${process.pid}`;
async function ensureGroup(redis: Redis) {
try {
await redis.xgroup("CREATE", STREAM, GROUP, "$", "MKSTREAM");
} catch (e: any) {
if (!String(e?.message || "").includes("BUSYGROUP")) throw e;
}
}
async function loop(redis: Redis) {
await ensureGroup(redis);
while (true) {
const res = (await redis.xreadgroup(
  "GROUP", GROUP, CONSUMER,
  "BLOCK", 5000,
  "COUNT", 50,
  "STREAMS", STREAM, ">",
)) as [string, [string, string[]][]][] | null; // cast: ioredis types stream replies loosely
if (!res) continue;
for (const [, entries] of res) {
for (const [id, fields] of entries) {
const idx = fields.indexOf("event");
const payload = idx >= 0 ? fields[idx + 1] : null;
if (!payload) {
await redis.xack(STREAM, GROUP, id);
continue;
}
const ev: InternalEvent = JSON.parse(payload);
// route by type
// - persist analytics
// - trigger clip capture
// - enforce moderation
// - sync to DB
// etc.
await redis.xack(STREAM, GROUP, id);
}
}
}
}

How this plugs into chat
- On chat send: publish chat.message.created
- Moderation actions: publish chat.moderation.*
- Presence changes: publish presence.user.joined / presence.user.left (optional; usually derived)
Then:
- A “chat-history service” consumes events and writes to DB
- A “clip-trigger service” consumes chat spike events
- An “admin audit service” consumes everything
You get clean separation.
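For the "route by type" comment block in loop() above, a sketch of the router (the handlers named in comments are hypothetical placeholders):

// sketch: wire real handlers per event type
function routeEvent(ev: InternalEvent) {
  switch (ev.type) {
    case "chat.message.created":
      // persist to DB (chat-history service)
      break;
    case "chat.moderation.timeout":
      // e.g. set a redis "muted" key checked on send
      break;
    default:
      // ignore event types this consumer doesn't own
      break;
  }
}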
What I’d do in your stack
- ZSET presence first (cheap, big reliability win).
- ACK + moderation hook next (stability + control).
- Move all “side effects” to Redis Streams internal events (scale + sanity).
If you want, I can give you:
- a drop-in ModerationService with Redis-backed mute/timeout + message delete,
- a client-side ACK handling pattern (optimistic UI),
- and a proper Stream consumer module structure for NestJS.