forked from hippoz/brainlet
Implement basic handshake and message sending capabilities for gatewayv2
This commit is contained in:
parent
4d61db3e5b
commit
8d7b2049c5
8 changed files with 297 additions and 60 deletions
|
@ -127,7 +127,7 @@ app.get("/category/list", authenticateEndpoint(async (req, res) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This is probably not efficient
|
// TODO: This is probably not efficient
|
||||||
const categories = await Category.find().sort({ _id: -1 }).limit(count).select("-posts -__v").populate("creator", User.getPulicFields());
|
const categories = await Category.find().lean().sort({ _id: -1 }).limit(count).select("-posts -__v").populate("creator", User.getPulicFields());
|
||||||
|
|
||||||
res.status(200).json({
|
res.status(200).json({
|
||||||
error: false,
|
error: false,
|
||||||
|
@ -136,4 +136,4 @@ app.get("/category/list", authenticateEndpoint(async (req, res) => {
|
||||||
});
|
});
|
||||||
}, undefined, config.roleMap.USER));
|
}, undefined, config.roleMap.USER));
|
||||||
|
|
||||||
module.exports = app;
|
module.exports = app;
|
||||||
|
|
|
@ -1,15 +1,132 @@
|
||||||
const websockets = require("ws");
|
const websockets = require("ws");
|
||||||
|
const EventEmitter = require("events");
|
||||||
|
const uuid = require("uuid");
|
||||||
|
|
||||||
class GatewayServer {
|
const User = require("../../../models/User");
|
||||||
|
const Category = require("../../../models/Category");
|
||||||
|
const { parseMessage, opcodeSeparator, getOpcodeByName } = require("./messageparser");
|
||||||
|
const { checkToken } = require("../../../common/auth/authfunctions");
|
||||||
|
|
||||||
|
const pingCheckDelay = 10000;
|
||||||
|
|
||||||
|
class GatewayServer extends EventEmitter {
|
||||||
constructor({ server }) {
|
constructor({ server }) {
|
||||||
this.wss = new websockets.Server({ server });
|
super();
|
||||||
|
this.wss = new websockets.Server({ server: server, path: "/gateway" });
|
||||||
|
|
||||||
|
this.pingInterval = setInterval(() => {
|
||||||
|
this.wss.clients.forEach((client) => {
|
||||||
|
if (!client.alive) {
|
||||||
|
console.log("gateway: terminating client due to ping timeout");
|
||||||
|
client.terminate();
|
||||||
|
}
|
||||||
|
client.alive = false;
|
||||||
|
client.ping(() => {});
|
||||||
|
});
|
||||||
|
}, pingCheckDelay);
|
||||||
|
|
||||||
|
this.wss.on("close", () => {
|
||||||
|
clearInterval(this.pingInterval);
|
||||||
|
console.log("gateway: websocket server closed");
|
||||||
|
});
|
||||||
|
|
||||||
this.wss.on("connection", (ws) => {
|
this.wss.on("connection", (ws) => {
|
||||||
ws.on("message", (data) => {
|
// Send HELLO message as soon as the client connects
|
||||||
|
ws.send(this.packet("HELLO", {}));
|
||||||
|
ws.session = {
|
||||||
|
authenticated: false,
|
||||||
|
user: null,
|
||||||
|
sessionId: uuid.v4()
|
||||||
|
};
|
||||||
|
ws.alive = true;
|
||||||
|
|
||||||
|
ws.on("pong", () => {
|
||||||
|
ws.alive = true;
|
||||||
|
});
|
||||||
|
ws.on("message", async (data) => {
|
||||||
|
try {
|
||||||
|
const message = parseMessage(data);
|
||||||
|
switch (message.opcodeType) {
|
||||||
|
case "YOO": {
|
||||||
|
// The client has responded to our HELLO with a YOO packet
|
||||||
|
try {
|
||||||
|
const user = await checkToken(message.data.token);
|
||||||
|
if (!user) return ws.close(4006, "Authentication failed.");
|
||||||
|
ws.session.user = user;
|
||||||
|
ws.session.authenticated = true;
|
||||||
|
|
||||||
|
// The user is now successfully authenticated, send the YOO_ACK packet
|
||||||
|
// TODO: This is probably not efficient
|
||||||
|
|
||||||
|
let channels = await Category.find().lean().sort({ _id: -1 }).limit(50).select("-posts -__v").populate("creator", User.getPulicFields(true));
|
||||||
|
if (!channels) channels = [];
|
||||||
|
channels = channels.map(x => ({ ...x, _id: x._id.toString() }));
|
||||||
|
|
||||||
|
ws.channels = channels.map(x => x._id);
|
||||||
|
|
||||||
|
ws.send(this.packet("YOO_ACK", { session_id: ws.session.sessionId, channels, user: { username: user.username, _id: user._id } }));
|
||||||
|
console.log(`gateway: user ${user.username}: handshake complete`);
|
||||||
|
} catch (e) {
|
||||||
|
console.log("gateway:", e);
|
||||||
|
return ws.close(4006, "Authentication failed.");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "ACTION_CREATE_MESSAGE": {
|
||||||
|
if (!this.authMessage(ws)) return;
|
||||||
|
if (typeof message.data.content !== "string" || typeof message.data.channel !== "object" || typeof message.data.channel._id !== "string") throw new Error("msg: invalid fields in json payload");
|
||||||
|
|
||||||
|
const messageContent = message.data.content.trim();
|
||||||
|
if (messageContent.length > 2000) return;
|
||||||
|
if (message.data.channel._id.length !== 24) throw new Error("msg: payload has invalid id"); // MONGODB ONLY!!
|
||||||
|
|
||||||
|
// Check if the user is in that channel before broadcasting the message
|
||||||
|
if (!ws.channels.includes(message.data.channel._id)) return ws.close(4008, "Not authorized to perform action.");
|
||||||
|
|
||||||
|
this.broadcast(message.data.channel._id, this.packet("EVENT_CREATE_MESSAGE", {
|
||||||
|
content: messageContent,
|
||||||
|
channel: {
|
||||||
|
_id: message.data.channel._id
|
||||||
|
},
|
||||||
|
author: {
|
||||||
|
_id: ws.session.user._id,
|
||||||
|
username: ws.session.user.username
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(e) {
|
||||||
|
console.error("gateway:", e);
|
||||||
|
return ws.close(4000, "Error while handling payload.");
|
||||||
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = GatewayServer;
|
GatewayServer.prototype.broadcast = function(channelId, data) {
|
||||||
|
this.wss.clients.forEach((client) => {
|
||||||
|
if (this.clientReady(client) && client.channels.includes(channelId)) client.send(data);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
GatewayServer.prototype.clientReady = function(ws) {
|
||||||
|
return ws.readyState === WebSocket.OPEN && ws.session && ws.session.authenticated;
|
||||||
|
};
|
||||||
|
|
||||||
|
GatewayServer.prototype.authMessage = function(ws) {
|
||||||
|
if (!this.clientReady(ws)) {
|
||||||
|
ws.close(4007, "Not authenticated.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
GatewayServer.prototype.packet = function(op, data) {
|
||||||
|
if (typeof op === "string") op = getOpcodeByName(op);
|
||||||
|
return `${op}${opcodeSeparator}${JSON.stringify(data)}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = GatewayServer;
|
||||||
|
|
|
@ -1,18 +1,42 @@
|
||||||
const config = require("../../../config");
|
|
||||||
|
|
||||||
const opcodes = {
|
const opcodes = {
|
||||||
0: "HELLO"
|
0: { name: "HELLO", data: "JSON" },
|
||||||
|
1: { name: "YOO", data: "JSON" },
|
||||||
|
2: { name: "YOO_ACK", data: "JSON" },
|
||||||
|
3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" },
|
||||||
|
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const opcodeSeparator = "@";
|
||||||
|
|
||||||
const parseMessage = (message) => {
|
const parseMessage = (message) => {
|
||||||
if (typeof message !== "string") throw new Error("GatewayMessageParser: Message is not a string");
|
if (typeof message !== "string") throw new Error("msg: message not a string");
|
||||||
if (message.length < 1) throw new Error("GatewayMessageParser: Message has less than 1 character");
|
const stringParts = message.split(opcodeSeparator);
|
||||||
if (message.length > config.gatewayMaxStringPayloadLength) throw new Error(`GatewayMessageParser: Message has more than ${config.gatewayMaxStringPayloadLength} characters`);
|
if (stringParts < 2) throw new Error("msg: message does not split into more than 2 parts");
|
||||||
const op = parseInt(message[0]);
|
const components = [ stringParts.shift(), stringParts.join(opcodeSeparator) ];
|
||||||
if (!op || isNaN(op)) throw new Error("GatewayMessageParser: Message has invalid opcode");
|
const op = parseInt(components[0]);
|
||||||
const opcodeName = opcodes[op];
|
if (isNaN(op)) throw new Error(`msg: message does not contain valid opcode: ${op}`);
|
||||||
if (!opcodeName) throw new Error("GatewayMessageParser: Message has unknown opcode");
|
|
||||||
|
|
||||||
|
const opcodeData = opcodes[op];
|
||||||
|
let data = components[1];
|
||||||
|
if (!opcodeData) throw new Error(`msg: message contains unknown opcode ${op}`);
|
||||||
|
if (opcodeData.data === "JSON") {
|
||||||
|
data = JSON.parse(data);
|
||||||
|
} else if (opcodeData.data === "string") {
|
||||||
|
data = data.toString(); // NOTE: This isnt needed lol
|
||||||
|
} else {
|
||||||
|
throw new Error(`msg: invalid data type on opcode ${op}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
opcode: op,
|
||||||
|
data: data,
|
||||||
|
dataType: opcodeData.data,
|
||||||
|
opcodeType: opcodeData.name || null
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = { parseMessage };
|
const getOpcodeByName = (name) => {
|
||||||
|
for (const [key, value] of Object.entries(opcodes)) if (value.name === name) return key;
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = { opcodes, parseMessage, opcodeSeparator, getOpcodeByName };
|
||||||
|
|
106
brainlet/app/gatewaytest/index.html
Normal file
106
brainlet/app/gatewaytest/index.html
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<script>
|
||||||
|
const opcodes = {
|
||||||
|
0: { name: "HELLO", data: "JSON" },
|
||||||
|
1: { name: "YOO", data: "JSON" },
|
||||||
|
2: { name: "YOO_ACK", data: "JSON" },
|
||||||
|
3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" },
|
||||||
|
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" }
|
||||||
|
};
|
||||||
|
|
||||||
|
const opcodeSeparator = "@";
|
||||||
|
|
||||||
|
const parseMessage = (message) => {
|
||||||
|
if (typeof message !== "string") throw new Error("msg: message not a string");
|
||||||
|
const stringParts = message.split(opcodeSeparator);
|
||||||
|
if (stringParts < 2) throw new Error("msg: message does not split into more than 2 parts");
|
||||||
|
const components = [ stringParts.shift(), stringParts.join(opcodeSeparator) ];
|
||||||
|
const op = parseInt(components[0]);
|
||||||
|
if (isNaN(op)) throw new Error(`msg: message does not contain valid opcode: ${op}`);
|
||||||
|
|
||||||
|
const opcodeData = opcodes[op];
|
||||||
|
let data = components[1];
|
||||||
|
if (!opcodeData) throw new Error(`msg: message contains unknown opcode ${op}`);
|
||||||
|
if (opcodeData.data === "JSON") {
|
||||||
|
data = JSON.parse(data);
|
||||||
|
} else if (opcodeData.data === "string") {
|
||||||
|
data = data.toString(); // NOTE: This isnt needed lol
|
||||||
|
} else {
|
||||||
|
throw new Error(`msg: invalid data type on opcode ${op}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
opcode: op,
|
||||||
|
data: data,
|
||||||
|
dataType: opcodeData.data,
|
||||||
|
opcodeType: opcodeData.name || null
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
const getOpcodeByName = (name) => {
|
||||||
|
for (const [key, value] of Object.entries(opcodes)) if (value.name === name) return key;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
class GatewayConnection {
|
||||||
|
constructor(token) {
|
||||||
|
this.ws = new WebSocket("ws://localhost:3005/gateway?v=2");
|
||||||
|
|
||||||
|
this.handshakeCompleted = false;
|
||||||
|
|
||||||
|
this.ws.onopen = () => console.log("gateway: open");
|
||||||
|
this.ws.onclose = (e) => {
|
||||||
|
this.handshakeCompleted = false;
|
||||||
|
console.log(`gateway: close: ${e.code}:${e.reason}`);
|
||||||
|
}
|
||||||
|
this.ws.onmessage = (message) => {
|
||||||
|
try {
|
||||||
|
const packet = parseMessage(message.data);
|
||||||
|
if (!packet) return console.error("gateway: invalid packet from server");
|
||||||
|
|
||||||
|
switch (packet.opcodeType) {
|
||||||
|
case "HELLO": {
|
||||||
|
// Got HELLO from server, send YOO as soon as possible
|
||||||
|
console.log("gateway: got HELLO", packet.data);
|
||||||
|
console.log("gateway: sending YOO");
|
||||||
|
this.ws.send(this.packet("YOO", { token }));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "YOO_ACK": {
|
||||||
|
// Server accepted connection
|
||||||
|
console.log("gateway: got YOO_ACK", packet.data);
|
||||||
|
this.handshakeCompleted = true;
|
||||||
|
console.log("gateway: handshake complete");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "EVENT_CREATE_MESSAGE": {
|
||||||
|
// New message
|
||||||
|
console.log("gateway: got new message", packet.data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
console.log("gateway: got unknown packet", message.data);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(e) {
|
||||||
|
return console.error("gateway:", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
GatewayConnection.prototype.packet = function(op, data) {
|
||||||
|
if (typeof op === "string") op = getOpcodeByName(op);
|
||||||
|
return `${op}${opcodeSeparator}${JSON.stringify(data)}`;
|
||||||
|
};
|
||||||
|
|
||||||
|
const connection = new GatewayConnection(localStorage.getItem("token"));
|
||||||
|
</script>
|
||||||
|
</head>
|
||||||
|
|
||||||
|
<body>
|
||||||
|
|
||||||
|
</body>
|
||||||
|
</html>
|
|
@ -15,50 +15,35 @@ const redirect = (res, status=401, url=undefined) => {
|
||||||
res.redirect(url);
|
res.redirect(url);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const checkToken = (token, minPermissionLevel=config.roleMap.RESTRICTED) => {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
if (!token) reject("no token provided");
|
||||||
|
jwt.verify(token, secret.jwtPrivateKey, {}, async (err, data) => {
|
||||||
|
if (err) return reject(err);
|
||||||
|
if (!data || !data.username) return reject("invalid token");
|
||||||
|
const user = await User.findByUsername(data.username);
|
||||||
|
if (!user) return reject("user does not exist");
|
||||||
|
|
||||||
|
let permissionLevel = config.roleMap[user.role];
|
||||||
|
if (!permissionLevel) permissionLevel = 0;
|
||||||
|
|
||||||
|
if (permissionLevel < minPermissionLevel) reject("user does not have the required permission level");
|
||||||
|
resolve(user);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
function authenticateEndpoint(callback, url=undefined, minPermissionLevel=config.roleMap.RESTRICTED) {
|
function authenticateEndpoint(callback, url=undefined, minPermissionLevel=config.roleMap.RESTRICTED) {
|
||||||
return (req, res) => {
|
return (req, res) => {
|
||||||
const token = req.cookies.token;
|
const token = req.cookies.token;
|
||||||
if (!token) {
|
if (!token) return redirect(res, 403, url);
|
||||||
redirect(res, 403, url);
|
checkToken(token, minPermissionLevel).then((user) => {
|
||||||
return;
|
if (!user) return redirect(res, 403, url);
|
||||||
}
|
|
||||||
|
|
||||||
jwt.verify(token, secret.jwtPrivateKey, {}, async (err, data) => {
|
|
||||||
if (err) {
|
|
||||||
redirect(res, 401, url);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!data) {
|
|
||||||
redirect(res, 401, url);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!data.username) {
|
|
||||||
redirect(res, 401, url);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const user = await User.findByUsername(data.username);
|
|
||||||
|
|
||||||
if (!user) {
|
|
||||||
redirect(res, 401, url);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let permissionLevel = config.roleMap[user.role];
|
|
||||||
if (!permissionLevel) {
|
|
||||||
permissionLevel = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (permissionLevel < minPermissionLevel) {
|
|
||||||
redirect(res, 401, url);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
callback(req, res, user);
|
callback(req, res, user);
|
||||||
|
}).catch(() => {
|
||||||
|
return redirect(res, 403, url);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { authenticateEndpoint };
|
module.exports = { authenticateEndpoint, checkToken };
|
|
@ -8,7 +8,7 @@ module.exports = {
|
||||||
// specialCode: ''
|
// specialCode: ''
|
||||||
// }
|
// }
|
||||||
//},
|
//},
|
||||||
mongoUrl: 'mongodb://localhost:27017/app',
|
mongoUrl: 'mongodb://192.168.0.105:27017/app',
|
||||||
bcryptRounds: 10,
|
bcryptRounds: 10,
|
||||||
roleMap: {
|
roleMap: {
|
||||||
'BANNED': 0,
|
'BANNED': 0,
|
||||||
|
|
|
@ -8,11 +8,13 @@ const http = require("http");
|
||||||
|
|
||||||
const { authenticateEndpoint } = require("./common/auth/authfunctions");
|
const { authenticateEndpoint } = require("./common/auth/authfunctions");
|
||||||
const GatewayServer = require("./api/v1/gateway/index");
|
const GatewayServer = require("./api/v1/gateway/index");
|
||||||
|
const GatewayServerV2 = require("./api/v2/gateway/index");
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
const httpServer = http.createServer(app);
|
const httpServer = http.createServer(app);
|
||||||
|
|
||||||
const gateway = new GatewayServer(httpServer);
|
const gateway = new GatewayServer(httpServer);
|
||||||
|
const gatewayv2 = new GatewayServerV2({ server: httpServer });
|
||||||
|
|
||||||
app.use(express.urlencoded({ extended: false }));
|
app.use(express.urlencoded({ extended: false }));
|
||||||
app.use(express.json());
|
app.use(express.json());
|
||||||
|
@ -51,4 +53,4 @@ const onServerClosing = () => {
|
||||||
|
|
||||||
httpServer.listen(config.ports.mainServerPort, () => {
|
httpServer.listen(config.ports.mainServerPort, () => {
|
||||||
console.log(`[*] [server] Main server is listening on port ${config.ports.mainServerPort}`);
|
console.log(`[*] [server] Main server is listening on port ${config.ports.mainServerPort}`);
|
||||||
});
|
});
|
||||||
|
|
|
@ -67,7 +67,10 @@ User.findByUsername = async function(username) {
|
||||||
return await User.findOne({ username }).exec();
|
return await User.findOne({ username }).exec();
|
||||||
};
|
};
|
||||||
|
|
||||||
User.getPulicFields = function() {
|
User.getPulicFields = function(isPartial=false) {
|
||||||
|
if (isPartial) {
|
||||||
|
return "username _id color";
|
||||||
|
}
|
||||||
return "username role _id color";
|
return "username role _id color";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue