
Below is a clean, production-grade way to convert your current gateway into a Redis-backed message channel, without breaking your existing auth / role / flair logic.

This keeps Socket.IO for transport and moves message fan-out + history ownership to Redis.


What You Are Changing (Conceptually)

Before

client → gateway → in-memory ChatService → socket.emit()

After

client → gateway → Redis PUBLISH → Redis SUBSCRIBE → gateway → socket.emit()

Key rules:

  • WebSocket gateway becomes stateless
  • Redis is the source of truth for messages
  • Any number of WS nodes can scale horizontally

Redis Primitives to Use

| Purpose | Redis primitive |
| --- | --- |
| Live fan-out | PUB/SUB |
| Message history | XADD (Streams) or LPUSH |
| Presence | SET / ZSET (optional later) |

We’ll start with:

  • PUB/SUB for realtime
  • LIST for recent history (simple)

Step 1 — Add Redis Service

```ts
// redis.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import Redis from "ioredis";

@Injectable()
export class RedisService implements OnModuleInit, OnModuleDestroy {
  pub!: Redis;
  sub!: Redis;

  onModuleInit() {
    // REDIS_URL must be set; separate connections for publish and subscribe
    // (a subscribed ioredis connection can't issue regular commands)
    this.pub = new Redis(process.env.REDIS_URL!);
    this.sub = new Redis(process.env.REDIS_URL!);
  }

  onModuleDestroy() {
    this.pub.quit();
    this.sub.quit();
  }
}
```

Step 2 — Define Channel Naming (Critical)

```ts
const redisChannel = (room: string) => `chat:${room}`;
const redisHistory = (room: string) => `chat:${room}:history`;
```

Never hardcode raw names inline.


Step 3 — Subscribe Once per Gateway Instance

```ts
constructor(
  private readonly chat: ChatService,
  private readonly emotes: EmotesService,
  private readonly redis: RedisService,
) {}

async afterInit() {
  for (const channel of ALLOWED_CHANNELS) {
    await this.redis.sub.subscribe(redisChannel(channel));
  }

  // One "message" listener for all channels — not one per channel,
  // which would stack duplicate handlers on the shared subscriber
  this.redis.sub.on("message", (chan, payload) => {
    const channel = ALLOWED_CHANNELS.find((c) => redisChannel(c) === chan);
    if (!channel) return;
    const msg: ChatMessage = JSON.parse(payload);
    this.server.to(channel).emit("chat:message", msg);
  });
}
```

Important

  • Do not resubscribe per socket
  • One subscription per gateway process

Step 4 — Publish Instead of Emitting

Replace this:

```ts
this.chat.addMessage(channel, msg);
this.server.to(channel).emit("chat:message", msg);
```

With this:

```ts
await this.redis.pub.publish(
  redisChannel(channel),
  JSON.stringify(msg),
);

// Persist history (trimmed)
await this.redis.pub.lpush(
  redisHistory(channel),
  JSON.stringify(msg),
);
await this.redis.pub.ltrim(redisHistory(channel), 0, 199);
```

Now:

  • Gateway does zero fan-out logic
  • Redis handles distribution

Step 5 — Load History from Redis on Join

Replace:

client.emit("chat:history", this.chat.getHistory(channel));

With:

```ts
const historyRaw = await this.redis.pub.lrange(
  redisHistory(channel),
  0,
  49,
);
const history = historyRaw.map((x) => JSON.parse(x)).reverse();
client.emit("chat:history", history);
```

Redis now owns chat history.


Step 6 — Remove In-Memory Chat Message Storage

You can now:

  • Delete chat.addMessage
  • Delete in-memory message buffers
  • Keep chat.join / leave for presence (for now)

Later, presence can also move to Redis.


Step 7 — Failure & Ordering Guarantees

Ordering

  • Redis PUB/SUB preserves publish order per channel
  • LIST preserves message order

Node restarts

  • History survives (the LIST lives in Redis); PUB/SUB messages published while a node is down are not replayed to that node's clients

Horizontal scaling

  • Every gateway node receives all messages
  • Socket.IO room routing is local only

Upgrade History → Redis Streams

If you want:

  • Replay
  • Cursor-based history
  • Durable offsets

Use:

```
XADD chat:global:stream * payload "{...}"
```

Then:

  • XRANGE for history
  • XREAD for backfill

This is what Discord-scale systems do.
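A minimal sketch of that upgrade with ioredis, assuming the RedisService from Step 1 (the chat:{room}:stream key name is illustrative):

```ts
// Sketch: Streams-backed history. Key name is illustrative.
const streamKey = (room: string) => `chat:${room}:stream`;

async function appendHistory(redis: Redis, room: string, msg: ChatMessage) {
  // "*" lets Redis assign the entry ID (monotonically increasing per stream)
  await redis.xadd(streamKey(room), "*", "payload", JSON.stringify(msg));
}

async function recentHistory(redis: Redis, room: string, count = 50) {
  // XREVRANGE returns newest-first; reverse for chronological order
  const entries = await redis.xrevrange(streamKey(room), "+", "-", "COUNT", count);
  return entries
    .map(([, fields]) => JSON.parse(fields[fields.indexOf("payload") + 1]))
    .reverse();
}
```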


What You Did Not Touch (Correctly)

  • Auth resolution
  • Role + flair logic
  • Emotes sync
  • Client protocol
  • Socket events

This keeps blast radius minimal.


Mental Model Going Forward

  • Gateway = protocol adapter
  • Redis = message bus
  • ChatService = presence + policy only

If you want next:

  • Redis-backed presence (ZSET)
  • Message ACKs + moderation hooks
  • Multi-region Redis fan-out strategy
  • Or converting this to EventSub-style internal events

Say which direction.


1) Redis-backed presence with ZSET

Goals

  • Presence survives multi-node WS + restarts.
  • “Online users” queryable per room.
  • Stale connections cleaned automatically.

Keys

  • Room presence (ZSET): presence:{room}
    • member: userId (or userId|socketId if you allow multiple sessions)
    • score: lastSeenMs
  • User sockets (SET): presence:user:{userId}:sockets (optional)
  • Socket → user mapping (STRING w/ TTL): presence:socket:{socketId} (optional)

RedisService additions

```ts
// presence.redis.ts
const presenceKey = (room: string) => `presence:${room}`;
const userSocketsKey = (userId: string) => `presence:user:${userId}:sockets`;
const socketKey = (socketId: string) => `presence:socket:${socketId}`;

const PRESENCE_TTL_MS = 45_000;   // consider 30-90s
const PRESENCE_SWEEP_MS = 60_000; // sweep cadence
```

On connect/join

You should “heartbeat” on join and periodically while connected.

```ts
// in gateway
private async presenceTouch(room: string, userId: string, socketId: string) {
  const now = Date.now();

  // Single session per user: member=userId
  await this.redis.pub.zadd(presenceKey(room), now, userId);

  // Optional: track sockets for a user
  await this.redis.pub.sadd(userSocketsKey(userId), socketId);

  // Optional: map socket -> user for cleanup, with TTL
  await this.redis.pub.set(socketKey(socketId), userId, "PX", PRESENCE_TTL_MS);
}
```

Heartbeat (client → server)

Have the client emit presence:ping every 15s; the server touches Redis. A client-side sketch follows the handler below.

@SubscribeMessage("presence:ping") async handlePresencePing(@ConnectedSocket() client: Socket) { const room = this.normalizeChannel(client); const user: ChatUser | undefined = client.data.user; if (!user) return; await this.presenceTouch(room, user.id, client.id); }

On disconnect

You can’t safely ZREM a userId if the user has multiple sockets. If single-session-per-user is enforced, you can remove immediately. If multi-session, remove the socket from the set and only remove the user from the ZSET when no sockets remain.

```ts
private async presenceLeave(room: string, userId: string, socketId: string) {
  await this.redis.pub.srem(userSocketsKey(userId), socketId);
  const remaining = await this.redis.pub.scard(userSocketsKey(userId));
  if (remaining <= 0) {
    await this.redis.pub.del(userSocketsKey(userId));
    await this.redis.pub.zrem(presenceKey(room), userId);
  }
  await this.redis.pub.del(socketKey(socketId));
}
```

Stale cleanup (important)

Run in each node, but use a Redis lock so only one sweeper is active.

```ts
private presenceSweeperStarted = false;

private async startPresenceSweeper() {
  if (this.presenceSweeperStarted) return;
  this.presenceSweeperStarted = true;

  setInterval(async () => {
    // Redis lock so only one node sweeps at a time (PX before NX is the
    // canonical ioredis argument order)
    const lockKey = "presence:sweep:lock";
    const lock = await this.redis.pub.set(lockKey, "1", "PX", 10_000, "NX");
    if (!lock) return;

    const cutoff = Date.now() - PRESENCE_TTL_MS;
    for (const room of ALLOWED_CHANNELS) {
      // remove stale users by score
      await this.redis.pub.zremrangebyscore(presenceKey(room), 0, cutoff);
    }
  }, PRESENCE_SWEEP_MS).unref?.();
}
```

Call startPresenceSweeper() from afterInit().
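For example, a sketch of the wiring (subscribeAllRooms() stands in for the Step 3 subscription code and is illustrative):

```ts
// Sketch: start the sweeper alongside the Step 3 setup.
async afterInit() {
  await this.subscribeAllRooms(); // illustrative: the Step 3 logic, factored out
  await this.startPresenceSweeper();
}
```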

Query presence snapshot

```ts
private async getPresence(room: string) {
  const cutoff = Date.now() - PRESENCE_TTL_MS;
  const ids = await this.redis.pub.zrangebyscore(presenceKey(room), cutoff, "+inf");
  return ids;
}
```

Then emit presence:update based on that list (and optionally enrich via cached user display info).
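A minimal broadcast helper, assuming the payload shape shown here (which is illustrative):

```ts
// Sketch: push a presence snapshot to everyone in the room.
// Enrichment with cached display info is omitted.
private async broadcastPresence(room: string) {
  const userIds = await this.getPresence(room);
  this.server.to(room).emit("presence:update", { room, userIds, ts: Date.now() });
}
```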


2) Message ACKs + moderation hooks

Targets

  • Client knows if message was accepted / rejected.
  • Moderation pipeline can block, redact, timeout, shadowban.
  • Works across multiple WS nodes.

Protocol changes

  • Client sends: chat:send with clientMsgId
  • Server immediately returns: chat:ack (accepted/rejected + reason)
  • Server publishes message only if accepted.

DTO

```ts
export type SendMessageDto = {
  text: string;
  clientMsgId: string; // uuid from client
};
```

ACK shape

```ts
type ChatAck =
  | { clientMsgId: string; status: "accepted"; serverMsgId: string; ts: number }
  | { clientMsgId: string; status: "rejected"; reason: string; ts: number };
```
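On the client, a minimal ACK-correlation sketch (the pending map and sendChat helper are illustrative):

```ts
// Sketch: correlate outgoing messages with their ACKs by clientMsgId.
// `socket` is the existing socket.io-client connection.
const pending = new Map<string, { text: string; sentAt: number }>();

function sendChat(text: string) {
  const clientMsgId = crypto.randomUUID();
  pending.set(clientMsgId, { text, sentAt: Date.now() });
  socket.emit("chat:send", { text, clientMsgId });
}

socket.on("chat:ack", (ack: ChatAck) => {
  if (!pending.delete(ack.clientMsgId)) return; // unknown or already settled
  if (ack.status === "rejected") {
    console.warn("message rejected:", ack.reason);
    // e.g. mark the optimistic message as failed in the UI
  }
});
```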

Moderation hook interface

Keep it synchronous (fast) with optional async follow-up actions.

```ts
export type ModerationDecision =
  | { action: "allow" }
  | { action: "deny"; reason: string }
  | { action: "shadow"; reason?: string }
  | { action: "timeout"; seconds: number; reason: string }
  | { action: "redact"; replacement: string; reason?: string };

export interface ModerationHook {
  decide(args: {
    room: string;
    user: ChatUser;
    text: string;
    ip?: string;
  }): Promise<ModerationDecision>;
}
```

Example: cheap policy hook (rate limit + basic rules)

Use Redis for rate limiting (per user, per room). A token bucket is overkill; start with a fixed window. The basic-rules half is sketched after the rate limiter.

```ts
private async rateLimit(room: string, userId: string) {
  const key = `rl:${room}:${userId}`;
  const n = await this.redis.pub.incr(key);
  if (n === 1) await this.redis.pub.pexpire(key, 10_000); // 10s window
  return n <= 8; // 8 msgs / 10s
}
```
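For the basic-rules half, a minimal ModerationHook implementation might look like this (the class name and word list are illustrative):

```ts
// Sketch: a ModerationHook covering simple text rules.
// Class name and banned-word list are illustrative placeholders.
import { Injectable } from "@nestjs/common";

@Injectable()
export class BasicModerationHook implements ModerationHook {
  private readonly banned = ["badword1", "badword2"]; // placeholder rules

  async decide(args: {
    room: string;
    user: ChatUser;
    text: string;
    ip?: string;
  }): Promise<ModerationDecision> {
    const lowered = args.text.toLowerCase();
    if (this.banned.some((w) => lowered.includes(w))) {
      return { action: "deny", reason: "banned_phrase" };
    }
    return { action: "allow" };
  }
}
```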

Gateway send handler with ACK + moderation

@SubscribeMessage("chat:send") async handleMessage( @MessageBody() dto: SendMessageDto, @ConnectedSocket() client: Socket, ) { const room = this.normalizeChannel(client); const user: ChatUser = client.data.user; // auth-only send if (!user || user.id.startsWith("guest:")) { const ack: ChatAck = { clientMsgId: dto.clientMsgId, status: "rejected", reason: "auth_required", ts: Date.now(), }; client.emit("chat:ack", ack); client.disconnect(true); return; } // basic validation const text = (dto.text ?? "").trim(); if (!text || text.length > 500) { const ack: ChatAck = { clientMsgId: dto.clientMsgId, status: "rejected", reason: "invalid_message", ts: Date.now(), }; client.emit("chat:ack", ack); return; } // rate limit if (!(await this.rateLimit(room, user.id))) { const ack: ChatAck = { clientMsgId: dto.clientMsgId, status: "rejected", reason: "rate_limited", ts: Date.now(), }; client.emit("chat:ack", ack); return; } // moderation decision (plug your hooks) const decision = await this.moderation.decide({ room, user, text, ip: client.handshake.address, }); if (decision.action === "deny") { const ack: ChatAck = { clientMsgId: dto.clientMsgId, status: "rejected", reason: decision.reason, ts: Date.now(), }; client.emit("chat:ack", ack); return; } // shadowban: ack accepted but only echo to sender const serverMsgId = crypto.randomUUID(); const ts = Date.now(); let finalText = text; if (decision.action === "redact") finalText = decision.replacement; const msg: ChatMessage = { id: serverMsgId, userId: user.id, author: user.name, roles: user.roles, flair: user.flair, channel: room, text: finalText, timestamp: ts, }; const ack: ChatAck = { clientMsgId: dto.clientMsgId, status: "accepted", serverMsgId, ts, }; client.emit("chat:ack", ack); if (decision.action === "shadow") { client.emit("chat:message", msg); return; } if (decision.action === "timeout") { // publish a moderation event (see EventSub section) + optionally disconnect await this.publishInternalEvent("chat.moderation.timeout", { room, userId: user.id, seconds: decision.seconds, reason: decision.reason, }); // you can also client.disconnect(true) or set a redis "muted" key checked on send } // publish (redis bus) + persist history await this.redis.pub.publish(`chat:${room}`, JSON.stringify(msg)); await this.redis.pub.lpush(`chat:${room}:history`, JSON.stringify(msg)); await this.redis.pub.ltrim(`chat:${room}:history`, 0, 199); }

Moderation “delete message” / “purge user” should be separate events:

  • chat.moderation.delete_message
  • chat.moderation.purge_user

Clients handle them by removing the affected messages from the UI, as sketched below.
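A minimal client-side handler, assuming a chat:moderation event (the event name, payload shape, and UI helpers are all illustrative):

```ts
// Sketch: client reaction to moderation events. "chat:moderation",
// the payload shape, and both UI helpers are illustrative.
socket.on("chat:moderation", (ev: { type: string; messageId?: string; userId?: string }) => {
  if (ev.type === "delete_message" && ev.messageId) {
    removeMessageFromUi(ev.messageId); // hypothetical UI-layer helper
  }
  if (ev.type === "purge_user" && ev.userId) {
    removeMessagesByUser(ev.userId); // hypothetical UI-layer helper
  }
});
```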

3) EventSub-style internal events

Why

You’re already evolving into multiple producers/consumers:

  • Twitch ingestion
  • YouTube ingestion
  • Donations (Stripe)
  • Chat messages
  • Moderation actions
  • Admin panel actions

EventSub-style means:

  • Everything becomes an event envelope
  • You can add new downstream services without rewriting producers

Minimal event envelope

```ts
export type InternalEvent<T = any> = {
  id: string;     // uuid
  type: string;   // e.g. "chat.message.created"
  source: string; // "ws-gateway", "twitch-ingestor"
  ts: number;     // ms
  room?: string;  // "global"
  userId?: string;
  data: T;
};
```

Transport options in Redis

  • Best: Redis Streams (XADD) for durability + replay
  • Okay: Pub/Sub for realtime only

Do Streams. Pub/Sub is lossy.

Streams naming

  • events:global (single stream) or
  • events:{domain} like events:chat, events:twitch, events:billing

Publisher

```ts
private async publishInternalEvent(type: string, data: any, meta?: Partial<InternalEvent>) {
  const ev: InternalEvent = {
    id: crypto.randomUUID(),
    type,
    source: "ws-gateway",
    ts: Date.now(),
    ...meta,
    data,
  };
  // XADD fields must be flat; store JSON as one field
  await this.redis.pub.xadd("events:global", "*", "event", JSON.stringify(ev));
}
```

Consumer group (worker/service)

Create a dedicated Nest service (or BullMQ worker) that reads events.

```ts
// events.consumer.ts
import Redis from "ioredis";
// adjust this import to wherever InternalEvent lives in your codebase
import type { InternalEvent } from "./events";

const STREAM = "events:global";
const GROUP = "chat-services";
const CONSUMER = `c-${process.pid}`;

async function ensureGroup(redis: Redis) {
  try {
    await redis.xgroup("CREATE", STREAM, GROUP, "$", "MKSTREAM");
  } catch (e: any) {
    // BUSYGROUP means the group already exists, which is fine
    if (!String(e?.message || "").includes("BUSYGROUP")) throw e;
  }
}

async function loop(redis: Redis) {
  await ensureGroup(redis);

  while (true) {
    const res = (await redis.xreadgroup(
      "GROUP", GROUP, CONSUMER,
      "BLOCK", 5000,
      "COUNT", 50,
      "STREAMS", STREAM, ">",
    )) as [string, [string, string[]][]][] | null;

    if (!res) continue;

    for (const [, entries] of res) {
      for (const [id, fields] of entries) {
        const idx = fields.indexOf("event");
        const payload = idx >= 0 ? fields[idx + 1] : null;
        if (!payload) {
          await redis.xack(STREAM, GROUP, id);
          continue;
        }

        const ev: InternalEvent = JSON.parse(payload);
        // route by type:
        // - persist analytics
        // - trigger clip capture
        // - enforce moderation
        // - sync to DB
        // etc.

        await redis.xack(STREAM, GROUP, id);
      }
    }
  }
}
```

How this plugs into chat

  • On chat send: publish chat.message.created
  • Moderation actions: publish chat.moderation.*
  • Presence changes: publish presence.user.joined/left (optional; usually derived)

Then:

  • A “chat-history service” consumes events and writes to DB
  • A “clip-trigger service” consumes chat spike events
  • An “admin audit service” consumes everything

You get clean separation.
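On the publish side this is a few lines in the send path; a sketch, assuming the publishInternalEvent helper from the Publisher section:

```ts
// Sketch: inside the chat:send handler, right after the Redis fan-out.
// Event type string follows the naming used in this section.
await this.publishInternalEvent("chat.message.created", msg, {
  room,
  userId: user.id,
});
```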


What I’d do in your stack

  1. ZSET presence first (cheap, big reliability win).
  2. ACK + moderation hook next (stability + control).
  3. Move all “side effects” to Redis Streams internal events (scale + sanity).

If you want, I can give you:

  • a drop-in ModerationService with Redis-backed mute/timeout + message delete,
  • client-side ACK handling pattern (optimistic UI),
  • and a proper Stream consumer module structure for NestJS.