Add gateway pings #20
4 changed files with 30 additions and 21 deletions
|
@ -3,7 +3,7 @@ const EventEmitter = require("events");
|
||||||
const uuid = require("uuid");
|
const uuid = require("uuid");
|
||||||
const werift = require("werift");
|
const werift = require("werift");
|
||||||
|
|
||||||
const { policies } = require("../../../config");
|
const { policies, gatewayPingInterval, gatewayPingCheckInterval, clientFacingPingInterval } = require("../../../config");
|
||||||
const { experiments } = require("../../../experiments");
|
const { experiments } = require("../../../experiments");
|
||||||
const User = require("../../../models/User");
|
const User = require("../../../models/User");
|
||||||
const Channel = require("../../../models/Channel");
|
const Channel = require("../../../models/Channel");
|
||||||
|
@ -18,9 +18,9 @@ const wsCloseCodes = {
|
||||||
TOO_MANY_SESSIONS: [4005, "Too many sessions"],
|
TOO_MANY_SESSIONS: [4005, "Too many sessions"],
|
||||||
NOT_AUTHORIZED: [4006, "Not authorized"],
|
NOT_AUTHORIZED: [4006, "Not authorized"],
|
||||||
FLOODING: [4007, "Flooding"],
|
FLOODING: [4007, "Flooding"],
|
||||||
|
NO_PING: [4008, "No ping"],
|
||||||
};
|
};
|
||||||
const closeConnectionWithCode = (ws, code) => ws.close(code[0], code[1]);
|
const closeConnectionWithCode = (ws, code) => ws.close(code[0], code[1]);
|
||||||
const pingCheckDelay = 10000;
|
|
||||||
|
|
||||||
class GatewayRTCConnection {
|
class GatewayRTCConnection {
|
||||||
constructor(ws, server) {
|
constructor(ws, server) {
|
||||||
|
@ -56,36 +56,31 @@ class GatewayServer extends EventEmitter {
|
||||||
this.wss = new websockets.Server({ server: server, path: "/gateway" });
|
this.wss = new websockets.Server({ server: server, path: "/gateway" });
|
||||||
this.sessionCounters = {};
|
this.sessionCounters = {};
|
||||||
|
|
||||||
this.pingInterval = setInterval(() => {
|
this.pingCheckIntervalFunction = setInterval(() => {
|
||||||
this.wss.clients.forEach((client) => {
|
this.wss.clients.forEach((client) => {
|
||||||
if (!client.alive) {
|
if ((new Date() - client.session.lastPing) >= gatewayPingInterval) {
|
||||||
client.terminate();
|
closeConnectionWithCode(client, wsCloseCodes.NO_PING);
|
||||||
}
|
}
|
||||||
client.alive = false;
|
|
||||||
client.ping(() => {});
|
|
||||||
});
|
});
|
||||||
}, pingCheckDelay);
|
}, gatewayPingCheckInterval);
|
||||||
|
|
||||||
this.wss.on("close", () => {
|
this.wss.on("close", () => {
|
||||||
clearInterval(this.pingInterval);
|
clearInterval(this.pingCheckIntervalFunction);
|
||||||
console.log("gateway: websocket server closed");
|
console.log("gateway: websocket server closed");
|
||||||
});
|
});
|
||||||
|
|
||||||
this.wss.on("connection", (ws) => {
|
this.wss.on("connection", (ws) => {
|
||||||
if (!policies.allowGatewayConnection) return closeConnectionWithCode(ws, wsCloseCodes.SERVER_DENIED_CONNECTION);
|
if (!policies.allowGatewayConnection) return closeConnectionWithCode(ws, wsCloseCodes.SERVER_DENIED_CONNECTION);
|
||||||
ws.send(packet("HELLO", {}));
|
ws.send(packet("HELLO", { pingInterval: clientFacingPingInterval }));
|
||||||
ws.session = {
|
ws.session = {
|
||||||
authenticated: false,
|
authenticated: false,
|
||||||
user: null,
|
user: null,
|
||||||
sessionId: uuid.v4()
|
sessionId: uuid.v4(),
|
||||||
|
lastPing: new Date(),
|
||||||
|
token: null
|
||||||
};
|
};
|
||||||
ws.alive = true;
|
|
||||||
|
|
||||||
ws.on("pong", () => {
|
ws.on("close", async () => {
|
||||||
ws.alive = true;
|
|
||||||
});
|
|
||||||
ws.on("close", async (code, data) => {
|
|
||||||
console.log(code, data.toString());
|
|
||||||
if (ws.session.user && ws.channels) {
|
if (ws.session.user && ws.channels) {
|
||||||
if (this.sessionCounters[ws.session.user._id] <= 1) {
|
if (this.sessionCounters[ws.session.user._id] <= 1) {
|
||||||
this.inChannel(ws.channels[0], (client) => {
|
this.inChannel(ws.channels[0], (client) => {
|
||||||
|
@ -107,7 +102,7 @@ class GatewayServer extends EventEmitter {
|
||||||
});
|
});
|
||||||
ws.on("message", async (data, isBinary) => {
|
ws.on("message", async (data, isBinary) => {
|
||||||
try {
|
try {
|
||||||
if (isBinary) return closeConnectionWithCode(wsCloseCodes.PAYLOAD_ERROR);
|
if (isBinary) return closeConnectionWithCode(ws, wsCloseCodes.PAYLOAD_ERROR);
|
||||||
const message = parseMessage(data.toString());
|
const message = parseMessage(data.toString());
|
||||||
switch (message.opcodeType) {
|
switch (message.opcodeType) {
|
||||||
case "YOO": {
|
case "YOO": {
|
||||||
|
@ -117,6 +112,7 @@ class GatewayServer extends EventEmitter {
|
||||||
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
||||||
ws.session.user = user;
|
ws.session.user = user;
|
||||||
ws.session.authenticated = true;
|
ws.session.authenticated = true;
|
||||||
|
ws.session.token = message.data.token;
|
||||||
|
|
||||||
if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0;
|
if (!this.sessionCounters[user._id]) this.sessionCounters[user._id] = 0;
|
||||||
this.sessionCounters[user._id]++;
|
this.sessionCounters[user._id]++;
|
||||||
|
@ -164,6 +160,16 @@ class GatewayServer extends EventEmitter {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case "ACTION_PING": {
|
||||||
|
if (!this.authMessage(ws)) return;
|
||||||
|
if (message.data !== "1") throw new Error("msg: ACTION_PING payload data should be a `1`");
|
||||||
|
|
||||||
|
const user = await checkToken(ws.session.token);
|
||||||
|
if (!user) return closeConnectionWithCode(ws, wsCloseCodes.AUTHENTICATION_ERROR);
|
||||||
|
|
||||||
|
ws.session.lastPing = new Date();
|
||||||
|
break;
|
||||||
|
}
|
||||||
case "ACTION_CREATE_MESSAGE": {
|
case "ACTION_CREATE_MESSAGE": {
|
||||||
if (!this.authMessage(ws)) return;
|
if (!this.authMessage(ws)) return;
|
||||||
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
||||||
|
|
|
@ -6,7 +6,7 @@ const opcodes = {
|
||||||
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
|
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
|
||||||
5: { name: "ACTION_UPDATE_STATUS", data: "JSON" },
|
5: { name: "ACTION_UPDATE_STATUS", data: "JSON" },
|
||||||
6: { name: "EVENT_CHANNEL_MEMBERS", data: "JSON" },
|
6: { name: "EVENT_CHANNEL_MEMBERS", data: "JSON" },
|
||||||
7: { name: "EVENT_CHANNEL_MEMBER_UPDATE", data: "JSON" },
|
7: { name: "ACTION_PING", data: "string" },
|
||||||
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
|
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
|
||||||
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
|
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
|
||||||
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
|
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
|
||||||
|
|
|
@ -32,7 +32,10 @@ module.exports = {
|
||||||
},
|
},
|
||||||
*/
|
*/
|
||||||
address: "localhost",
|
address: "localhost",
|
||||||
tokenExpiresIn: "8h",
|
tokenExpiresIn: "2m",
|
||||||
|
gatewayPingInterval: 15000,
|
||||||
|
gatewayPingCheckInterval: 4500,
|
||||||
|
clientFacingPingInterval: 14750,
|
||||||
bcryptRounds: 10,
|
bcryptRounds: 10,
|
||||||
roleMap: {
|
roleMap: {
|
||||||
"BANNED": 0,
|
"BANNED": 0,
|
||||||
|
|
|
@ -31,7 +31,7 @@ app.use(express.static("app"));
|
||||||
|
|
||||||
app.use((err, req, res, next) => {
|
app.use((err, req, res, next) => {
|
||||||
console.error("error: server: internal server error", err);
|
console.error("error: server: internal server error", err);
|
||||||
res.status(500).json({ error: true, status: 500, message: "ERR_INTERNAL_SERVER_ERROR" });
|
res.status(500).json({ error: true, status: 500, message: "ERROR_INTERNAL_SERVER_ERROR" });
|
||||||
});
|
});
|
||||||
|
|
||||||
httpServer.listen(config.ports.mainServerPort, () => {
|
httpServer.listen(config.ports.mainServerPort, () => {
|
||||||
|
|
Reference in a new issue