replace broadcast with dispatch in the gateway code

This commit is contained in:
hippoz 2022-04-10 21:28:36 +03:00
parent dcb4983302
commit 701f6ae1ac
No known key found for this signature in database
GPG key ID: 7C52899193467641
3 changed files with 21 additions and 20 deletions

View file

@ -10,50 +10,50 @@ import { GatewayPayloadType } from "./gatewaypayloadtype";
const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL; const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL;
const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL; const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL;
// mapping between a broadcast id and a websocket client // mapping between a dispatch id and a websocket client
const broadcastChannels = new Map<string, Set<WebSocket>>(); const dispatchChannels = new Map<string, Set<WebSocket>>();
function clientSubscribe(ws: WebSocket, broadcastChannel: string) { function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
ws.state.broadcastChannels.add(broadcastChannel); ws.state.dispatchChannels.add(dispatchChannel);
if (!broadcastChannels.get(broadcastChannel)) { if (!dispatchChannels.get(dispatchChannel)) {
broadcastChannels.set(broadcastChannel, new Set()); dispatchChannels.set(dispatchChannel, new Set());
} }
broadcastChannels.get(broadcastChannel)?.add(ws); dispatchChannels.get(dispatchChannel)?.add(ws);
} }
function clientUnsubscribe(ws: WebSocket, broadcastChannel: string) { function clientUnsubscribe(ws: WebSocket, dispatchChannel: string) {
if (!ws.state) return; if (!ws.state) return;
ws.state.broadcastChannels.delete(broadcastChannel); ws.state.dispatchChannels.delete(dispatchChannel);
const set = broadcastChannels.get(broadcastChannel); const set = dispatchChannels.get(dispatchChannel);
if (!set) return; if (!set) return;
set.delete(ws); set.delete(ws);
if (set.size < 1) { if (set.size < 1) {
broadcastChannels.delete(broadcastChannel); dispatchChannels.delete(dispatchChannel);
} }
} }
function clientUnsubscribeAll(ws: WebSocket) { function clientUnsubscribeAll(ws: WebSocket) {
if (!ws.state) return; if (!ws.state) return;
ws.state.broadcastChannels.forEach(e => { ws.state.dispatchChannels.forEach(e => {
const set = broadcastChannels.get(e); const set = dispatchChannels.get(e);
if (!set) return; if (!set) return;
set.delete(ws); set.delete(ws);
if (set && set.size < 1) { if (set && set.size < 1) {
broadcastChannels.delete(e); dispatchChannels.delete(e);
} }
}); });
ws.state.broadcastChannels = new Set(); ws.state.dispatchChannels = new Set();
} }
export function broadcast(channel: string, message: GatewayPayload) { export function dispatch(channel: string, message: GatewayPayload) {
const members = broadcastChannels.get(channel); const members = dispatchChannels.get(channel);
if (!members) return; if (!members) return;
members.forEach(e => e.send(JSON.stringify(message))); members.forEach(e => e.send(JSON.stringify(message)));
@ -133,7 +133,7 @@ export default function(server: Server) {
alive: false, alive: false,
ready: false, ready: false,
lastAliveCheck: performance.now(), lastAliveCheck: performance.now(),
broadcastChannels: new Set() dispatchChannels: new Set()
}; };
sendPayload(ws, { sendPayload(ws, {
@ -145,7 +145,6 @@ export default function(server: Server) {
ws.on("close", () => { ws.on("close", () => {
clientUnsubscribeAll(ws); clientUnsubscribeAll(ws);
console.log(broadcastChannels);
}); });
ws.on("message", async (rawData, isBinary) => { ws.on("message", async (rawData, isBinary) => {
@ -171,6 +170,7 @@ export default function(server: Server) {
// each user should have their own list of channels that they join // each user should have their own list of channels that they join
const channels = await query("SELECT id, name, owner_id FROM channels"); const channels = await query("SELECT id, name, owner_id FROM channels");
clientSubscribe(ws, "*");
channels.rows.forEach(c => { channels.rows.forEach(c => {
clientSubscribe(ws, `channel:${c.id}`); clientSubscribe(ws, `channel:${c.id}`);
}); });

View file

@ -3,6 +3,7 @@ import { body, param, validationResult } from "express-validator";
import { authenticateRoute } from "../../../auth"; import { authenticateRoute } from "../../../auth";
import { query } from "../../../database"; import { query } from "../../../database";
import { errors } from "../../../errors"; import { errors } from "../../../errors";
import { dispatch } from "../../../gateway";
const router = express.Router(); const router = express.Router();

View file

@ -3,5 +3,5 @@ interface GatewayClientState {
ready: boolean, ready: boolean,
alive: boolean, alive: boolean,
lastAliveCheck: number, lastAliveCheck: number,
broadcastChannels: Set<string> dispatchChannels: Set<string>
} }