Add GatewayServer and GatewayClient(frontend)
This commit adds a websocket server that clients can connect and authenticate to. Once they're authenticated, they will start to receive relevant events. One issue is that the server does not ping for dead connections yet and the fact that new listeners for the guild are added for each connection. There is also the bug in WatchedGuild that prevents other bridge users from seeing eachother's messages.
This commit is contained in:
parent
d92248543a
commit
e33f6f7cfd
6 changed files with 327 additions and 18 deletions
192
GatewayServer.js
Normal file
192
GatewayServer.js
Normal file
|
@ -0,0 +1,192 @@
|
||||||
|
import { WebSocketServer } from "ws";
|
||||||
|
import { guildMap } from "./common.js";
|
||||||
|
import { decodeToken } from "./tokens.js";
|
||||||
|
|
||||||
|
const messageSchema = { t: "number", d: "object" };
|
||||||
|
const authenticationTimeoutMs = 15000;
|
||||||
|
const messageTypes = {
|
||||||
|
HELLO: 0,
|
||||||
|
YOO: 1,
|
||||||
|
READY: 2,
|
||||||
|
EVENT: 3
|
||||||
|
};
|
||||||
|
|
||||||
|
class GatewayServer {
|
||||||
|
constructor(server, extraWebsocketServerConfig={}) {
|
||||||
|
this.wss = new WebSocketServer({
|
||||||
|
server,
|
||||||
|
...extraWebsocketServerConfig
|
||||||
|
});
|
||||||
|
|
||||||
|
this.wss.on("connection", (ws) => {
|
||||||
|
this.onConnection(ws);
|
||||||
|
ws.on("message", (data, isBinary) => {
|
||||||
|
this.onMessage(ws, data, isBinary);
|
||||||
|
});
|
||||||
|
ws.on("close", (code, reason) => {
|
||||||
|
this.onDisconnect(code, reason.toString());
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_clientDispatch(ws, e) {
|
||||||
|
switch (e.type) {
|
||||||
|
case "INIT": {
|
||||||
|
ws.state = ws.state || {
|
||||||
|
authenticated: false,
|
||||||
|
user: null,
|
||||||
|
token: null,
|
||||||
|
alive: true,
|
||||||
|
handlers: new Map()
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "AUTHENTICATE_AS": {
|
||||||
|
ws.state.user = e.user;
|
||||||
|
ws.state.authenticated = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "SET_ALIVE": {
|
||||||
|
ws.state.alive = e.alive;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "SET_TOKEN": {
|
||||||
|
ws.state.token = e.token;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "ADD_HANDLER": {
|
||||||
|
ws.state.handlers.set(e.guildId, e.handler);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_checkMessageSchema(message) {
|
||||||
|
for (const [key, value] of Object.entries(message)) {
|
||||||
|
if (!messageSchema[key])
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (typeof value !== messageSchema[key])
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
onConnection(ws) {
|
||||||
|
this._clientDispatch(ws, { type: "INIT" });
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
if (!ws.state.authenticated) {
|
||||||
|
ws.close(4001, "Authentication timeout.");
|
||||||
|
}
|
||||||
|
}, authenticationTimeoutMs);
|
||||||
|
|
||||||
|
ws.send(JSON.stringify({
|
||||||
|
t: messageTypes.HELLO,
|
||||||
|
d: null
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
async onMessage(ws, message, isBinary) {
|
||||||
|
if (isBinary)
|
||||||
|
return ws.close(4000, "Binary payload not supported.");
|
||||||
|
|
||||||
|
try {
|
||||||
|
message = JSON.parse(message.toString());
|
||||||
|
} catch (e) {
|
||||||
|
console.error("GatewayServer: payload decode error", e);
|
||||||
|
return ws.close(4000, "Payload error.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this._checkMessageSchema(message))
|
||||||
|
return ws.close(4000, "JSON payload does not match schema.");
|
||||||
|
|
||||||
|
switch (message.t) {
|
||||||
|
case messageTypes.YOO: {
|
||||||
|
if (message.d.token) {
|
||||||
|
let user;
|
||||||
|
try {
|
||||||
|
user = await decodeToken(message.d.token);
|
||||||
|
} catch(e) {
|
||||||
|
console.error(e);
|
||||||
|
ws.close(4001, "Bad token.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (user && user.username) {
|
||||||
|
if (!user.guildAccess || user.guildAccess.length < 1) {
|
||||||
|
ws.close(4002, "No possible events: no guilds.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
this._clientDispatch(ws, {
|
||||||
|
type: "SET_TOKEN",
|
||||||
|
token: message.token
|
||||||
|
});
|
||||||
|
this._clientDispatch(ws, {
|
||||||
|
type: "AUTHENTICATE_AS",
|
||||||
|
user
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: it might actually be more efficient to have a single listener
|
||||||
|
// for each guild and broadcast the relevant events that way
|
||||||
|
user.guildAccess.forEach((guildId) => {
|
||||||
|
const guild = guildMap.get(guildId);
|
||||||
|
if (!guild) {
|
||||||
|
ws.close(4003, "User is in a guild that does not exist.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const handle = (ev) => {
|
||||||
|
ws.send(JSON.stringify({
|
||||||
|
t: messageTypes.EVENT,
|
||||||
|
d: ev
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
guild.on("pushed", handle);
|
||||||
|
this._clientDispatch(ws, {
|
||||||
|
type: "ADD_HANDLER",
|
||||||
|
handler: handle,
|
||||||
|
guildId
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.send(JSON.stringify({
|
||||||
|
t: messageTypes.READY,
|
||||||
|
d: {
|
||||||
|
user: {
|
||||||
|
username: user.username,
|
||||||
|
guildAccess: user.guildAccess,
|
||||||
|
discordID: user.discordID,
|
||||||
|
avatarURL: user.avatarURL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
ws.close(4001, "Bad token.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ws.close(4001, "No token.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default: {
|
||||||
|
return ws.close(4000, "Invalid payload type.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
onDisconnect(ws) {
|
||||||
|
if (ws.state && ws.state.handlers && ws.state.handlers.length > 0) {
|
||||||
|
for (const [guildId, handler] of ws.state.handlers.entries()) {
|
||||||
|
const guild = guildMap.get(guildId);
|
||||||
|
if (guild) {
|
||||||
|
guild.removeListener("pushed", handler);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default GatewayServer;
|
|
@ -59,6 +59,7 @@ class WatchedGuild extends EventEmitter {
|
||||||
if (message.guild_id !== this.upstreamGuildId)
|
if (message.guild_id !== this.upstreamGuildId)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// TODOOOOOO: bridge user's wont get message events from other bridge users in the same channel
|
||||||
const maybeKnownWebhook = this.knownWebhooks.get(message.channel_id);
|
const maybeKnownWebhook = this.knownWebhooks.get(message.channel_id);
|
||||||
if (maybeKnownWebhook && maybeKnownWebhook.id === message.webhook_id)
|
if (maybeKnownWebhook && maybeKnownWebhook.id === message.webhook_id)
|
||||||
return; // ignore messages coming from our webhook
|
return; // ignore messages coming from our webhook
|
||||||
|
|
93
frontend/src/api/GatewayClient.js
Normal file
93
frontend/src/api/GatewayClient.js
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
const messageSchema = { t: "number", d: "object" };
|
||||||
|
const messageTypes = {
|
||||||
|
HELLO: 0,
|
||||||
|
YOO: 1,
|
||||||
|
READY: 2,
|
||||||
|
EVENT: 3
|
||||||
|
};
|
||||||
|
|
||||||
|
export default class GatewayClient {
|
||||||
|
constructor(gatewayPath) {
|
||||||
|
this.gatewayPath = gatewayPath;
|
||||||
|
this.ws = null;
|
||||||
|
this.token = null;
|
||||||
|
this.user = null;
|
||||||
|
this.onEvent = (e) => {};
|
||||||
|
}
|
||||||
|
|
||||||
|
connect(token) {
|
||||||
|
if (!token)
|
||||||
|
token = this.token;
|
||||||
|
|
||||||
|
console.log("gateway: connecting");
|
||||||
|
|
||||||
|
this.ws = new WebSocket(this.gatewayPath);
|
||||||
|
|
||||||
|
this.ws.addEventListener("message", ({ data }) => {
|
||||||
|
if (typeof data !== "string") {
|
||||||
|
console.warn("gateway: got non-string data from server, ignoring...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let message;
|
||||||
|
try {
|
||||||
|
message = JSON.parse(data);
|
||||||
|
} catch(e) {
|
||||||
|
console.warn("gateway: got invalid JSON from server (failed to parse), ignoring...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this._checkMessageSchema(message)) {
|
||||||
|
console.warn("gateway: got invalid JSON from server (does not match schema), ignoring...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (message.t) {
|
||||||
|
case messageTypes.HELLO: {
|
||||||
|
console.log("gateway: HELLO");
|
||||||
|
this.ws.send(JSON.stringify({
|
||||||
|
t: messageTypes.YOO,
|
||||||
|
d: {
|
||||||
|
token
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case messageTypes.READY: {
|
||||||
|
console.log("gateway: READY");
|
||||||
|
this.user = message.d.user;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case messageTypes.EVENT: {
|
||||||
|
this.onEvent(message.d);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
console.warn("gateway: got invalid JSON from server (invalid type), ignoring...");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.ws.addEventListener("open", () => {
|
||||||
|
console.log("gateway: open");
|
||||||
|
});
|
||||||
|
this.ws.addEventListener("close", () => {
|
||||||
|
console.log("gateway: closed");
|
||||||
|
setTimeout(() => {
|
||||||
|
console.log("gateway: reconnecting");
|
||||||
|
this.connect();
|
||||||
|
}, 4000);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_checkMessageSchema(message) {
|
||||||
|
for (const [key, value] of Object.entries(message)) {
|
||||||
|
if (!messageSchema[key])
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (typeof value !== messageSchema[key])
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,10 @@
|
||||||
<script>
|
<script>
|
||||||
import { apiClient } from "../api/common";
|
import { apiClient } from "../api/common";
|
||||||
|
import GatewayClient from "../api/GatewayClient";
|
||||||
|
import { supportsWebsockets } from "../util/browser";
|
||||||
import ChatView from "./ChatView.svelte";
|
import ChatView from "./ChatView.svelte";
|
||||||
import FuzzyView from "./FuzzyView.svelte";
|
import FuzzyView from "./FuzzyView.svelte";
|
||||||
import MessageDisplay from "./MessageDisplay.svelte";
|
import MessageDisplay from "./MessageDisplay.svelte";
|
||||||
|
|
||||||
let messageStore = {};
|
let messageStore = {};
|
||||||
let selectedGuild = null;
|
let selectedGuild = null;
|
||||||
|
@ -11,6 +13,21 @@ import MessageDisplay from "./MessageDisplay.svelte";
|
||||||
let user = null;
|
let user = null;
|
||||||
let view = { type: "CHAT" };
|
let view = { type: "CHAT" };
|
||||||
|
|
||||||
|
function handleEvent(event) {
|
||||||
|
if (event.eventType === "MESSAGE_CREATE") {
|
||||||
|
const guildId = event.message.guild_id;
|
||||||
|
const channelId = event.message.channel_id;
|
||||||
|
|
||||||
|
if (!messageStore[guildId]) {
|
||||||
|
messageStore[guildId] = {};
|
||||||
|
}
|
||||||
|
if (!messageStore[guildId][channelId]) {
|
||||||
|
messageStore[guildId][channelId] = [];
|
||||||
|
}
|
||||||
|
messageStore[guildId][channelId] = [...messageStore[guildId][channelId], event.message];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
apiClient.getRequest("/users/@self", false)
|
apiClient.getRequest("/users/@self", false)
|
||||||
.then((res) => {
|
.then((res) => {
|
||||||
let userErrorMessage = "";
|
let userErrorMessage = "";
|
||||||
|
@ -52,21 +69,18 @@ import MessageDisplay from "./MessageDisplay.svelte";
|
||||||
selectedChannel = guilds[0].channels[0];
|
selectedChannel = guilds[0].channels[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
const { poll } = apiClient.createPollingListener(null, ({ event }) => {
|
if (supportsWebsockets()) {
|
||||||
if (event.eventType === "MESSAGE_CREATE") {
|
console.log("App: browser supports WebSocket, using gateway");
|
||||||
const guildId = event.message.guild_id;
|
const gatewayConnection = new GatewayClient(`${ location.protocol === "https:" ? "wss://" : "ws://" }${location.host}/gateway`);
|
||||||
const channelId = event.message.channel_id;
|
gatewayConnection.onEvent = handleEvent;
|
||||||
|
gatewayConnection.connect(apiClient.token);
|
||||||
if (!messageStore[guildId]) {
|
} else {
|
||||||
messageStore[guildId] = {};
|
console.warn("App: browser does not WebSocket, using polling");
|
||||||
}
|
const { poll } = apiClient.createPollingListener(null, ({ event }) => {
|
||||||
if (!messageStore[guildId][channelId]) {
|
handleEvent(event);
|
||||||
messageStore[guildId][channelId] = [];
|
});
|
||||||
}
|
poll();
|
||||||
messageStore[guildId][channelId] = [...messageStore[guildId][channelId], event.message];
|
}
|
||||||
}
|
|
||||||
});
|
|
||||||
poll();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
3
frontend/src/util/browser.js
Normal file
3
frontend/src/util/browser.js
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
export function supportsWebsockets() {
|
||||||
|
return !!window.WebSocket;
|
||||||
|
};
|
8
index.js
8
index.js
|
@ -1,7 +1,9 @@
|
||||||
|
import http from "node:http";
|
||||||
import express from "express";
|
import express from "express";
|
||||||
import apiRoute from "./routes/api.js";
|
import apiRoute from "./routes/api.js";
|
||||||
import { mainHttpListenPort } from "./config.js";
|
import { mainHttpListenPort } from "./config.js";
|
||||||
import { bot } from "./common.js";
|
import { bot } from "./common.js";
|
||||||
|
import GatewayServer from "./GatewayServer.js";
|
||||||
|
|
||||||
// might introduce bugs and probably a bad idea
|
// might introduce bugs and probably a bad idea
|
||||||
Object.freeze(Object.prototype);
|
Object.freeze(Object.prototype);
|
||||||
|
@ -9,12 +11,16 @@ Object.freeze(Object);
|
||||||
|
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
|
const httpServer = http.createServer(app);
|
||||||
|
const gatewayServer = new GatewayServer(httpServer, {
|
||||||
|
path: "/gateway"
|
||||||
|
});
|
||||||
|
|
||||||
app.use(express.json());
|
app.use(express.json());
|
||||||
app.use("/", express.static("frontend/public/"));
|
app.use("/", express.static("frontend/public/"));
|
||||||
app.use("/api/v1", apiRoute);
|
app.use("/api/v1", apiRoute);
|
||||||
|
|
||||||
app.listen(mainHttpListenPort, () => {
|
httpServer.listen(mainHttpListenPort, () => {
|
||||||
console.log(`server main: listen on ${mainHttpListenPort}`);
|
console.log(`server main: listen on ${mainHttpListenPort}`);
|
||||||
bot.connect();
|
bot.connect();
|
||||||
});
|
});
|
Loading…
Reference in a new issue