124 lines
3.9 KiB
TypeScript
124 lines
3.9 KiB
TypeScript
|
import { Server } from "node:http";
|
||
|
import { performance } from "node:perf_hooks";
|
||
|
import WebSocket, { WebSocketServer } from "ws";
|
||
|
import { decodeTokenOrNull } from "../auth";
|
||
|
import { gatewayErrors } from "../errors";
|
||
|
import { GatewayPayload } from "../types/gatewaypayload";
|
||
|
import { GatewayPayloadType } from "./gatewaypayloadtype";
|
||
|
|
||
|
/**
 * Reads a millisecond interval from the environment variable `name`.
 *
 * @param name - Name of the environment variable to read.
 * @param fallbackMs - Value used when the variable is unset, empty, or does
 *   not parse to a finite positive number.
 * @returns The parsed interval in milliseconds, or `fallbackMs`.
 */
function readIntervalMs(name: string, fallbackMs: number): number {
  // Number(undefined) is NaN and Number("") is 0; both are rejected below.
  const parsed = Number(process.env[name]);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

// The environment value must be consulted first: writing `25000 || env`
// always yields the literal because it is truthy, so the override never applied.

// Period (ms) of the sweep timer that checks connected clients.
const GATEWAY_BATCH_INTERVAL = readIntervalMs("GATEWAY_BATCH_INTERVAL", 25000);

// Time (ms) compared against a client's `lastAliveCheck`; also advertised to
// clients as `pingInterval` in the Hello payload.
const GATEWAY_PING_INTERVAL = readIntervalMs("GATEWAY_PING_INTERVAL", 20000);
|
||
|
|
||
|
/**
 * Closes `ws` with close code 1000 and a reason string of the form
 * `(<code>) <message>` built from the given error descriptor.
 *
 * @param ws - Socket to close.
 * @returns Whatever `ws.close` returns.
 */
function closeWithError(ws: WebSocket, { code, message }: { code: number, message: string }) {
  const reason = "(" + String(code) + ") " + message;
  return ws.close(1000, reason);
}
|
||
|
|
||
|
/**
 * Closes `ws` using the BAD_PAYLOAD gateway error: its `code` becomes the
 * close code and the reason is `<BAD_PAYLOAD message>: <hint>`.
 *
 * @param ws - Socket to close.
 * @param hint - Short explanation of what was wrong with the payload.
 * @returns Whatever `ws.close` returns.
 */
function closeWithBadPayload(ws: WebSocket, hint: string) {
  const { code, message } = gatewayErrors.BAD_PAYLOAD;
  const reason = `${message}: ${hint}`;
  return ws.close(code, reason);
}
|
||
|
|
||
|
/**
 * Parses `payload` as JSON.
 *
 * @param payload - Raw JSON text.
 * @returns The parsed value, or `null` when `JSON.parse` throws. Note that the
 *   JSON text `null` also yields `null`.
 */
function parseJsonOrNull(payload: string): any {
  let parsed: any = null;
  try {
    parsed = JSON.parse(payload);
  } catch {
    // Malformed JSON: keep the null default.
  }
  return parsed;
}
|
||
|
|
||
|
/**
 * Checks that `payload` has the shape of a GatewayPayload.
 *
 * A value is accepted only when it is truthy, its own enumerable keys are
 * exactly `t` and `d`, and `t` is a number. `d` may hold any value.
 *
 * @param payload - Untrusted value, typically the result of JSON parsing.
 * @returns The same `payload` reference typed as GatewayPayload, or `null`
 *   when it does not match.
 */
function ensureFormattedGatewayPayload(payload: any): GatewayPayload | null {
  if (!payload) {
    return null;
  }

  const entries = Object.entries(payload);

  // Reject anything carrying an unknown key or a non-numeric `t`.
  const everyEntryValid = entries.every(([key, value]) =>
    key === "d" || (key === "t" && typeof value === "number"));
  if (!everyEntryValid) {
    return null;
  }

  // Both fields are mandatory.
  const keys = entries.map(([key]) => key);
  const hasBothFields = keys.includes("t") && keys.includes("d");
  return hasBothFields ? (payload as GatewayPayload) : null;
}
|
||
|
|
||
|
/**
 * Serializes `payload` to JSON and sends it over `ws` as a single message.
 *
 * @param ws - Socket to send on.
 * @param payload - Gateway payload to serialize.
 */
function sendPayload(ws: WebSocket, payload: GatewayPayload) {
  const frame = JSON.stringify(payload);
  ws.send(frame);
}
|
||
|
|
||
|
/**
 * Attaches the WebSocket gateway to an existing HTTP server.
 *
 * For every new connection the gateway initializes per-socket state, sends a
 * Hello payload advertising the ping interval, and then handles incoming
 * text messages. The only message type handled here is Authenticate; any
 * other type closes the socket with a bad-payload error. A periodic sweep
 * closes sockets that have not authenticated in time.
 *
 * @param server - HTTP server the WebSocketServer is bound to.
 */
export default function(server: Server) {
  const wss = new WebSocketServer({ server });

  // Periodic sweep: close clients that are still unauthenticated once at least
  // GATEWAY_PING_INTERVAL ms have passed since their `lastAliveCheck`.
  const sweepTimer = setInterval(() => {
    const now = performance.now();
    wss.clients.forEach((client) => {
      if (client.state && (now - client.state.lastAliveCheck) >= GATEWAY_PING_INTERVAL) {
        if (!client.state.ready) {
          closeWithError(client, gatewayErrors.AUTHENTICATION_TIMEOUT);
        }
      }
    });
  }, GATEWAY_BATCH_INTERVAL);

  // Stop the sweep when the WebSocket server is closed; otherwise the timer
  // leaks and keeps the event loop alive after shutdown.
  wss.on("close", () => {
    clearInterval(sweepTimer);
  });

  wss.on("connection", (ws) => {
    // NOTE(review): within this function `alive` is never read and
    // `lastAliveCheck` is never refreshed after connect, so the sweep
    // effectively measures time since connection.
    ws.state = {
      user: undefined,
      alive: false,
      ready: false,
      lastAliveCheck: performance.now()
    };

    sendPayload(ws, {
      t: GatewayPayloadType.Hello,
      d: {
        pingInterval: GATEWAY_PING_INTERVAL
      }
    });

    ws.on("message", async (rawData, isBinary) => {
      if (isBinary) {
        return closeWithBadPayload(ws, "Binary messages are not supported");
      }

      const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString()));
      if (!payload) {
        return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
      }

      switch (payload.t) {
        case GatewayPayloadType.Authenticate: {
          // For Authenticate, `d` carries the token as a string.
          const token = payload.d;
          if (typeof token !== "string") {
            return closeWithBadPayload(ws, "d: expected string");
          }
          // NOTE(review): presumably decodeTokenOrNull never rejects; if it
          // can, this async handler would surface an unhandled rejection.
          const user = await decodeTokenOrNull(token);
          if (!user) {
            return closeWithError(ws, gatewayErrors.BAD_AUTH);
          }
          ws.state.user = user;
          ws.state.ready = true;

          sendPayload(ws, {
            t: GatewayPayloadType.Ready,
            d: {
              user: ws.state.user,
            }
          });
          break;
        }
        default: {
          return closeWithBadPayload(ws, "t: unknown type");
        }
      }
    });
  });
}
|