forked from hippoz/brainlet
Merge pull request 'Add gateway pings' (#20) from gateway-pings into master
Reviewed-on: hippoz/brainlet#20
This commit is contained in:
commit
aa7c838164
8 changed files with 871 additions and 1049 deletions
|
@ -1,6 +1,5 @@
|
||||||
module.exports = {
|
module.exports = {
|
||||||
"env": {
|
"env": {
|
||||||
"browser": true,
|
|
||||||
"commonjs": true,
|
"commonjs": true,
|
||||||
"es2021": true,
|
"es2021": true,
|
||||||
"node": true
|
"node": true
|
||||||
|
|
|
@ -3,14 +3,24 @@ const EventEmitter = require("events");
|
||||||
const uuid = require("uuid");
|
const uuid = require("uuid");
|
||||||
const werift = require("werift");
|
const werift = require("werift");
|
||||||
|
|
||||||
const { policies } = require("../../../config");
|
const { policies, gatewayPingInterval, gatewayPingCheckInterval, clientFacingPingInterval } = require("../../../config");
|
||||||
const { experiments } = require("../../../experiments");
|
const { experiments } = require("../../../experiments");
|
||||||
const User = require("../../../models/User");
|
const User = require("../../../models/User");
|
||||||
const Channel = require("../../../models/Channel");
|
const Channel = require("../../../models/Channel");
|
||||||
const { parseMessage, packet } = require("./messageparser");
|
const { parseMessage, packet } = require("./messageparser");
|
||||||
const { checkToken } = require("../../../common/auth/authfunctions");
|
const { checkToken } = require("../../../common/auth/authfunctions");
|
||||||
|
|
||||||
const pingCheckDelay = 10000;
|
const wsCloseCodes = {
|
||||||
|
PAYLOAD_ERROR: [4001, "Error while handling payload"],
|
||||||
|
NOT_AUTHENTICATED: [4002, "Not authenticated"],
|
||||||
|
SERVER_DENIED_CONNECTION: [4003, "Server denied connection"],
|
||||||
|
AUTHENTICATION_ERROR: [4004, "Authentication error"],
|
||||||
|
TOO_MANY_SESSIONS: [4005, "Too many sessions"],
|
||||||
|
NOT_AUTHORIZED: [4006, "Not authorized"],
|
||||||
|
FLOODING: [4007, "Flooding"],
|
||||||
|
NO_PING: [4008, "No ping"],
|
||||||
|
};
|
||||||
|
const closeConnectionWithCode = (ws, code) => ws.close(code[0], code[1]);
|
||||||
|
|
||||||
class GatewayRTCConnection {
|
class GatewayRTCConnection {
|
||||||
constructor(ws, server) {
|
constructor(ws, server) {
|
||||||
|
@ -18,7 +28,7 @@ class GatewayRTCConnection {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
this.connection = new werift.RTCPeerConnection();
|
this.connection = new werift.RTCPeerConnection();
|
||||||
this.connection.onnegotiationneeded.subscribe(() => this.renegotiate);
|
this.connection.onnegotiationneeded.subscribe(() => this.renegotiate); // TODO: what
|
||||||
}
|
}
|
||||||
|
|
||||||
async answer(offer) {
|
async answer(offer) {
|
||||||
|
@ -46,37 +56,32 @@ class GatewayServer extends EventEmitter {
|
||||||
this.wss = new websockets.Server({ server: server, path: "/gateway" });
|
this.wss = new websockets.Server({ server: server, path: "/gateway" });
|
||||||
this.sessionCounters = {};
|
this.sessionCounters = {};
|
||||||
|
|
||||||
this.pingInterval = setInterval(() => {
|
this.pingCheckIntervalFunction = setInterval(() => {
|
||||||
this.wss.clients.forEach((client) => {
|
this.wss.clients.forEach((client) => {
|
||||||
if (!client.alive) {
|
if ((new Date() - client.session.lastPing) >= gatewayPingInterval) {
|
||||||
client.terminate();
|
closeConnectionWithCode(client, wsCloseCodes.NO_PING);
|
||||||
}
|
}
|
||||||
client.alive = false;
|
|
||||||
client.ping(() => {});
|
|
||||||
});
|
});
|
||||||
}, pingCheckDelay);
|
}, gatewayPingCheckInterval);
|
||||||
|
|
||||||
this.wss.on("close", () => {
|
this.wss.on("close", () => {
|
||||||
clearInterval(this.pingInterval);
|
clearInterval(this.pingCheckIntervalFunction);
|
||||||
console.log("gateway: websocket server closed");
|
console.log("gateway: websocket server closed");
|
||||||
});
|
});
|
||||||
|
|
||||||
this.wss.on("connection", (ws) => {
|
this.wss.on("connection", (ws) => {
|
||||||
if (!policies.allowGatewayConnection) return ws.close(4007, "Disallowed by policy.");
|
if (!policies.allowGatewayConnection) return closeConnectionWithCode(ws, wsCloseCodes.SERVER_DENIED_CONNECTION);
|
||||||
// Send HELLO message as soon as the client connects
|
ws.send(packet("HELLO", { pingInterval: clientFacingPingInterval }));
|
||||||
ws.send(packet("HELLO", {}));
|
|
||||||
ws.session = {
|
ws.session = {
|
||||||
authenticated: false,
|
authenticated: false,
|
||||||
user: null,
|
user: null,
|
||||||
sessionId: uuid.v4()
|
sessionId: uuid.v4(),
|
||||||
|
lastPing: new Date(),
|
||||||
|
token: null
|
||||||
};
|
};
|
||||||
ws.alive = true;
|
|
||||||
|
|
||||||
ws.on("pong", () => {
|
|
||||||
ws.alive = true;
|
|
||||||
});
|
|
||||||
ws.on("close", async () => {
|
ws.on("close", async () => {
|
||||||
if (ws.session.user) {
|
if (ws.session.user && ws.channels) {
|
||||||
if (this.sessionCounters[ws.session.user._id] <= 1) {
|
if (this.sessionCounters[ws.session.user._id] <= 1) {
|
||||||
this.inChannel(ws.channels[0], (client) => {
|
this.inChannel(ws.channels[0], (client) => {
|
||||||
console.log(client.session);
|
console.log(client.session);
|
||||||
|
@ -95,23 +100,23 @@ class GatewayServer extends EventEmitter {
|
||||||
|
|
||||||
if (ws.rtc && ws.rtc.connection) await ws.rtc.connection.close();
|
if (ws.rtc && ws.rtc.connection) await ws.rtc.connection.close();
|
||||||
});
|
});
|
||||||
ws.on("message", async (data) => {
|
ws.on("message", async (data, isBinary) => {
|
||||||
try {
|
try {
|
||||||
const message = parseMessage(data);
|
if (isBinary) return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR);
|
||||||
|
const message = parseMessage(data.toString());
|
||||||
switch (message.opcodeType) {
|
switch (message.opcodeType) {
|
||||||
case "YOO": {
|
case "YOO": {
|
||||||
// The client has responded to our HELLO with a YOO packet
|
// The client has responded to our HELLO with a YOO packet
|
||||||
try {
|
try {
|
||||||
const user = await checkToken(message.data.token);
|
const user = await checkToken(message.data.token);
|
||||||
if (!user) return ws.close(4006, "Authentication failed.");
|
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
||||||
ws.session.user = user;
|
ws.session.user = user;
|
||||||
ws.session.authenticated = true;
|
ws.session.authenticated = true;
|
||||||
|
ws.session.token = message.data.token;
|
||||||
|
|
||||||
if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0;
|
if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0;
|
||||||
this.sessionCounters[user._id]++;
|
this.sessionCounters[user._id]++;
|
||||||
if (this.sessionCounters[user._id] > 5) {
|
if (this.sessionCounters[user._id] > policies.perUserMaxGatewayConnections) return closeConnectionWithCode(ws, wsCloseCodes.TOO_MANY_SESSIONS);
|
||||||
return ws.close(4006, "Too many sessions.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// The user is now successfully authenticated, send the YOO_ACK packet
|
// The user is now successfully authenticated, send the YOO_ACK packet
|
||||||
// TODO: This is probably not efficient
|
// TODO: This is probably not efficient
|
||||||
|
@ -151,10 +156,20 @@ class GatewayServer extends EventEmitter {
|
||||||
console.log(`gateway: user ${user.username}: handshake complete`);
|
console.log(`gateway: user ${user.username}: handshake complete`);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.log("gateway:", e);
|
console.log("gateway:", e);
|
||||||
return ws.close(4006, "Authentication failed.");
|
return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case "ACTION_PING": {
|
||||||
|
if (!this.authMessage(ws)) return;
|
||||||
|
if (message.data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`");
|
||||||
|
|
||||||
|
const user = await checkToken(ws.session.token);
|
||||||
|
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
||||||
|
|
||||||
|
ws.session.lastPing = new Date();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case "ACTION_CREATE_MESSAGE": {
|
case "ACTION_CREATE_MESSAGE": {
|
||||||
if (!this.authMessage(ws)) return;
|
if (!this.authMessage(ws)) return;
|
||||||
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
||||||
|
@ -165,7 +180,7 @@ class GatewayServer extends EventEmitter {
|
||||||
if (message.data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!!
|
if (message.data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!!
|
||||||
|
|
||||||
// Check if the user is in that channel before broadcasting the message
|
// Check if the user is in that channel before broadcasting the message
|
||||||
if (!ws.channels.includes(message.data.channel._id)) return ws.close(4008, "Not authorized to perform action.");
|
if (!ws.channels.includes(message.data.channel._id)) return closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHORIZED);
|
||||||
|
|
||||||
this.broadcast(message.data.channel._id, packet("EVENT_CREATE_MESSAGE", {
|
this.broadcast(message.data.channel._id, packet("EVENT_CREATE_MESSAGE", {
|
||||||
content: messageContent,
|
content: messageContent,
|
||||||
|
@ -241,7 +256,7 @@ class GatewayServer extends EventEmitter {
|
||||||
}
|
}
|
||||||
} catch(e) {
|
} catch(e) {
|
||||||
console.error("gateway:", e);
|
console.error("gateway:", e);
|
||||||
return ws.close(4000, "Error while handling payload.");
|
return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -270,7 +285,7 @@ GatewayServer.prototype.clientRTCReady = function(ws) {
|
||||||
|
|
||||||
GatewayServer.prototype.authMessage = function(ws) {
|
GatewayServer.prototype.authMessage = function(ws) {
|
||||||
if (!this.clientReady(ws)) {
|
if (!this.clientReady(ws)) {
|
||||||
ws.close(4007, "Not authenticated.");
|
closeConnectionWithCode(ws, wsCloseCodes.NOT_AUTHENTICATED);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -6,7 +6,7 @@ const opcodes = {
|
||||||
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
|
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
|
||||||
5: { name: "ACTION_UPDATE_STATUS", data: "JSON" },
|
5: { name: "ACTION_UPDATE_STATUS", data: "JSON" },
|
||||||
6: { name: "EVENT_CHANNEL_MEMBERS", data: "JSON" },
|
6: { name: "EVENT_CHANNEL_MEMBERS", data: "JSON" },
|
||||||
7: { name: "EVENT_CHANNEL_MEMBER_UPDATE", data: "JSON" },
|
7: { name: "ACTION_PING", data: "string" },
|
||||||
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
|
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
|
||||||
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
|
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
|
||||||
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
|
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
|
||||||
|
|
|
@ -1,33 +0,0 @@
|
||||||
// This is "inspired" by rate-limiter-flexible
|
|
||||||
|
|
||||||
class RateLimiter {
|
|
||||||
constructor({ points=5, time=1000, minPoints=0 }) {
|
|
||||||
this.points = points;
|
|
||||||
this.minPoints = minPoints;
|
|
||||||
this.time = time;
|
|
||||||
|
|
||||||
this._flooding = {};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
RateLimiter.prototype.consoom = function(discriminator) {
|
|
||||||
if (!this._flooding[discriminator]) this._flooding[discriminator] = { points: this.points, lastReset: Date.now() };
|
|
||||||
|
|
||||||
if (Math.abs(new Date() - this._flooding[discriminator].lastReset) >= this.time) {
|
|
||||||
this._flooding[discriminator] = { points: this.points, lastReset: Date.now() };
|
|
||||||
}
|
|
||||||
|
|
||||||
this._flooding[discriminator].points--;
|
|
||||||
|
|
||||||
if (this._flooding[discriminator].points <= this.minPoints) {
|
|
||||||
this._flooding[discriminator].flooding = true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._flooding[discriminator].flooding === true) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
};
|
|
||||||
|
|
||||||
module.exports = RateLimiter;
|
|
|
@ -19,6 +19,7 @@ module.exports = {
|
||||||
allowAccountCreation: true,
|
allowAccountCreation: true,
|
||||||
allowLogin: true,
|
allowLogin: true,
|
||||||
allowGatewayConnection: true,
|
allowGatewayConnection: true,
|
||||||
|
perUserMaxGatewayConnections: 4,
|
||||||
},
|
},
|
||||||
/*
|
/*
|
||||||
--- Adding a special code requirement for account creation
|
--- Adding a special code requirement for account creation
|
||||||
|
@ -32,6 +33,9 @@ module.exports = {
|
||||||
*/
|
*/
|
||||||
address: "localhost",
|
address: "localhost",
|
||||||
tokenExpiresIn: "8h",
|
tokenExpiresIn: "8h",
|
||||||
|
gatewayPingInterval: 15000,
|
||||||
|
gatewayPingCheckInterval: 4500,
|
||||||
|
clientFacingPingInterval: 14750,
|
||||||
bcryptRounds: 10,
|
bcryptRounds: 10,
|
||||||
roleMap: {
|
roleMap: {
|
||||||
"BANNED": 0,
|
"BANNED": 0,
|
||||||
|
|
|
@ -30,10 +30,10 @@ app.use("/api/v1", apiRoute);
|
||||||
app.use(express.static("app"));
|
app.use(express.static("app"));
|
||||||
|
|
||||||
app.use((err, req, res, next) => {
|
app.use((err, req, res, next) => {
|
||||||
console.error("error: Internal server error", err);
|
console.error("error: server: internal server error", err);
|
||||||
res.status(500).json({ error: true, status: 500, message: "ERR_INTERNAL_SERVER_ERROR" });
|
res.status(500).json({ error: true, status: 500, message: "ERROR_INTERNAL_SERVER_ERROR" });
|
||||||
});
|
});
|
||||||
|
|
||||||
httpServer.listen(config.ports.mainServerPort, () => {
|
httpServer.listen(config.ports.mainServerPort, () => {
|
||||||
console.log(`[*] [server] Main server is listening on port ${config.ports.mainServerPort}`);
|
console.log(`server: listening on port ${config.ports.mainServerPort}`);
|
||||||
});
|
});
|
||||||
|
|
1783
brainlet/package-lock.json
generated
1783
brainlet/package-lock.json
generated
File diff suppressed because it is too large
Load diff
|
@ -18,12 +18,10 @@
|
||||||
"express-rate-limit": "^5.1.3",
|
"express-rate-limit": "^5.1.3",
|
||||||
"express-validator": "^6.6.1",
|
"express-validator": "^6.6.1",
|
||||||
"jsonwebtoken": "^8.5.1",
|
"jsonwebtoken": "^8.5.1",
|
||||||
"mongoose": "^5.10.0",
|
"mongoose": "^6.0.4",
|
||||||
"semver": "^5.7.1",
|
|
||||||
"socket.io": "^3.0.1",
|
|
||||||
"uuid": "^8.3.1",
|
"uuid": "^8.3.1",
|
||||||
"werift": "^0.9.13",
|
"werift": "^0.9.13",
|
||||||
"ws": "^7.4.3"
|
"ws": "^8.2.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"eslint": "^7.21.0"
|
"eslint": "^7.21.0"
|
||||||
|
|
Loading…
Reference in a new issue