const websockets = require("ws"); const { v4 } = require("uuid"); const mongoose = require("mongoose"); const { policies, gatewayPingInterval, gatewayPingCheckInterval, clientFacingPingInterval } = require("../../../config"); const { experiments } = require("../../../config"); const User = require("../../../models/User"); const Channel = require("../../../models/Channel"); const Message = require("../../../models/Message"); const { parseMessage, packet } = require("./messageparser"); const { checkToken } = require("../../../common/auth/authfunctions"); const wsCloseCodes = { PAYLOAD_ERROR: [4001, "Error while handling payload"], NOT_AUTHENTICATED: [4002, "Not authenticated"], SERVER_DENIED_CONNECTION: [4003, "Server denied connection"], AUTHENTICATION_ERROR: [4004, "Authentication error"], TOO_MANY_SESSIONS: [4005, "Too many sessions"], NOT_AUTHORIZED: [4006, "Not authorized"], FLOODING: [4007, "Flooding"], NO_PING: [4008, "No ping"], UNSUPPORTED_ATTRIBUTE: [4009, "Unsupported attribute."], }; const attributes = { PRESENCE_UPDATES: "PRESENCE_UPDATES", }; const supportedAttributes = [attributes.PRESENCE_UPDATES]; class GatewaySession { constructor() { this.authenticated = false; this.user = null; this.token = null; this.sessionId = v4(); this.attributes = []; // Specific to websocket sessions this.isWebsocketConnection = false; this.ws = null; this.lastPing = new Date(); this.channels = []; } hasAttribute(roleName) { if (roleName.length < 1) return true; // TODO: HACK return this.attributes.includes(roleName); } setWebsocketClient(ws) { this.ws = ws; this.isWebsocketConnection = true; } async authenticateWithToken(token) { let user = null; try { user = await checkToken(token); if (!user) return false; this.token = token; this.user = user; this.authenticated = true; return true; } catch(e) { return false; } } send(name, data) { this.ws.send(packet(name, data)); } isReady() { return this.authenticated && this.ws; } } class GatewayHandler { constructor() { this.sessionCounters = {}; } // server event handlers handleServerConfigRequest() { return { path: "/gateway" }; } handleServerReady(wss) { this.wss = wss; this.pingCheckIntervalFunction = setInterval(() => { this.wss.clients.forEach((client) => { if ((new Date() - client.session.lastPing) >= gatewayPingInterval) { client.close(wsCloseCodes.NO_PING[0], wsCloseCodes.NO_PING[1]); } }); }, gatewayPingCheckInterval); } handleServerClose() { clearInterval(this.pingCheckIntervalFunction); } handleConnection(ws) { if (!policies.allowGatewayConnection) { return ws.close(wsCloseCodes.SERVER_DENIED_CONNECTION[0], wsCloseCodes.SERVER_DENIED_CONNECTION[1]); } const session = new GatewaySession(); session.setWebsocketClient(ws); session.send("HELLO", { pingInterval: clientFacingPingInterval, supportedAttributes }); return session; } handleConnectionClose(ws) { if (ws.session && ws.session.user && ws.session.channels) { if (this.sessionCounters[ws.session.user._id] <= 1) { this.eachInChannel({channelId: ws.session.channels[0], role: attributes.PRESENCE_UPDATES}, (client) => { if (client.session && client.session.isReady()) { client.session.send("EVENT_CHANNEL_MEMBERS", { [ws.session.user._id]: { _id: ws.session.user._id, username: ws.session.user.username, status: 0, status_text: "" } }); } }); } this.sessionCounters[ws.session.user._id]--; if (this.sessionCounters[ws.session.user._id] <= 0) { this.sessionCounters[ws.session.user._id] = null; delete this.sessionCounters[ws.session.user._id]; } } } handleMessage(senderSession, packet) { return this[`handle_${packet.opcodeType}`](packet, senderSession); } // utility functions addSessionCounter(id) { if (!this.sessionCounters[id]) this.sessionCounters[id] = 0; this.sessionCounters[id]++; if (this.sessionCounters[id] > policies.perUserMaxGatewayConnections) return false; return true; } getClients() { return this.wss.clients; } eachInChannel({channelId, role=""}, callback) { const clients = this.getClients(); clients.forEach((client) => { if (client.session && client.session.isReady() && client.session.hasAttribute(role) && client.session.channels.includes(channelId)) callback(client); }); } // Gateway message handlers async handle_YOO({ data }, session) { if (session.authenticated) return {error: wsCloseCodes.PAYLOAD_ERROR}; try { if (!(await session.authenticateWithToken(data.token))) return {error: wsCloseCodes.AUTHENTICATION_ERROR}; } catch(e) { return {error: wsCloseCodes.AUTHENTICATION_ERROR}; } if (!this.addSessionCounter(session.user._id)) return {error: wsCloseCodes.TOO_MANY_SESSIONS}; const channels = (await Channel.find().lean().sort({ _id: -1 }).limit(50).select("-posts -__v").populate("creator", User.getPulicFields(true))) || []; session.channels = channels.map(x => x._id.toString()); if (data.attributes) { if (!Array.isArray(data.attributes) || data.attributes.length > 8) return {error: wsCloseCodes.PAYLOAD_ERROR}; for (let i = 0; i < data.attributes.length; i++) { if (!supportedAttributes.includes(data.attributes[i])) return {error: wsCloseCodes.UNSUPPORTED_ATTRIBUTE}; } session.attributes = data.attributes; } session.send("YOO_ACK", { session_id: session.sessionId, channels, user: { username: session.user.username, _id: session.user._id }, __global_experiments: experiments }); const channel = session.channels[0]; if (channel) { const presence = {}; this.eachInChannel({channelId: channel}, ({ session: remoteSession }) => { presence[remoteSession.user._id] = { _id: remoteSession.user._id, username: remoteSession.user.username, status: 1, status_text: "Online" }; if (remoteSession.sessionId !== session.sessionId && remoteSession.hasAttribute(attributes.PRESENCE_UPDATES)) { remoteSession.send("EVENT_CHANNEL_MEMBERS", { [session.user._id]: { _id: session.user._id, username: session.user.username, status: 1, status_text: "Online" } }); } }); (session.hasAttribute(attributes.PRESENCE_UPDATES)) && session.send("EVENT_CHANNEL_MEMBERS", presence); } } async handle_ACTION_PING({ data }, session) { if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED}; if (data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`"); const user = await checkToken(session.token); if (!user) return {error: wsCloseCodes.AUTHENTICATION_ERROR}; session.lastPing = new Date(); } async handle_ACTION_CREATE_MESSAGE({ data }, session) { if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED}; if (typeof data.content !== "string" || typeof data.channel !== "object" || typeof data.channel._id !== "string") throw new Error("msg: invalid fields in json payload"); const messageContent = data.content.trim(); if (messageContent.length > 2000 || messageContent.length < 1) return; if (data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!! // Check if the user is in that channel before broadcasting the message if (!session.channels.includes(data.channel._id)) return {error: wsCloseCodes.NOT_AUTHORIZED}; this.eachInChannel({channelId: data.channel._id}, async ({ session: remoteSession }) => { let id; if (policies.allowSavingMessages) { const message = await Message.create({ author: session.user._id, channel: data.channel._id, content: messageContent, createdAt: new Date().getTime() }); id = message._id; } else { id = new mongoose.Types.ObjectId(); } remoteSession.send("EVENT_CREATE_MESSAGE", { content: messageContent, channel: { _id: data.channel._id }, author: { _id: session.user._id, username: session.user.username }, _id: id }); }); } } class GatewayServer { constructor(httpServer) { this.httpServer = httpServer; this.wss = null; this.handler = null; } bindHandler(handler) { if (this.wss) throw new Error("GatewayServer: cannot bindHandler() to a server which already has a bound handler."); const newHandler = new handler(); this.handler = newHandler; this.wss = new websockets.Server({ server: this.httpServer, ...(this.handler.handleServerConfigRequest()) }); this.handler.handleServerReady(this.wss); this.wss.on("close", () => this.handler.handleServerClose()); this.wss.on("connection", (ws) => { const session = this.handler.handleConnection(ws); if (!session) return ws.close(); if (session.__error) return ws.close(session.__error[0], session.__error[1]); ws.session = session; ws.on("message", async (data, isBinary) => { try { if (isBinary || !ws.session) return ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]); const status = await this.handler.handleMessage(ws.session, parseMessage(data.toString())); if (status && status.error) { return ws.close(status.error[0], status.error[1]); } } catch(e) { console.error("GatewayServer: unexpected error while attempting to handle payload", e); ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]); } }); ws.on("close", (code, reason) => { this.handler.handleConnectionClose(ws, code, reason); }); }); } } module.exports = { GatewayServer, GatewayHandler };