Compare commits

..

9 commits

7 changed files with 97 additions and 17 deletions

View file

@ -21,16 +21,28 @@ export default {
heartbeatInterval: null,
user: null,
channels: null,
reconnectDelay: 400,
reconnectTimeout: null,
init() {
const token = getAuthToken();
if (!token) {
return false;
}
this.ws = new WebSocket(getItem("gatewayBase"));
this.ws.onmessage = (message) => {
const payload = JSON.parse(message);
this.ws.onopen = () => {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
}
console.log("[gateway] open");
};
this.ws.onmessage = (event) => {
const payload = JSON.parse(event.data);
switch (payload.t) {
case GatewayPayloadType.Hello: {
this.send({
t: GatewayPayloadType.Authenticate,
d: getAuthToken()
d: token
});
this.heartbeatInterval = setInterval(() => {
@ -39,23 +51,39 @@ export default {
d: 0
});
}, payload.d.pingInterval);
console.log("[gateway] hello");
break;
}
case GatewayPayloadType.Ready: {
this.user = payload.d.user;
this.channels = payload.d.channels;
this.reconnectDelay = 400;
console.log("[gateway] ready");
break;
}
}
};
this.ws.onclose = () => {
if (this.reconnectDelay < 60000) {
this.reconnectDelay *= 2;
}
this.authenticated = false;
this.user = null;
this.channels = null;
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
this.reconnectTimeout = setTimeout(() => {
this.init();
}, this.reconnectDelay);
console.log("[gateway] close");
};
return true;
},
send(data) {
return this.ws.send(JSON.stringify(data));

View file

@ -1,4 +1,9 @@
import App from './components/App.svelte';
import gateway from './gateway';
import { initStorageDefaults } from './storage';
initStorageDefaults();
gateway.init();
const app = new App({
target: document.body

View file

@ -14,10 +14,7 @@ const dummyProvider = {
};
function getProvider() {
if (!window.localStorage || !window.localStorage.getItem || !window.localStorage.setItem) {
return dummyProvider;
}
return window.localStorage;
return window.localStorage || dummyProvider;
}
export function getItem(key) {

View file

@ -12,4 +12,8 @@ export const gatewayErrors = {
BAD_AUTH: { code: 4002, message: "Bad authentication" },
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
NO_PING: { code: 4004, message: "No ping" },
FLOODING: { code: 4005, message: "Flooding (exceeded maximum messages per batch)" },
ALREADY_AUTHENTICATED: { code: 4006, message: "Already authenticated" },
PAYLOAD_TOO_LARGE: { code: 4007, message: "Payload too large" },
TOO_MANY_SESSIONS: { code: 4008, message: "Too many sessions" },
};

View file

@ -7,12 +7,17 @@ import { gatewayErrors } from "../errors";
import { GatewayPayload } from "../types/gatewaypayload";
import { GatewayPayloadType } from "./gatewaypayloadtype";
const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL;
const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL;
const GATEWAY_BATCH_INTERVAL = 50000;
const GATEWAY_PING_INTERVAL = 40000;
const MAX_CLIENT_MESSAGES_PER_BATCH = 6; // TODO: how well does this work for weak connections?
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
// mapping between a dispatch id and a websocket client
const dispatchChannels = new Map<string, Set<WebSocket>>();
// mapping between a user id and the websocket sessions it has
const sessionsByUserId = new Map<number, Set<WebSocket>>();
function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
ws.state.dispatchChannels.add(dispatchChannel);
if (!dispatchChannels.get(dispatchChannel)) {
@ -128,6 +133,7 @@ export default function(server: Server) {
if (!e.state.alive) {
return closeWithError(e, gatewayErrors.NO_PING);
}
e.state.messagesSinceLastCheck = 0;
}
});
}, GATEWAY_BATCH_INTERVAL);
@ -144,7 +150,8 @@ export default function(server: Server) {
alive: false,
ready: false,
lastAliveCheck: performance.now(),
dispatchChannels: new Set()
dispatchChannels: new Set(),
messagesSinceLastCheck: 0
};
sendPayload(ws, {
@ -156,6 +163,15 @@ export default function(server: Server) {
ws.on("close", () => {
clientUnsubscribeAll(ws);
if (ws.state.user && ws.state.user.id) {
const sessions = sessionsByUserId.get(ws.state.user.id);
if (sessions) {
sessions.delete(ws);
if (sessions.size < 1) {
sessionsByUserId.delete(ws.state.user.id);
}
}
}
});
ws.on("message", async (rawData, isBinary) => {
@ -163,13 +179,27 @@ export default function(server: Server) {
return closeWithBadPayload(ws, "Binary messages are not supported");
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString()));
ws.state.messagesSinceLastCheck++;
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
return closeWithError(ws, gatewayErrors.FLOODING);
}
const stringData = rawData.toString();
if (stringData.length > 2048) {
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
if (!payload) {
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
}
switch (payload.t) {
case GatewayPayloadType.Authenticate: {
if (ws.state.ready) {
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
}
const token = payload.d;
if (typeof token !== "string") {
return closeWithBadPayload(ws, "d: expected string");
@ -178,7 +208,19 @@ export default function(server: Server) {
if (!user) {
return closeWithError(ws, gatewayErrors.BAD_AUTH);
}
// each user should have their own list of channels that they join
let sessions = sessionsByUserId.get(user.id);
if (sessions) {
if ((sessions.size + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
}
} else {
sessions = new Set();
sessionsByUserId.set(user.id, sessions);
}
sessions.add(ws);
// TODO: each user should have their own list of channels that they join
const channels = await query("SELECT id, name, owner_id FROM channels");
clientSubscribe(ws, "*");
@ -199,7 +241,11 @@ export default function(server: Server) {
break;
}
case GatewayPayloadType.Ping: {
// TODO: also check session here and ensure packet is sent at the right time
if (payload.d !== 0) {
return closeWithBadPayload(ws, "d: expected numeric '0'");
}
// TODO: also check session here
ws.state.alive = true;
break;
}

View file

@ -1,13 +1,12 @@
import { Application, json } from "express";
import express, { Application, json } from "express";
import usersRouter from "./routes/api/v1/users";
import channelsRouter from "./routes/api/v1/channels";
import messagesRouter from "./routes/api/v1/messages";
export default function(app: Application) {
app.get("/", (req, res) => res.send("hello!"));
app.use(json());
app.use("/api/v1/users", usersRouter);
app.use("/api/v1/channels", channelsRouter);
app.use("/api/v1/messages", messagesRouter);
app.use("/", express.static("frontend/public"));
};

View file

@ -3,5 +3,6 @@ interface GatewayClientState {
ready: boolean,
alive: boolean,
lastAliveCheck: number,
dispatchChannels: Set<string>
dispatchChannels: Set<string>,
messagesSinceLastCheck: number
}