From 3cd7fcb5f948c3bb870f320f5e88cf6b94fac35a Mon Sep 17 00:00:00 2001 From: hippoz Date: Tue, 14 Sep 2021 17:02:29 +0300 Subject: [PATCH] refactor(gateway): completely refactor gateway --- brainlet/api/v2/gateway/index.js | 443 +++++++++++++++---------------- brainlet/index.js | 5 +- 2 files changed, 210 insertions(+), 238 deletions(-) diff --git a/brainlet/api/v2/gateway/index.js b/brainlet/api/v2/gateway/index.js index 465a33a..5b30d8a 100644 --- a/brainlet/api/v2/gateway/index.js +++ b/brainlet/api/v2/gateway/index.js @@ -1,7 +1,5 @@ const websockets = require("ws"); -const EventEmitter = require("events"); const uuid = require("uuid"); -const werift = require("werift"); const { policies, gatewayPingInterval, gatewayPingCheckInterval, clientFacingPingInterval } = require("../../../config"); const { experiments } = require("../../../experiments"); @@ -20,276 +18,249 @@ const wsCloseCodes = { FLOODING: [4007, "Flooding"], NO_PING: [4008, "No ping"], }; -const closeConnectionWithCode = (ws, code) => ws.close(code[0], code[1]); -class GatewayRTCConnection { - constructor(ws, server) { +class GatewaySession { + constructor() { + this.authenticated = false; + this.user = null; + this.token = null; + this.sessionId = uuid.v4(); + + // Specific to websocket sessions + this.isWebsocketConnection = false; + this.ws = null; + this.lastPing = new Date(); + this.channels = []; + } + + setWebsocketClient(ws) { this.ws = ws; - this.server = server; - - this.connection = new werift.RTCPeerConnection(); - this.connection.onnegotiationneeded.subscribe(() => this.renegotiate); // TODO: what + this.isWebsocketConnection = true; } - async answer(offer) { - await this.connection.setRemoteDescription(offer); - await this.connection.setLocalDescription(await this.connection.createAnswer()); - return this.connection.localDescription; + async authenticateWithToken(token) { + let user = null; + try { + user = await checkToken(token); + if (!user) return false; + this.token = token; + this.user = user; + this.authenticated = true; + return true; + } catch(e) { + return false; + } } - async addTrack(track) { - if (!track) return; - this.ws.send(this.server.packet("EVENT_TRACK_NOTIFICATION", { - kind: track.kind - })); - this.connection.addTransceiver(track); + send(name, data) { + this.ws.send(packet(name, data)); } - async renegotiate() { - this.ws.send(this.server.packet("EVENT_RENEGOTIATE_REQUIRED", {})); + isReady() { + return this.authenticated && this.ws; } } -class GatewayServer extends EventEmitter { - constructor({ server }) { - super(); - this.wss = new websockets.Server({ server: server, path: "/gateway" }); +class GatewayHandler { + constructor() { this.sessionCounters = {}; + } + + // server event handlers + handleServerConfigRequest() { + return { + path: "/gateway" + }; + } + + handleServerReady(wss) { + this.wss = wss; this.pingCheckIntervalFunction = setInterval(() => { this.wss.clients.forEach((client) => { if ((new Date() - client.session.lastPing) >= gatewayPingInterval) { - closeConnectionWithCode(client, wsCloseCodes.NO_PING); + client.close(wsCloseCodes.NO_PING[0], wsCloseCodes.NO_PING[1]); } }); }, gatewayPingCheckInterval); + } - this.wss.on("close", () => { - clearInterval(this.pingCheckIntervalFunction); - console.log("gateway: websocket server closed"); - }); + handleServerClose() { + clearInterval(this.pingCheckIntervalFunction); + } - this.wss.on("connection", (ws) => { - if (!policies.allowGatewayConnection) return closeConnectionWithCode(ws, wsCloseCodes.SERVER_DENIED_CONNECTION); - ws.send(packet("HELLO", { pingInterval: clientFacingPingInterval })); - ws.session = { - authenticated: false, - user: null, - sessionId: uuid.v4(), - lastPing: new Date(), - token: null - }; + handleConnection(ws) { + const session = new GatewaySession(); + session.setWebsocketClient(ws); + session.send("HELLO", { pingInterval: clientFacingPingInterval }); + return session; + } - ws.on("close", async () => { - if (ws.session.user && ws.channels) { - if (this.sessionCounters[ws.session.user._id] <= 1) { - this.inChannel(ws.channels[0], (client) => { - console.log(client.session); - client.send(packet("EVENT_CHANNEL_MEMBERS", { - [ws.session.user._id]: { - _id: ws.session.user._id, - username: ws.session.user.username, - status: 0, - status_text: "" - } - })); + handleConnectionClose(ws) { + if (ws.session && ws.session.user && ws.session.channels) { + if (this.sessionCounters[ws.session.user._id] <= 1) { + this.eachInChannel(ws.session.channels[0], (client) => { + if (client.session && client.session.isReady()) { + client.session.send("EVENT_CHANNEL_MEMBERS", { + [ws.session.user._id]: { + _id: ws.session.user._id, + username: ws.session.user.username, + status: 0, + status_text: "" + } }); } - this.sessionCounters[ws.session.user._id]--; - } + }); + } + this.sessionCounters[ws.session.user._id]--; + if (this.sessionCounters[ws.session.user._id] <= 0) { + this.sessionCounters[ws.session.user._id] = null; + delete this.sessionCounters[ws.session.user._id]; + } + } + } - if (ws.rtc && ws.rtc.connection) await ws.rtc.connection.close(); + handleMessage(senderSession, packet) { + return this[`handle_${packet.opcodeType}`](packet, senderSession); + } + + // utility functions + addSessionCounter(id) { + if (!this.sessionCounters[id]) this.sessionCounters[id] = 0; + this.sessionCounters[id]++; + if (this.sessionCounters[id] > policies.perUserMaxGatewayConnections) return false; + return true; + } + + getClients() { + return this.wss.clients; + } + + eachInChannel(channelId, callback) { + const clients = this.getClients(); + clients.forEach((client) => { + if (client.session && client.session.isReady() && client.session.channels.includes(channelId)) + callback(client); + }); + } + + // Gateway message handlers + async handle_YOO({ data }, session) { + if (session.authenticated) return {error: wsCloseCodes.PAYLOAD_ERROR}; + try { + if (!(await session.authenticateWithToken(data.token))) return {error: wsCloseCodes.AUTHENTICATION_ERROR}; + } catch(e) { + return {error: wsCloseCodes.AUTHENTICATION_ERROR}; + } + if (!this.addSessionCounter(session.user._id)) return {error: wsCloseCodes.TOO_MANY_SESSIONS}; + + const channels = (await Channel.find().lean().sort({ _id: -1 }).limit(50).select("-posts -__v").populate("creator", User.getPulicFields(true))) || []; + session.channels = channels.map(x => x._id.toString()); + + session.send("YOO_ACK", { session_id: session.sessionId, channels, user: { username: session.user.username, _id: session.user._id }, __global_experiments: experiments }); + + const channel = session.channels[0]; + if (channel) { + const presence = {}; + + this.eachInChannel(channel, ({ session: remoteSession }) => { + presence[remoteSession.user._id] = { + _id: remoteSession.user._id, + username: remoteSession.user.username, + status: 1, + status_text: "Online" + }; + if (remoteSession.sessionId !== session.sessionId) { + remoteSession.send("EVENT_CHANNEL_MEMBERS", { + [session.user._id]: { + _id: session.user._id, + username: session.user.username, + status: 1, + status_text: "Online" + } + }); + } }); - ws.on("message", async (data, isBinary) => { - try { - if (isBinary) return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR); - const message = parseMessage(data.toString()); - switch (message.opcodeType) { - case "YOO": { - if (ws.session.authenticated) return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR); - // The client has responded to our HELLO with a YOO packet - try { - const user = await checkToken(message.data.token); - if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR); - ws.session.user = user; - ws.session.authenticated = true; - ws.session.token = message.data.token; - if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0; - this.sessionCounters[user._id]++; - if (this.sessionCounters[user._id] > policies.perUserMaxGatewayConnections) return closeConnectionWithCode(ws, wsCloseCodes.TOO_MANY_SESSIONS); + session.send("EVENT_CHANNEL_MEMBERS", presence); + } + } - // The user is now successfully authenticated, send the YOO_ACK packet - // TODO: This is probably not efficient + async handle_ACTION_PING({ data }, session) { + if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED}; + if (data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`"); - let channels = await Channel.find().lean().sort({ _id: -1 }).limit(50).select("-posts -__v").populate("creator", User.getPulicFields(true)); - if (!channels) channels = []; - channels = channels.map(x => ({ ...x, _id: x._id.toString() })); + const user = await checkToken(session.token); + if (!user) return {error: wsCloseCodes.AUTHENTICATION_ERROR}; + session.lastPing = new Date(); + } - ws.channels = channels.map(x => x._id); + async handle_ACTION_CREATE_MESSAGE({ data }, session) { + if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED}; + if (typeof data.content !== "string" || typeof data.channel !== "object" || typeof data.channel._id !== "string") throw new Error("msg: invalid fields in json payload"); + + const messageContent = data.content.trim(); + if (messageContent.length > 2000 || messageContent === "") return; + if (data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!! - ws.send(packet("YOO_ACK", { session_id: ws.session.sessionId, channels, user: { username: user.username, _id: user._id }, __global_experiments: experiments })); - - let presence = {}; - const channel = channels[0]; + // Check if the user is in that channel before broadcasting the message + if (!session.channels.includes(data.channel._id)) return {error: wsCloseCodes.NOT_AUTHORIZED}; - if (channel) { - this.inChannel(channel._id, (client) => { - presence[client.session.user._id] = { - _id: client.session.user._id, - username: client.session.user.username, - status: 1, - status_text: "Online" - }; - if (client.session.sessionId !== ws.session.sessionId) client.send(packet("EVENT_CHANNEL_MEMBERS", { - [ws.session.user._id]: { - _id: ws.session.user._id, - username: ws.session.user.username, - status: 1, - status_text: "Online" - } - })); - }); - - ws.send(packet("EVENT_CHANNEL_MEMBERS", presence)); - } - - console.log(`gateway: user ${user.username}: handshake complete`); - } catch (e) { - console.log("gateway:", e); - return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR); - } - break; - } - case "ACTION_PING": { - if (!this.authMessage(ws)) return; - if (message.data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`"); - - const user = await checkToken(ws.session.token); - if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR); - - ws.session.lastPing = new Date(); - break; - } - case "ACTION_CREATE_MESSAGE": { - if (!this.authMessage(ws)) return; - if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload"); - - const messageContent = message.data.content.trim(); - if (messageContent.length > 2000) return; - if (messageContent === "") return; - if (message.data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!! - - // Check if the user is in that channel before broadcasting the message - if (!ws.channels.includes(message.data.channel._id)) return closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHORIZED); - - this.broadcast(message.data.channel._id, packet("EVENT_CREATE_MESSAGE", { - content: messageContent, - channel: { - _id: message.data.channel._id - }, - author: { - _id: ws.session.user._id, - username: ws.session.user.username - }, - _id: uuid.v4() - })); - - break; - } - case "ACTION_VOICE_REQUEST_SESSION": { - if (!experiments.voiceSFUTesting) return; - // just send ourselves as the voice server lol - ws.send(packet("EVENT_VOICE_ASSIGN_SERVER", { - reportTo: "/gateway", - channel: message.data.channel - })); - break; - } - case "ACTION_VOICE_CONNECTION_REQUEST": { - if (!this.authMessage(ws)) return; - if (!experiments.voiceSFUTesting) return; - - const offer = message.data.offer; - const channel = message.data.channel; - if (!channel || !channel._id) throw new Error("Client did not send a valid channel in ACTION_VOICE_CONNECTION_REQUEST"); - if (!offer) throw new Error("Client did not send any offer in ACTION_VOICE_CONNECTION_REQUEST"); - - let isNewConnection = true; - if (ws.rtc) { - isNewConnection = false; - } else { - ws.rtc = new GatewayRTCConnection(ws, this); - } - - if (isNewConnection) { - ws.rtc.connection.onTransceiver.subscribe(async (transceiver) => { - const [track] = await transceiver.onTrack.asPromise(); - this.inChannel(channel._id, (otherWs) => { - if (!this.clientRTCReady(otherWs)) return; - if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves - - otherWs.rtc.addTrack(track); - otherWs.rtc.renegotiate(); - }); - }); - } - - ws.send(packet("EVENT_VOICE_CONNECTION_ANSWER", { - answer: await ws.rtc.answer(offer) - })); - - if (isNewConnection) { - this.inChannel(channel._id, (otherWs) => { - if (!this.clientRTCReady(otherWs)) return; - if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves - - const otherReceivers = otherWs.rtc.connection.getReceivers(); - for (let i = 0; i < otherReceivers.length; i++) { - const otherRecevier = otherReceivers[i]; - ws.rtc.addTrack(otherRecevier.tracks[0]); - } - }); - ws.rtc.renegotiate(); - } - break; - } - } - } catch(e) { - console.error("gateway:", e); - return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR); - } + this.eachInChannel(data.channel._id, ({ session: remoteSession }) => { + remoteSession.send("EVENT_CREATE_MESSAGE", { + content: messageContent, + channel: { + _id: data.channel._id + }, + author: { + _id: session.user._id, + username: session.user.username + }, + _id: uuid.v4() }); }); } } -GatewayServer.prototype.broadcast = function(channelId, data) { - this.wss.clients.forEach((client) => { - if (this.clientReady(client) && client.channels.includes(channelId)) client.send(data); - }); -}; - -GatewayServer.prototype.inChannel = function(channelId, func) { - this.wss.clients.forEach((client) => { - if (this.clientReady(client) && client.channels.includes(channelId)) func(client); - }); -}; - -GatewayServer.prototype.clientReady = function(ws) { - return ws.readyState === websockets.OPEN && ws.session && ws.session.authenticated; -}; - -GatewayServer.prototype.clientRTCReady = function(ws) { - return ws.rtc; // TODO: add more checks -}; - -GatewayServer.prototype.authMessage = function(ws) { - if (!this.clientReady(ws)) { - closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHENTICATED); - return false; +class GatewayServer { + constructor(httpServer) { + this.httpServer = httpServer; + this.wss = null; + this.handler = null; } - return true; -}; -module.exports = GatewayServer; + bindHandler(handler) { + if (this.wss) throw new Error("GatewayServer: cannot bindHandler() to a server which already has a bound handler."); + + const newHandler = new handler(); + this.handler = newHandler; + this.wss = new websockets.Server({ server: this.httpServer, ...(this.handler.handleServerConfigRequest()) }); + this.handler.handleServerReady(this.wss); + this.wss.on("close", () => this.handler.handleServerClose()); + this.wss.on("connection", (ws) => { + const session = this.handler.handleConnection(ws); + if (!session) + return ws.close(); + if (session.__error) + return ws.close(session.__error[0], session.__error[1]); + ws.session = session; + ws.on("message", async (data, isBinary) => { + try { + if (isBinary || !ws.session) return ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]); + const status = await this.handler.handleMessage(ws.session, parseMessage(data.toString())); + if (status && status.error) { + return ws.close(status.error[0], status.error[1]); + } + } catch(e) { + console.error("GatewayServer: unexpected error while attempting to handle payload", e); + ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]); + } + }); + ws.on("close", (code, reason) => { + this.handler.handleConnectionClose(ws, code, reason); + }); + }); + } +} + +module.exports = { GatewayServer, GatewayHandler }; diff --git a/brainlet/index.js b/brainlet/index.js index 4a703ce..02bf0a3 100755 --- a/brainlet/index.js +++ b/brainlet/index.js @@ -1,6 +1,6 @@ const config = require("./config"); const apiRoute = require("./api/v1"); -const GatewayServerV2 = require("./api/v2/gateway/index"); +const { GatewayServer, GatewayHandler } = require("./api/v2/gateway/index"); const express = require("express"); const cookieParser = require("cookie-parser"); @@ -10,7 +10,8 @@ const http = require("http"); const app = express(); const httpServer = http.createServer(app); -new GatewayServerV2({ server: httpServer }); +const gatewayServer = new GatewayServer(httpServer); +gatewayServer.bindHandler(GatewayHandler); app.use(express.urlencoded({ extended: false })); app.use(express.json());