From 3f7c462c0919a6d30e9942071ec4480052fa3fed Mon Sep 17 00:00:00 2001 From: hippoz Date: Tue, 25 May 2021 15:51:35 +0300 Subject: [PATCH] remove legacy gateway and respond to transceiver events --- brainlet/api/v1/gateway/index.js | 324 ----------------------- brainlet/api/v2/gateway/index.js | 25 +- brainlet/api/v2/gateway/messageparser.js | 3 +- brainlet/experiments.js | 2 +- brainlet/index.js | 11 - 5 files changed, 19 insertions(+), 346 deletions(-) delete mode 100644 brainlet/api/v1/gateway/index.js diff --git a/brainlet/api/v1/gateway/index.js b/brainlet/api/v1/gateway/index.js deleted file mode 100644 index ee07a2a..0000000 --- a/brainlet/api/v1/gateway/index.js +++ /dev/null @@ -1,324 +0,0 @@ -const User = require("../../../models/User"); -const secret = require("../../../secret"); -const config = require("../../../config"); -const Channel = require("../../../models/Channel"); -const RateLimiter = require("../../../common/util/ratelimiter"); - -const jwt = require("jsonwebtoken"); -const siolib = require("socket.io"); -const uuid = require("uuid"); - -class GatewayServer { - constructor(httpServer) { - this._io = siolib(httpServer); - this._gateway = this._io.of("/gateway"); - this.rateLimiter = new RateLimiter({ - points: 5, - time: 1000, - minPoints: 0 - }); - this.eventSetup(); - - this._commandPrefix = "/"; - } -} - -GatewayServer.prototype._sendSystemMessage = function(socket, message, channel) { - const messageObject = { - author: { - username: "__SYSTEM", - _id: "5fc69864f15a7c5e504c9a1f" - }, - channel: { - title: channel.title, - _id: channel._id - }, - content: message, - _id: uuid.v4() - }; - - socket.emit("message", messageObject); -}; - -GatewayServer.prototype.notifyClientsOfUpdate = function(reason) { - this._gateway.emit("refreshClient", { reason: reason || "REFRESH" }); -}; - -GatewayServer.prototype._processCommand = async function(socket, message) { - const content = message.content; - const fullCommandString = content.slice(this._commandPrefix.length, content.length); - const fullCommand = fullCommandString.split(" "); - const command = fullCommand[0] || "INVALID_COMMAND"; - const args = fullCommand.length - 1; - - switch (command) { - case "INVALID_COMMAND": { - this._sendSystemMessage(socket, "Invalid command.", message.channel); - break; - } - case "admin/fr": { - if (args === 1) { - if (socket.user.permissionLevel >= config.roleMap.ADMIN) { - this._gateway.emit("refreshClient", { reason: fullCommand[1] || "REFRESH" }); - } else { - this._sendSystemMessage(socket, "how about no", message.channel); - } - } else { - this._sendSystemMessage(socket, "Invalid number of arguments.", message.channel); - } - break; - } - case "admin/fru": { - if (args === 1) { - if (socket.user.permissionLevel >= config.roleMap.ADMIN) { - const user = await this._findSocketInRoom(message.channel._id, fullCommand[1]); - if (!user) { - this._sendSystemMessage(socket, "User not found.", message.channel); - break; - } - - this._gateway.in(user.user.sid).emit("refreshClient", { reason: "REFRESH" }); - } else { - this._sendSystemMessage(socket, "how about no", message.channel); - } - } else { - this._sendSystemMessage(socket, "Invalid number of arguments.", message.channel); - } - break; - } - default: { - this._sendSystemMessage(socket, "That command does not exist.", message.channel); - break; - } - } -}; - -GatewayServer.prototype.authDisconnect = function(socket, callback) { - console.log("[E] [gateway] [handshake] User disconnected due to failed authentication"); - socket.isConnected = false; - socket.disconnect(); - socket.disconnect(true); - callback(new Error("ERR_GATEWAY_AUTH_FAIL")); -}; - -GatewayServer.prototype.eventSetup = function() { - this._gateway.use((socket, callback) => { - console.log("[*] [gateway] [handshake] User authentication attempt"); - socket.isConnected = false; - - setTimeout(() => { - if (socket.isConnected) return; - console.log("[E] [gateway] [handshake] User still not connected after timeout, removing..."); - socket.disconnect(); - socket.disconnect(true); - }, config.gatewayStillNotConnectedTimeoutMS); - - // TODO: Maybe passing the token in the query is not the best idea? - const token = socket.handshake.query.token; - - if (!token) return this.authDisconnect(socket, callback); - if (!(typeof token === "string")) return this.authDisconnect(socket, callback); - - const allSockets = this._gateway.sockets; - for (let [_, e] of allSockets) { - if (e.user && e.user.token === token) { - console.log(`[E] [gateway] [handshake] User ${e.user.username} tried to connect more than once, rejecting connection...`); - return this.authDisconnect(socket, callback); - } - } - - jwt.verify(token, secret.jwtPrivateKey, {}, async (err, data) => { - if (err) return this.authDisconnect(socket, callback); - if (!data) return this.authDisconnect(socket, callback); - if (!data.username) return this.authDisconnect(socket, callback); - - const user = await User.findByUsername(data.username); - - if (!user) return this.authDisconnect(socket, callback); - - let permissionLevel = config.roleMap[user.role]; - if (!permissionLevel) { - permissionLevel = 0; - } - - if (permissionLevel < config.roleMap.USER) return this.authDisconnect(socket, callback); - - socket.user = { - username: data.username, - _id: user._id.toString(), - token, // NOTE(hippoz): Maybe not secure - permissionLevel, - color: user.color - }; - console.log(`[*] [gateway] [handshake] User ${data.username} has successfully authenticated`); - return callback(); - }); - }); - - this._gateway.on("connection", (socket) => { - console.log(`[*] [gateway] [handshake] User ${socket.user.username} connected, sending hello and waiting for yoo...`); - - socket.emit("hello", { - gatewayStillNotConnectedTimeoutMS: config.gatewayStillNotConnectedTimeoutMS, - resolvedUser: { - username: socket.user.username, - _id: socket.user._id - } - }); - - socket.once("yoo", () => { - console.log(`[*] [gateway] [handshake] Got yoo from ${socket.user.username}, connection is finally completed!`); - socket.isConnected = true; - - socket.on("message", async ({ channel, content, nickAuthor, destUser }) => { - if (!channel || !content || !socket.joinedChannels || !socket.isConnected || !socket.user || !(typeof content === "string") || !(typeof channel._id === "string")) return; - content = content.trim(); - if (!content || content === "" || content === " " || content.length >= 2000) return; - if (!this.rateLimiter.consoom(socket.user.token)) { // TODO: maybe user ip instead of token? - console.log(`[E] [gateway] Rate limiting ${socket.user.username}`); - return; - } - - // TODO: When/if channel permissions are added, check if the user has permissions for that channel - const channelTitle = socket.joinedChannels[channel._id]; - if (!channelTitle || !(typeof channelTitle === "string")) return; - - let messageObject = { - author: { - username: socket.user.username, - _id: socket.user._id, - color: socket.user.color - }, - channel: { - title: channelTitle, - _id: channel._id - }, - content: content, - _id: uuid.v4() - }; - - if (nickAuthor && nickAuthor.username && (typeof nickAuthor.username) === "string" && nickAuthor.username.length <= 32 && nickAuthor.username.length >= 3) { - if (socket.user.permissionLevel === config.roleMap.BOT) { - messageObject = { - nickAuthor: { - username: nickAuthor.username - }, - ...messageObject - }; - } - } - - if (messageObject.content.startsWith(this._commandPrefix)) { - this._processCommand(socket, messageObject); - return; - } - - if (destUser && destUser._id && (typeof destUser._id) === "string") { - const user = await this._findSocketInRoom(messageObject.channel._id, destUser._id); - if (!user) return; - - this._gateway.in(user.user.sid).emit("message", messageObject); - return; - } - - this._gateway.in(channel._id).emit("message", messageObject); - }); - - socket.on("subscribe", async (channels) => { - if ( !socket.isConnected || !socket.user || !channels || !Array.isArray(channels) || channels === []) return; - try { - for (const v of channels) { - if (!v && !(typeof v === "string")) continue; - // TODO: When/if channel permissions are added, check if the user has permissions for that channel - const channel = await Channel.findById(v); - if (channel && channel.title && channel._id) { - if (!socket.joinedChannels) socket.joinedChannels = {}; - if (socket.joinedChannels[v]) continue; - socket.joinedChannels[v] = channel.title; - await socket.join(v); - - console.log(`[*] [gateway] User ${socket.user.username} subscribed to room ${v} (${channel.title}), sending updated user list to all members of that room...`); - - const upd = await this._generateClientListUpdateObject(v, channel.title); - this._gateway.in(v).emit("clientListUpdate", upd); - } - } - } catch (e) { - return; - } - }); - - socket.on("disconnecting", async () => { - console.log(`[*] [gateway] User ${socket.user.username} is disconnecting, broadcasting updated user list to all of the rooms they have been in...`); - const rooms = socket.rooms; - rooms.forEach(async (room) => { - // Socket io automatically adds a user to a room with their own id - if (room === socket.id) return; - - const channelTitle = socket.joinedChannels[room] || "UNKNOWN"; - await socket.leave(room); - - const upd = await this._generateClientListUpdateObject(room, channelTitle); - socket.in(room).emit("clientListUpdate", upd); - }); - }); - }); - }); -}; - -GatewayServer.prototype._getSocketsInRoom = async function(room) { - // NOTE: I have no idea why i have to do this dumb thing, why can't socket io just let you simply get the sockets from a room? idk - // There kinda was a way in the previous version, but they want to change the api for the worse each version, i'm guessing - const clients = await this._gateway.in(room).allSockets(); - const updatedClientList = []; - - clients.forEach((sid) => { - const client = this._gateway.sockets.get(sid); // lol they also used dumb ass maps for the socket list, can you fucking not? - if (!client || !client.isConnected || !client.user) return; - updatedClientList.push({ - user: { - username: client.user.username, - _id: client.user._id, - color: client.user.color, - sid: client.id - } - }); - }); - return updatedClientList; -}; - -GatewayServer.prototype._findSocketInRoom = async function(room, userid) { - // NOTE: I have no idea why i have to do this dumb thing, why can't socket io just let you simply get the sockets from a room? idk - // There kinda was a way in the previous version, but they want to change the api for the worse each version, i'm guessing - const clients = await this._gateway.in(room).allSockets(); - const updatedClientList = []; - - clients.forEach((sid) => { - const client = this._gateway.sockets.get(sid); // lol they also used dumb ass maps for the socket list, can you fucking not? - if (!client || !client.isConnected || !client.user) return; - if (userid !== client.user._id) return; - updatedClientList.push({ - user: { - username: client.user.username, - _id: client.user._id, - color: client.user.color, - sid: client.id - } - }); - }); - return updatedClientList[0] || undefined; -}; - -GatewayServer.prototype._generateClientListUpdateObject = async function(room, channelTitle="UNKNOWN") { - const clientList = await this._getSocketsInRoom(room); - return { - channel: { - title: channelTitle, - _id: room - }, - clientList - }; -}; - - -module.exports = GatewayServer; \ No newline at end of file diff --git a/brainlet/api/v2/gateway/index.js b/brainlet/api/v2/gateway/index.js index 611e241..82fbe84 100644 --- a/brainlet/api/v2/gateway/index.js +++ b/brainlet/api/v2/gateway/index.js @@ -70,7 +70,7 @@ class GatewayServer extends EventEmitter { ws.alive = true; }); ws.on("close", async () => { - if (ws.rtc) await ws.rtc.close(); + if (ws.rtc) await ws.rtc.connection.close(); }); ws.on("message", async (data) => { try { @@ -152,30 +152,37 @@ class GatewayServer extends EventEmitter { ws.rtc = new GatewayRTCConnection(ws, this); } + if (isNewConnection) { + ws.rtc.connection.onTransceiver.subscribe(async (transceiver) => { + const [track] = await transceiver.onTrack.asPromise(); + this.inChannel(channel._id, (otherWs) => { + if (!this.clientRTCReady(otherWs)) return; + if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves + + otherWs.rtc.addTrack(track); + otherWs.rtc.renegotiate(); + }); + }); + } + ws.send(this.packet("EVENT_VOICE_CONNECTION_ANSWER", { answer: await ws.rtc.answer(offer) })); if (isNewConnection) { this.inChannel(channel._id, (otherWs) => { - //if (!this.clientReady(otherWs)) return; if (!this.clientRTCReady(otherWs)) return; - if (otherWs.session.user._id === ws.session.user._id) return; // Don't perform the actions below on ourselves + if (otherWs.session.sessionId === ws.session.sessionId) return; // Don't perform the actions below on ourselves const otherReceivers = otherWs.rtc.connection.getReceivers(); for (let i = 0; i < otherReceivers.length; i++) { const otherRecevier = otherReceivers[i]; ws.rtc.addTrack(otherRecevier.tracks[0]); } - const myReceviers = ws.rtc.connection.getReceivers(); - for (let i = 0; i < myReceviers.length; i++) { - const myReceiver = myReceviers[i]; - otherWs.rtc.addTrack(myReceiver.tracks[0]); - } - otherWs.rtc.renegotiate(); }); ws.rtc.renegotiate(); } + break; } } } catch(e) { diff --git a/brainlet/api/v2/gateway/messageparser.js b/brainlet/api/v2/gateway/messageparser.js index 6b54e90..d7a7371 100644 --- a/brainlet/api/v2/gateway/messageparser.js +++ b/brainlet/api/v2/gateway/messageparser.js @@ -8,7 +8,8 @@ const opcodes = { 22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" }, 23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" }, 24: { name: "EVENT_VOICE_CONNECTION_ANSWER", data: "JSON" }, - 25: { name: "EVENT_RENEGOTIATE_REQUIRED", data: "JSON" } + 25: { name: "EVENT_RENEGOTIATE_REQUIRED", data: "JSON" }, + 26: { name: "ACTION_VOICE_ADD_ICE", data: "JSON" } }; const opcodeSeparator = "@"; diff --git a/brainlet/experiments.js b/brainlet/experiments.js index 2971298..c4b79ee 100644 --- a/brainlet/experiments.js +++ b/brainlet/experiments.js @@ -1,6 +1,6 @@ module.exports = { experiments: { - voiceSFUTesting: false, + voiceSFUTesting: true, userListTest: false } }; diff --git a/brainlet/index.js b/brainlet/index.js index f098c84..b4fdaee 100755 --- a/brainlet/index.js +++ b/brainlet/index.js @@ -7,13 +7,11 @@ const cors = require("cors"); const http = require("http"); const { authenticateEndpoint } = require("./common/auth/authfunctions"); -const GatewayServer = require("./api/v1/gateway/index"); const GatewayServerV2 = require("./api/v2/gateway/index"); const app = express(); const httpServer = http.createServer(app); -const gateway = new GatewayServer(httpServer); const gatewayv2 = new GatewayServerV2({ server: httpServer }); app.use(express.urlencoded({ extended: false })); @@ -42,15 +40,6 @@ app.use((err, req, res, next) => { res.status(500).json({ error: true, status: 500, message: "ERR_INTERNAL_SERVER_ERROR" }); }); -const onServerClosing = () => { - gateway.notifyClientsOfUpdate("exit"); - process.exit(); -}; - -["exit", "SIGINT", "SIGUSR1", "SIGUSR2", "uncaughtException", "SIGTERM"].forEach((eventType) => { - process.on(eventType, onServerClosing.bind(null, eventType)); -}); - httpServer.listen(config.ports.mainServerPort, () => { console.log(`[*] [server] Main server is listening on port ${config.ports.mainServerPort}`); });