forked from hippoz/brainlet
312 lines
12 KiB
JavaScript
312 lines
12 KiB
JavaScript
const websockets = require("ws");
|
|
const { v4 } = require("uuid");
|
|
const mongoose = require("mongoose");
|
|
|
|
const { policies, gatewayPingInterval, gatewayPingCheckInterval, clientFacingPingInterval } = require("../../../config");
|
|
const { experiments } = require("../../../config");
|
|
const User = require("../../../models/User");
|
|
const Channel = require("../../../models/Channel");
|
|
const Message = require("../../../models/Message");
|
|
const { parseMessage, packet } = require("./messageparser");
|
|
const { checkToken } = require("../../../common/auth/authfunctions");
|
|
const config = require("../../../config");
|
|
|
|
const wsCloseCodes = {
|
|
PAYLOAD_ERROR: [4001, "Error while handling payload"],
|
|
NOT_AUTHENTICATED: [4002, "Not authenticated"],
|
|
SERVER_DENIED_CONNECTION: [4003, "Server denied connection"],
|
|
AUTHENTICATION_ERROR: [4004, "Authentication error"],
|
|
TOO_MANY_SESSIONS: [4005, "Too many sessions"],
|
|
NOT_AUTHORIZED: [4006, "Not authorized"],
|
|
FLOODING: [4007, "Flooding"],
|
|
NO_PING: [4008, "No ping"],
|
|
UNSUPPORTED_ATTRIBUTE: [4009, "Unsupported attribute"],
|
|
ILLEGAL_PAYLOAD_SIZE: [4010, "Illegal payload size"],
|
|
};
|
|
|
|
const attributes = {
|
|
PRESENCE_UPDATES: "PRESENCE_UPDATES",
|
|
};
|
|
|
|
const supportedAttributes = [attributes.PRESENCE_UPDATES];
|
|
|
|
class GatewaySession {
|
|
constructor() {
|
|
this.authenticated = false;
|
|
this.user = null;
|
|
this.token = null;
|
|
this.sessionId = v4();
|
|
this.attributes = [];
|
|
|
|
// Specific to websocket sessions
|
|
this.isWebsocketConnection = false;
|
|
this.ws = null;
|
|
this.lastPing = new Date();
|
|
this.channels = [];
|
|
}
|
|
|
|
hasAttribute(roleName) {
|
|
if (roleName.length < 1) return true; // TODO: HACK
|
|
return this.attributes.includes(roleName);
|
|
}
|
|
|
|
setWebsocketClient(ws) {
|
|
this.ws = ws;
|
|
this.isWebsocketConnection = true;
|
|
}
|
|
|
|
async authenticateWithToken(token) {
|
|
let user = null;
|
|
try {
|
|
user = await checkToken(token);
|
|
if (!user) return false;
|
|
this.token = token;
|
|
this.user = user;
|
|
this.authenticated = true;
|
|
return true;
|
|
} catch(e) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
send(name, data) {
|
|
this.ws.send(packet(name, data));
|
|
}
|
|
|
|
isReady() {
|
|
return this.authenticated && this.ws;
|
|
}
|
|
}
|
|
|
|
class GatewayHandler {
|
|
constructor() {
|
|
this.sessionCounters = {};
|
|
}
|
|
|
|
// server event handlers
|
|
handleServerConfigRequest() {
|
|
return {
|
|
path: "/gateway"
|
|
};
|
|
}
|
|
|
|
handleServerReady(wss) {
|
|
this.wss = wss;
|
|
|
|
this.pingCheckIntervalFunction = setInterval(() => {
|
|
this.wss.clients.forEach((client) => {
|
|
if ((new Date() - client.session.lastPing) >= gatewayPingInterval) {
|
|
client.close(wsCloseCodes.NO_PING[0], wsCloseCodes.NO_PING[1]);
|
|
}
|
|
});
|
|
}, gatewayPingCheckInterval);
|
|
}
|
|
|
|
handleServerClose() {
|
|
clearInterval(this.pingCheckIntervalFunction);
|
|
}
|
|
|
|
handleConnection(ws) {
|
|
if (!policies.allowGatewayConnection) {
|
|
return ws.close(wsCloseCodes.SERVER_DENIED_CONNECTION[0], wsCloseCodes.SERVER_DENIED_CONNECTION[1]);
|
|
}
|
|
|
|
const session = new GatewaySession();
|
|
session.setWebsocketClient(ws);
|
|
session.send("HELLO", { pingInterval: clientFacingPingInterval, supportedAttributes });
|
|
return session;
|
|
}
|
|
|
|
handleConnectionClose(ws) {
|
|
if (ws.session && ws.session.user && ws.session.channels) {
|
|
if (this.sessionCounters[ws.session.user._id] <= 1) {
|
|
this.eachInChannel({channelId: ws.session.channels[0], role: attributes.PRESENCE_UPDATES}, (client) => {
|
|
if (client.session && client.session.isReady()) {
|
|
client.session.send("EVENT_CHANNEL_MEMBERS", {
|
|
[ws.session.user._id]: {
|
|
_id: ws.session.user._id,
|
|
username: ws.session.user.username,
|
|
status: 0,
|
|
status_text: ""
|
|
}
|
|
});
|
|
}
|
|
});
|
|
}
|
|
this.sessionCounters[ws.session.user._id]--;
|
|
if (this.sessionCounters[ws.session.user._id] <= 0) {
|
|
this.sessionCounters[ws.session.user._id] = null;
|
|
delete this.sessionCounters[ws.session.user._id];
|
|
}
|
|
}
|
|
}
|
|
|
|
handleMessage(senderSession, packet) {
|
|
return this[`handle_${packet.opcodeType}`](packet, senderSession);
|
|
}
|
|
|
|
// utility functions
|
|
addSessionCounter(id) {
|
|
if (!this.sessionCounters[id]) this.sessionCounters[id] = 0;
|
|
this.sessionCounters[id]++;
|
|
if (this.sessionCounters[id] > policies.perUserMaxGatewayConnections) return false;
|
|
return true;
|
|
}
|
|
|
|
getClients() {
|
|
return this.wss.clients;
|
|
}
|
|
|
|
eachInChannel({channelId, role=""}, callback) {
|
|
const clients = this.getClients();
|
|
clients.forEach((client) => {
|
|
if (client.session && client.session.isReady() && client.session.hasAttribute(role) && client.session.channels.includes(channelId))
|
|
callback(client);
|
|
});
|
|
}
|
|
|
|
// Gateway message handlers
|
|
async handle_YOO({ data }, session) {
|
|
if (session.authenticated) return {error: wsCloseCodes.PAYLOAD_ERROR};
|
|
try {
|
|
if (!(await session.authenticateWithToken(data.token))) return {error: wsCloseCodes.AUTHENTICATION_ERROR};
|
|
} catch(e) {
|
|
return {error: wsCloseCodes.AUTHENTICATION_ERROR};
|
|
}
|
|
if (!this.addSessionCounter(session.user._id)) return {error: wsCloseCodes.TOO_MANY_SESSIONS};
|
|
|
|
const channels = (await Channel.find().lean().sort({ _id: -1 }).limit(50).select("-__v").populate("creator", User.getPulicFields(true))) || [];
|
|
session.channels = channels.map(x => x._id.toString());
|
|
|
|
if (data.attributes) {
|
|
if (!Array.isArray(data.attributes) || data.attributes.length > 8) return {error: wsCloseCodes.PAYLOAD_ERROR};
|
|
for (let i = 0; i < data.attributes.length; i++) {
|
|
if (!supportedAttributes.includes(data.attributes[i]))
|
|
return {error: wsCloseCodes.UNSUPPORTED_ATTRIBUTE};
|
|
}
|
|
session.attributes = data.attributes;
|
|
}
|
|
|
|
session.send("YOO_ACK", { session_id: session.sessionId, channels, user: { username: session.user.username, _id: session.user._id }, __global_experiments: experiments });
|
|
|
|
const channel = session.channels[0];
|
|
if (channel) {
|
|
const presence = {};
|
|
|
|
this.eachInChannel({channelId: channel}, ({ session: remoteSession }) => {
|
|
presence[remoteSession.user._id] = {
|
|
_id: remoteSession.user._id,
|
|
username: remoteSession.user.username,
|
|
status: 1,
|
|
status_text: "Online"
|
|
};
|
|
if (remoteSession.sessionId !== session.sessionId && remoteSession.hasAttribute(attributes.PRESENCE_UPDATES)) {
|
|
remoteSession.send("EVENT_CHANNEL_MEMBERS", {
|
|
[session.user._id]: {
|
|
_id: session.user._id,
|
|
username: session.user.username,
|
|
status: 1,
|
|
status_text: "Online"
|
|
}
|
|
});
|
|
}
|
|
});
|
|
|
|
(session.hasAttribute(attributes.PRESENCE_UPDATES)) && session.send("EVENT_CHANNEL_MEMBERS", presence);
|
|
}
|
|
}
|
|
|
|
async handle_ACTION_PING({ data }, session) {
|
|
if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED};
|
|
if (data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`");
|
|
|
|
const user = await checkToken(session.token);
|
|
if (!user) return {error: wsCloseCodes.AUTHENTICATION_ERROR};
|
|
session.lastPing = new Date();
|
|
}
|
|
|
|
async handle_ACTION_CREATE_MESSAGE({ data }, session) {
|
|
if (!session.isReady()) return {error: wsCloseCodes.NOT_AUTHENTICATED};
|
|
if (typeof data.content !== "string" || typeof data.channel !== "object" || typeof data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
|
|
|
const messageContent = data.content.trim();
|
|
if (messageContent.length > 2000 || messageContent.length < 1) return;
|
|
if (data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!!
|
|
|
|
// Check if the user is in that channel before broadcasting the message
|
|
if (!session.channels.includes(data.channel._id)) return {error: wsCloseCodes.NOT_AUTHORIZED};
|
|
|
|
this.eachInChannel({channelId: data.channel._id}, async ({ session: remoteSession }) => {
|
|
let id;
|
|
if (policies.allowSavingMessages) {
|
|
const message = await Message.create({
|
|
author: session.user._id,
|
|
channel: data.channel._id,
|
|
content: messageContent,
|
|
createdAt: new Date().getTime()
|
|
});
|
|
id = message._id;
|
|
} else {
|
|
id = new mongoose.Types.ObjectId();
|
|
}
|
|
|
|
remoteSession.send("EVENT_CREATE_MESSAGE", {
|
|
content: messageContent,
|
|
channel: {
|
|
_id: data.channel._id
|
|
},
|
|
author: {
|
|
_id: session.user._id,
|
|
username: session.user.username
|
|
},
|
|
_id: id
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
class GatewayServer {
|
|
constructor(httpServer) {
|
|
this.httpServer = httpServer;
|
|
this.wss = null;
|
|
this.handler = null;
|
|
}
|
|
|
|
bindHandler(handler) {
|
|
if (this.wss) throw new Error("GatewayServer: cannot bindHandler() to a server which already has a bound handler.");
|
|
|
|
const newHandler = new handler();
|
|
this.handler = newHandler;
|
|
this.wss = new websockets.Server({ server: this.httpServer, ...(this.handler.handleServerConfigRequest()) });
|
|
this.handler.handleServerReady(this.wss);
|
|
this.wss.on("close", () => this.handler.handleServerClose());
|
|
this.wss.on("connection", (ws) => {
|
|
const session = this.handler.handleConnection(ws);
|
|
if (!session)
|
|
return ws.close();
|
|
if (session.__error)
|
|
return ws.close(session.__error[0], session.__error[1]);
|
|
ws.session = session;
|
|
ws.on("message", async (data, isBinary) => {
|
|
try {
|
|
if (isBinary || !ws.session)
|
|
return ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]);
|
|
if (data.byteLength > config.gatewayMaxPayloadBytes)
|
|
return ws.close(wsCloseCodes.ILLEGAL_PAYLOAD_SIZE[0], wsCloseCodes.ILLEGAL_PAYLOAD_SIZE[1]);
|
|
const status = await this.handler.handleMessage(ws.session, parseMessage(data.toString()));
|
|
if (status && status.error) {
|
|
return ws.close(status.error[0], status.error[1]);
|
|
}
|
|
} catch(e) {
|
|
console.error("GatewayServer: unexpected error while attempting to handle payload", e);
|
|
ws.close(wsCloseCodes.PAYLOAD_ERROR[0], wsCloseCodes.PAYLOAD_ERROR[1]);
|
|
}
|
|
});
|
|
ws.on("close", (code, reason) => {
|
|
this.handler.handleConnectionClose(ws, code, reason);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
module.exports = { GatewayServer, GatewayHandler };
|