Compare commits

...

9 commits

7 changed files with 97 additions and 17 deletions

View file

@ -21,16 +21,28 @@ export default {
heartbeatInterval: null, heartbeatInterval: null,
user: null, user: null,
channels: null, channels: null,
reconnectDelay: 400,
reconnectTimeout: null,
init() { init() {
const token = getAuthToken();
if (!token) {
return false;
}
this.ws = new WebSocket(getItem("gatewayBase")); this.ws = new WebSocket(getItem("gatewayBase"));
this.ws.onmessage = (message) => { this.ws.onopen = () => {
const payload = JSON.parse(message); if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
console.log("[gateway] open");
};
this.ws.onmessage = (event) => {
const payload = JSON.parse(event.data);
switch (payload.t) { switch (payload.t) {
case GatewayPayloadType.Hello: { case GatewayPayloadType.Hello: {
this.send({ this.send({
t: GatewayPayloadType.Authenticate, t: GatewayPayloadType.Authenticate,
d: getAuthToken() d: token
}); });
this.heartbeatInterval = setInterval(() => { this.heartbeatInterval = setInterval(() => {
@ -39,23 +51,39 @@ export default {
d: 0 d: 0
}); });
}, payload.d.pingInterval); }, payload.d.pingInterval);
console.log("[gateway] hello");
break; break;
} }
case GatewayPayloadType.Ready: { case GatewayPayloadType.Ready: {
this.user = payload.d.user; this.user = payload.d.user;
this.channels = payload.d.channels; this.channels = payload.d.channels;
this.reconnectDelay = 400;
console.log("[gateway] ready");
break; break;
} }
} }
}; };
this.ws.onclose = () => { this.ws.onclose = () => {
if (this.reconnectDelay < 60000) {
this.reconnectDelay *= 2;
}
this.authenticated = false; this.authenticated = false;
this.user = null; this.user = null;
this.channels = null; this.channels = null;
if (this.heartbeatInterval) { if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval); clearInterval(this.heartbeatInterval);
} }
this.reconnectTimeout = setTimeout(() => {
this.init();
}, this.reconnectDelay);
console.log("[gateway] close");
}; };
return true;
}, },
send(data) { send(data) {
return this.ws.send(JSON.stringify(data)); return this.ws.send(JSON.stringify(data));

View file

@ -1,4 +1,9 @@
import App from './components/App.svelte'; import App from './components/App.svelte';
import gateway from './gateway';
import { initStorageDefaults } from './storage';
initStorageDefaults();
gateway.init();
const app = new App({ const app = new App({
target: document.body target: document.body

View file

@ -14,10 +14,7 @@ const dummyProvider = {
}; };
function getProvider() { function getProvider() {
if (!window.localStorage || !window.localStorage.getItem || !window.localStorage.setItem) { return window.localStorage || dummyProvider;
return dummyProvider;
}
return window.localStorage;
} }
export function getItem(key) { export function getItem(key) {

View file

@ -12,4 +12,8 @@ export const gatewayErrors = {
BAD_AUTH: { code: 4002, message: "Bad authentication" }, BAD_AUTH: { code: 4002, message: "Bad authentication" },
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" }, AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
NO_PING: { code: 4004, message: "No ping" }, NO_PING: { code: 4004, message: "No ping" },
FLOODING: { code: 4005, message: "Flooding (exceeded maximum messages per batch)" },
ALREADY_AUTHENTICATED: { code: 4006, message: "Already authenticated" },
PAYLOAD_TOO_LARGE: { code: 4007, message: "Payload too large" },
TOO_MANY_SESSIONS: { code: 4008, message: "Too many sessions" },
}; };

View file

@ -7,12 +7,17 @@ import { gatewayErrors } from "../errors";
import { GatewayPayload } from "../types/gatewaypayload"; import { GatewayPayload } from "../types/gatewaypayload";
import { GatewayPayloadType } from "./gatewaypayloadtype"; import { GatewayPayloadType } from "./gatewaypayloadtype";
const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL; const GATEWAY_BATCH_INTERVAL = 50000;
const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL; const GATEWAY_PING_INTERVAL = 40000;
const MAX_CLIENT_MESSAGES_PER_BATCH = 6; // TODO: how well does this work for weak connections?
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
// mapping between a dispatch id and a websocket client // mapping between a dispatch id and a websocket client
const dispatchChannels = new Map<string, Set<WebSocket>>(); const dispatchChannels = new Map<string, Set<WebSocket>>();
// mapping between a user id and the websocket sessions it has
const sessionsByUserId = new Map<number, Set<WebSocket>>();
function clientSubscribe(ws: WebSocket, dispatchChannel: string) { function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
ws.state.dispatchChannels.add(dispatchChannel); ws.state.dispatchChannels.add(dispatchChannel);
if (!dispatchChannels.get(dispatchChannel)) { if (!dispatchChannels.get(dispatchChannel)) {
@ -128,6 +133,7 @@ export default function(server: Server) {
if (!e.state.alive) { if (!e.state.alive) {
return closeWithError(e, gatewayErrors.NO_PING); return closeWithError(e, gatewayErrors.NO_PING);
} }
e.state.messagesSinceLastCheck = 0;
} }
}); });
}, GATEWAY_BATCH_INTERVAL); }, GATEWAY_BATCH_INTERVAL);
@ -144,7 +150,8 @@ export default function(server: Server) {
alive: false, alive: false,
ready: false, ready: false,
lastAliveCheck: performance.now(), lastAliveCheck: performance.now(),
dispatchChannels: new Set() dispatchChannels: new Set(),
messagesSinceLastCheck: 0
}; };
sendPayload(ws, { sendPayload(ws, {
@ -156,6 +163,15 @@ export default function(server: Server) {
ws.on("close", () => { ws.on("close", () => {
clientUnsubscribeAll(ws); clientUnsubscribeAll(ws);
if (ws.state.user && ws.state.user.id) {
const sessions = sessionsByUserId.get(ws.state.user.id);
if (sessions) {
sessions.delete(ws);
if (sessions.size < 1) {
sessionsByUserId.delete(ws.state.user.id);
}
}
}
}); });
ws.on("message", async (rawData, isBinary) => { ws.on("message", async (rawData, isBinary) => {
@ -163,13 +179,27 @@ export default function(server: Server) {
return closeWithBadPayload(ws, "Binary messages are not supported"); return closeWithBadPayload(ws, "Binary messages are not supported");
} }
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString())); ws.state.messagesSinceLastCheck++;
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
return closeWithError(ws, gatewayErrors.FLOODING);
}
const stringData = rawData.toString();
if (stringData.length > 2048) {
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
if (!payload) { if (!payload) {
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema"); return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
} }
switch (payload.t) { switch (payload.t) {
case GatewayPayloadType.Authenticate: { case GatewayPayloadType.Authenticate: {
if (ws.state.ready) {
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
}
const token = payload.d; const token = payload.d;
if (typeof token !== "string") { if (typeof token !== "string") {
return closeWithBadPayload(ws, "d: expected string"); return closeWithBadPayload(ws, "d: expected string");
@ -178,7 +208,19 @@ export default function(server: Server) {
if (!user) { if (!user) {
return closeWithError(ws, gatewayErrors.BAD_AUTH); return closeWithError(ws, gatewayErrors.BAD_AUTH);
} }
// each user should have their own list of channels that they join
let sessions = sessionsByUserId.get(user.id);
if (sessions) {
if ((sessions.size + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
}
} else {
sessions = new Set();
sessionsByUserId.set(user.id, sessions);
}
sessions.add(ws);
// TODO: each user should have their own list of channels that they join
const channels = await query("SELECT id, name, owner_id FROM channels"); const channels = await query("SELECT id, name, owner_id FROM channels");
clientSubscribe(ws, "*"); clientSubscribe(ws, "*");
@ -199,7 +241,11 @@ export default function(server: Server) {
break; break;
} }
case GatewayPayloadType.Ping: { case GatewayPayloadType.Ping: {
// TODO: also check session here and ensure packet is sent at the right time if (payload.d !== 0) {
return closeWithBadPayload(ws, "d: expected numeric '0'");
}
// TODO: also check session here
ws.state.alive = true; ws.state.alive = true;
break; break;
} }

View file

@ -1,13 +1,12 @@
import { Application, json } from "express"; import express, { Application, json } from "express";
import usersRouter from "./routes/api/v1/users"; import usersRouter from "./routes/api/v1/users";
import channelsRouter from "./routes/api/v1/channels"; import channelsRouter from "./routes/api/v1/channels";
import messagesRouter from "./routes/api/v1/messages"; import messagesRouter from "./routes/api/v1/messages";
export default function(app: Application) { export default function(app: Application) {
app.get("/", (req, res) => res.send("hello!"));
app.use(json()); app.use(json());
app.use("/api/v1/users", usersRouter); app.use("/api/v1/users", usersRouter);
app.use("/api/v1/channels", channelsRouter); app.use("/api/v1/channels", channelsRouter);
app.use("/api/v1/messages", messagesRouter); app.use("/api/v1/messages", messagesRouter);
app.use("/", express.static("frontend/public"));
}; };

View file

@ -3,5 +3,6 @@ interface GatewayClientState {
ready: boolean, ready: boolean,
alive: boolean, alive: boolean,
lastAliveCheck: number, lastAliveCheck: number,
dispatchChannels: Set<string> dispatchChannels: Set<string>,
messagesSinceLastCheck: number
} }