Compare commits

..

No commits in common. "188565381586681dafdc0a6a034e1ddfe49ef93f" and "a9a4cdbb5c844aa9ce98dbdc5f633f59c20dd8e8" have entirely different histories.

7 changed files with 17 additions and 97 deletions

View file

@ -21,28 +21,16 @@ export default {
heartbeatInterval: null, heartbeatInterval: null,
user: null, user: null,
channels: null, channels: null,
reconnectDelay: 400,
reconnectTimeout: null,
init() { init() {
const token = getAuthToken();
if (!token) {
return false;
}
this.ws = new WebSocket(getItem("gatewayBase")); this.ws = new WebSocket(getItem("gatewayBase"));
this.ws.onopen = () => { this.ws.onmessage = (message) => {
if (this.reconnectTimeout) { const payload = JSON.parse(message);
clearTimeout(this.reconnectTimeout);
}
console.log("[gateway] open");
};
this.ws.onmessage = (event) => {
const payload = JSON.parse(event.data);
switch (payload.t) { switch (payload.t) {
case GatewayPayloadType.Hello: { case GatewayPayloadType.Hello: {
this.send({ this.send({
t: GatewayPayloadType.Authenticate, t: GatewayPayloadType.Authenticate,
d: token d: getAuthToken()
}); });
this.heartbeatInterval = setInterval(() => { this.heartbeatInterval = setInterval(() => {
@ -51,39 +39,23 @@ export default {
d: 0 d: 0
}); });
}, payload.d.pingInterval); }, payload.d.pingInterval);
console.log("[gateway] hello");
break; break;
} }
case GatewayPayloadType.Ready: { case GatewayPayloadType.Ready: {
this.user = payload.d.user; this.user = payload.d.user;
this.channels = payload.d.channels; this.channels = payload.d.channels;
this.reconnectDelay = 400;
console.log("[gateway] ready");
break; break;
} }
} }
}; };
this.ws.onclose = () => { this.ws.onclose = () => {
if (this.reconnectDelay < 60000) {
this.reconnectDelay *= 2;
}
this.authenticated = false; this.authenticated = false;
this.user = null; this.user = null;
this.channels = null; this.channels = null;
if (this.heartbeatInterval) { if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval); clearInterval(this.heartbeatInterval);
} }
this.reconnectTimeout = setTimeout(() => {
this.init();
}, this.reconnectDelay);
console.log("[gateway] close");
}; };
return true;
}, },
send(data) { send(data) {
return this.ws.send(JSON.stringify(data)); return this.ws.send(JSON.stringify(data));

View file

@ -1,9 +1,4 @@
import App from './components/App.svelte'; import App from './components/App.svelte';
import gateway from './gateway';
import { initStorageDefaults } from './storage';
initStorageDefaults();
gateway.init();
const app = new App({ const app = new App({
target: document.body target: document.body

View file

@ -14,7 +14,10 @@ const dummyProvider = {
}; };
function getProvider() { function getProvider() {
return window.localStorage || dummyProvider; if (!window.localStorage || !window.localStorage.getItem || !window.localStorage.setItem) {
return dummyProvider;
}
return window.localStorage;
} }
export function getItem(key) { export function getItem(key) {

View file

@ -12,8 +12,4 @@ export const gatewayErrors = {
BAD_AUTH: { code: 4002, message: "Bad authentication" }, BAD_AUTH: { code: 4002, message: "Bad authentication" },
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" }, AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
NO_PING: { code: 4004, message: "No ping" }, NO_PING: { code: 4004, message: "No ping" },
FLOODING: { code: 4005, message: "Flooding (exceeded maximum messages per batch)" },
ALREADY_AUTHENTICATED: { code: 4006, message: "Already authenticated" },
PAYLOAD_TOO_LARGE: { code: 4007, message: "Payload too large" },
TOO_MANY_SESSIONS: { code: 4008, message: "Too many sessions" },
}; };

View file

@ -7,17 +7,12 @@ import { gatewayErrors } from "../errors";
import { GatewayPayload } from "../types/gatewaypayload"; import { GatewayPayload } from "../types/gatewaypayload";
import { GatewayPayloadType } from "./gatewaypayloadtype"; import { GatewayPayloadType } from "./gatewaypayloadtype";
const GATEWAY_BATCH_INTERVAL = 50000; const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL;
const GATEWAY_PING_INTERVAL = 40000; const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL;
const MAX_CLIENT_MESSAGES_PER_BATCH = 6; // TODO: how well does this work for weak connections?
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
// mapping between a dispatch id and a websocket client // mapping between a dispatch id and a websocket client
const dispatchChannels = new Map<string, Set<WebSocket>>(); const dispatchChannels = new Map<string, Set<WebSocket>>();
// mapping between a user id and the websocket sessions it has
const sessionsByUserId = new Map<number, Set<WebSocket>>();
function clientSubscribe(ws: WebSocket, dispatchChannel: string) { function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
ws.state.dispatchChannels.add(dispatchChannel); ws.state.dispatchChannels.add(dispatchChannel);
if (!dispatchChannels.get(dispatchChannel)) { if (!dispatchChannels.get(dispatchChannel)) {
@ -133,7 +128,6 @@ export default function(server: Server) {
if (!e.state.alive) { if (!e.state.alive) {
return closeWithError(e, gatewayErrors.NO_PING); return closeWithError(e, gatewayErrors.NO_PING);
} }
e.state.messagesSinceLastCheck = 0;
} }
}); });
}, GATEWAY_BATCH_INTERVAL); }, GATEWAY_BATCH_INTERVAL);
@ -150,8 +144,7 @@ export default function(server: Server) {
alive: false, alive: false,
ready: false, ready: false,
lastAliveCheck: performance.now(), lastAliveCheck: performance.now(),
dispatchChannels: new Set(), dispatchChannels: new Set()
messagesSinceLastCheck: 0
}; };
sendPayload(ws, { sendPayload(ws, {
@ -163,15 +156,6 @@ export default function(server: Server) {
ws.on("close", () => { ws.on("close", () => {
clientUnsubscribeAll(ws); clientUnsubscribeAll(ws);
if (ws.state.user && ws.state.user.id) {
const sessions = sessionsByUserId.get(ws.state.user.id);
if (sessions) {
sessions.delete(ws);
if (sessions.size < 1) {
sessionsByUserId.delete(ws.state.user.id);
}
}
}
}); });
ws.on("message", async (rawData, isBinary) => { ws.on("message", async (rawData, isBinary) => {
@ -179,27 +163,13 @@ export default function(server: Server) {
return closeWithBadPayload(ws, "Binary messages are not supported"); return closeWithBadPayload(ws, "Binary messages are not supported");
} }
ws.state.messagesSinceLastCheck++; const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString()));
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
return closeWithError(ws, gatewayErrors.FLOODING);
}
const stringData = rawData.toString();
if (stringData.length > 2048) {
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
if (!payload) { if (!payload) {
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema"); return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
} }
switch (payload.t) { switch (payload.t) {
case GatewayPayloadType.Authenticate: { case GatewayPayloadType.Authenticate: {
if (ws.state.ready) {
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
}
const token = payload.d; const token = payload.d;
if (typeof token !== "string") { if (typeof token !== "string") {
return closeWithBadPayload(ws, "d: expected string"); return closeWithBadPayload(ws, "d: expected string");
@ -208,19 +178,7 @@ export default function(server: Server) {
if (!user) { if (!user) {
return closeWithError(ws, gatewayErrors.BAD_AUTH); return closeWithError(ws, gatewayErrors.BAD_AUTH);
} }
// each user should have their own list of channels that they join
let sessions = sessionsByUserId.get(user.id);
if (sessions) {
if ((sessions.size + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
}
} else {
sessions = new Set();
sessionsByUserId.set(user.id, sessions);
}
sessions.add(ws);
// TODO: each user should have their own list of channels that they join
const channels = await query("SELECT id, name, owner_id FROM channels"); const channels = await query("SELECT id, name, owner_id FROM channels");
clientSubscribe(ws, "*"); clientSubscribe(ws, "*");
@ -241,11 +199,7 @@ export default function(server: Server) {
break; break;
} }
case GatewayPayloadType.Ping: { case GatewayPayloadType.Ping: {
if (payload.d !== 0) { // TODO: also check session here and ensure packet is sent at the right time
return closeWithBadPayload(ws, "d: expected numeric '0'");
}
// TODO: also check session here
ws.state.alive = true; ws.state.alive = true;
break; break;
} }

View file

@ -1,12 +1,13 @@
import express, { Application, json } from "express"; import { Application, json } from "express";
import usersRouter from "./routes/api/v1/users"; import usersRouter from "./routes/api/v1/users";
import channelsRouter from "./routes/api/v1/channels"; import channelsRouter from "./routes/api/v1/channels";
import messagesRouter from "./routes/api/v1/messages"; import messagesRouter from "./routes/api/v1/messages";
export default function(app: Application) { export default function(app: Application) {
app.get("/", (req, res) => res.send("hello!"));
app.use(json()); app.use(json());
app.use("/api/v1/users", usersRouter); app.use("/api/v1/users", usersRouter);
app.use("/api/v1/channels", channelsRouter); app.use("/api/v1/channels", channelsRouter);
app.use("/api/v1/messages", messagesRouter); app.use("/api/v1/messages", messagesRouter);
app.use("/", express.static("frontend/public"));
}; };

View file

@ -3,6 +3,5 @@ interface GatewayClientState {
ready: boolean, ready: boolean,
alive: boolean, alive: boolean,
lastAliveCheck: number, lastAliveCheck: number,
dispatchChannels: Set<string>, dispatchChannels: Set<string>
messagesSinceLastCheck: number
} }