This repository has been archived on 2022-05-17. You can view files and clone it, but cannot push or open issues or pull requests.
brainlet/brainlet/api/v2/gateway/index.js

289 lines
13 KiB
JavaScript
Raw Normal View History

const websockets = require("ws");
const EventEmitter = require("events");
const uuid = require("uuid");
const werift = require("werift");
const { policies } = require("../../../config");
const { experiments } = require("../../../experiments");
const User = require("../../../models/User");
const Channel = require("../../../models/Channel");
const { parseMessage, packet } = require("./messageparser");
const { checkToken } = require("../../../common/auth/authfunctions");
const wsCloseCodes = {
PAYLOAD_ERROR: [4001, "Error while handling payload"],
NOT_AUTHENTICATED: [4002, "Not authenticated"],
SERVER_DENIED_CONNECTION: [4003, "Server denied connection"],
AUTHENTICATION_ERROR: [4004, "Authentication error"],
TOO_MANY_SESSIONS: [4005, "Too many sessions"],
NOT_AUTHORIZED: [4006, "Not authorized"],
FLOODING: [4007, "Flooding"],
};
const closeConnectionWithCode = (ws, code) => ws.close(code[0], code[1]);
const pingCheckDelay = 10000;
class GatewayRTCConnection {
constructor(ws, server) {
this.ws = ws;
this.server = server;
this.connection = new werift.RTCPeerConnection();
this.connection.onnegotiationneeded.subscribe(() => this.renegotiate); // TODO: what
}
async answer(offer) {
await this.connection.setRemoteDescription(offer);
await this.connection.setLocalDescription(await this.connection.createAnswer());
return this.connection.localDescription;
}
async addTrack(track) {
if (!track) return;
2021-06-02 04:44:52 +03:00
this.ws.send(this.server.packet("EVENT_TRACK_NOTIFICATION", {
kind: track.kind
}));
this.connection.addTransceiver(track);
}
async renegotiate() {
this.ws.send(this.server.packet("EVENT_RENEGOTIATE_REQUIRED", {}));
}
}
class GatewayServer extends EventEmitter {
constructor({ server }) {
super();
this.wss = new websockets.Server({ server: server, path: "/gateway" });
this.sessionCounters = {};
this.pingInterval = setInterval(() => {
this.wss.clients.forEach((client) => {
if (!client.alive) {
client.terminate();
}
client.alive = false;
client.ping(() => {});
});
}, pingCheckDelay);
this.wss.on("close", () => {
clearInterval(this.pingInterval);
console.log("gateway: websocket server closed");
});
this.wss.on("connection", (ws) => {
if (!policies.allowGatewayConnection) return closeConnectionWithCode(ws, wsCloseCodes.SERVER_DENIED_CONNECTION);
ws.send(packet("HELLO", {}));
ws.session = {
authenticated: false,
user: null,
sessionId: uuid.v4()
};
ws.alive = true;
ws.on("pong", () => {
ws.alive = true;
});
ws.on("close", async (code, data) => {
console.log(code, data.toString());
if (ws.session.user && ws.channels) {
if (this.sessionCounters[ws.session.user._id] <= 1) {
this.inChannel(ws.channels[0], (client) => {
console.log(client.session);
client.send(packet("EVENT_CHANNEL_MEMBERS", {
[ws.session.user._id]: {
_id: ws.session.user._id,
username: ws.session.user.username,
status: 0,
status_text: ""
}
}));
});
}
this.sessionCounters[ws.session.user._id]--;
}
if (ws.rtc && ws.rtc.connection) await ws.rtc.connection.close();
});
ws.on("message", async (data, isBinary) => {
try {
if (isBinary) return closeConnectionWithCode(wsCloseCodes.PAYLOAD_ERROR);
const message = parseMessage(data.toString());
switch (message.opcodeType) {
case "YOO": {
// The client has responded to our HELLO with a YOO packet
try {
const user = await checkToken(message.data.token);
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
ws.session.user = user;
ws.session.authenticated = true;
if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0;
this.sessionCounters[user._id]++;
if (this.sessionCounters[user._id] > policies.perUserMaxGatewayConnections) return closeConnectionWithCode(ws, wsCloseCodes.TOO_MANY_SESSIONS);
// The user is now successfully authenticated, send the YOO_ACK packet
// TODO: This is probably not efficient
let channels = await Channel.find().lean().sort({ _id: -1 }).limit(50).select("-posts -__v").populate("creator", User.getPulicFields(true));
if (!channels) channels = [];
channels = channels.map(x => ({ ...x, _id: x._id.toString() }));
ws.channels = channels.map(x => x._id);
ws.send(packet("YOO_ACK", { session_id: ws.session.sessionId, channels, user: { username: user.username, _id: user._id }, __global_experiments: experiments }));
let presence = {};
const channel = channels[0];
if (channel) {
this.inChannel(channel._id, (client) => {
presence[client.session.user._id] = {
_id: client.session.user._id,
username: client.session.user.username,
status: 1,
status_text: "Online"
};
if (client.session.sessionId !== ws.session.sessionId) client.send(packet("EVENT_CHANNEL_MEMBERS", {
[ws.session.user._id]: {
_id: ws.session.user._id,
username: ws.session.user.username,
status: 1,
status_text: "Online"
}
}));
});
ws.send(packet("EVENT_CHANNEL_MEMBERS", presence));
}
console.log(`gateway: user ${user.username}: handshake complete`);
} catch (e) {
console.log("gateway:", e);
return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
}
break;
}
case "ACTION_CREATE_MESSAGE": {
if (!this.authMessage(ws)) return;
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
const messageContent = message.data.content.trim();
if (messageContent.length > 2000) return;
if (messageContent === "") return;
if (message.data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!!
// Check if the user is in that channel before broadcasting the message
if (!ws.channels.includes(message.data.channel._id)) return closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHORIZED);
this.broadcast(message.data.channel._id, packet("EVENT_CREATE_MESSAGE", {
content: messageContent,
channel: {
_id: message.data.channel._id
},
author: {
_id: ws.session.user._id,
username: ws.session.user.username
},
_id: uuid.v4()
}));
break;
}
case "ACTION_VOICE_REQUEST_SESSION": {
if (!experiments.voiceSFUTesting) return;
// just send ourselves as the voice server lol
ws.send(packet("EVENT_VOICE_ASSIGN_SERVER", {
reportTo: "/gateway",
channel: message.data.channel
}));
break;
}
case "ACTION_VOICE_CONNECTION_REQUEST": {
if (!this.authMessage(ws)) return;
if (!experiments.voiceSFUTesting) return;
const offer = message.data.offer;
const channel = message.data.channel;
if (!channel || !channel._id) throw new Error("Client did not send a valid channel in ACTION_VOICE_CONNECTION_REQUEST");
if (!offer) throw new Error("Client did not send any offer in ACTION_VOICE_CONNECTION_REQUEST");
let isNewConnection = true;
if (ws.rtc) {
isNewConnection = false;
} else {
ws.rtc = new GatewayRTCConnection(ws, this);
}
if (isNewConnection) {
ws.rtc.connection.onTransceiver.subscribe(async (transceiver) => {
const [track] = await transceiver.onTrack.asPromise();
this.inChannel(channel._id, (otherWs) => {
if (!this.clientRTCReady(otherWs)) return;
if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves
otherWs.rtc.addTrack(track);
otherWs.rtc.renegotiate();
});
});
}
ws.send(packet("EVENT_VOICE_CONNECTION_ANSWER", {
answer: await ws.rtc.answer(offer)
}));
if (isNewConnection) {
this.inChannel(channel._id, (otherWs) => {
if (!this.clientRTCReady(otherWs)) return;
if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves
const otherReceivers = otherWs.rtc.connection.getReceivers();
for (let i = 0; i < otherReceivers.length; i++) {
const otherRecevier = otherReceivers[i];
ws.rtc.addTrack(otherRecevier.tracks[0]);
}
});
ws.rtc.renegotiate();
}
break;
}
}
} catch(e) {
console.error("gateway:", e);
return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR);
}
});
});
}
}
GatewayServer.prototype.broadcast = function(channelId, data) {
this.wss.clients.forEach((client) => {
if (this.clientReady(client) && client.channels.includes(channelId)) client.send(data);
});
};
GatewayServer.prototype.inChannel = function(channelId, func) {
this.wss.clients.forEach((client) => {
if (this.clientReady(client) && client.channels.includes(channelId)) func(client);
});
};
GatewayServer.prototype.clientReady = function(ws) {
return ws.readyState === websockets.OPEN && ws.session && ws.session.authenticated;
};
GatewayServer.prototype.clientRTCReady = function(ws) {
return ws.rtc; // TODO: add more checks
};
GatewayServer.prototype.authMessage = function(ws) {
if (!this.clientReady(ws)) {
closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHENTICATED);
return false;
}
return true;
};
module.exports = GatewayServer;