Compare commits
9 commits
a9a4cdbb5c
...
1885653815
Author | SHA1 | Date | |
---|---|---|---|
|
1885653815 | ||
|
fdbf6c9839 | ||
|
e329c64eb7 | ||
|
bf5e4f554e | ||
|
538717cfc9 | ||
|
fec30b7ec9 | ||
|
9c9f764e6d | ||
|
2f19cb211d | ||
|
d21408ac63 |
7 changed files with 97 additions and 17 deletions
|
@ -21,16 +21,28 @@ export default {
|
|||
heartbeatInterval: null,
|
||||
user: null,
|
||||
channels: null,
|
||||
reconnectDelay: 400,
|
||||
reconnectTimeout: null,
|
||||
init() {
|
||||
const token = getAuthToken();
|
||||
if (!token) {
|
||||
return false;
|
||||
}
|
||||
this.ws = new WebSocket(getItem("gatewayBase"));
|
||||
this.ws.onmessage = (message) => {
|
||||
const payload = JSON.parse(message);
|
||||
this.ws.onopen = () => {
|
||||
if (this.reconnectTimeout) {
|
||||
clearTimeout(this.reconnectTimeout);
|
||||
}
|
||||
console.log("[gateway] open");
|
||||
};
|
||||
this.ws.onmessage = (event) => {
|
||||
const payload = JSON.parse(event.data);
|
||||
|
||||
switch (payload.t) {
|
||||
case GatewayPayloadType.Hello: {
|
||||
this.send({
|
||||
t: GatewayPayloadType.Authenticate,
|
||||
d: getAuthToken()
|
||||
d: token
|
||||
});
|
||||
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
|
@ -39,23 +51,39 @@ export default {
|
|||
d: 0
|
||||
});
|
||||
}, payload.d.pingInterval);
|
||||
|
||||
console.log("[gateway] hello");
|
||||
break;
|
||||
}
|
||||
case GatewayPayloadType.Ready: {
|
||||
this.user = payload.d.user;
|
||||
this.channels = payload.d.channels;
|
||||
|
||||
this.reconnectDelay = 400;
|
||||
|
||||
console.log("[gateway] ready");
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
this.ws.onclose = () => {
|
||||
if (this.reconnectDelay < 60000) {
|
||||
this.reconnectDelay *= 2;
|
||||
}
|
||||
this.authenticated = false;
|
||||
this.user = null;
|
||||
this.channels = null;
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
}
|
||||
this.reconnectTimeout = setTimeout(() => {
|
||||
this.init();
|
||||
}, this.reconnectDelay);
|
||||
|
||||
console.log("[gateway] close");
|
||||
};
|
||||
|
||||
return true;
|
||||
},
|
||||
send(data) {
|
||||
return this.ws.send(JSON.stringify(data));
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
import App from './components/App.svelte';
|
||||
import gateway from './gateway';
|
||||
import { initStorageDefaults } from './storage';
|
||||
|
||||
initStorageDefaults();
|
||||
gateway.init();
|
||||
|
||||
const app = new App({
|
||||
target: document.body
|
||||
|
|
|
@ -14,10 +14,7 @@ const dummyProvider = {
|
|||
};
|
||||
|
||||
function getProvider() {
|
||||
if (!window.localStorage || !window.localStorage.getItem || !window.localStorage.setItem) {
|
||||
return dummyProvider;
|
||||
}
|
||||
return window.localStorage;
|
||||
return window.localStorage || dummyProvider;
|
||||
}
|
||||
|
||||
export function getItem(key) {
|
||||
|
|
|
@ -12,4 +12,8 @@ export const gatewayErrors = {
|
|||
BAD_AUTH: { code: 4002, message: "Bad authentication" },
|
||||
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
|
||||
NO_PING: { code: 4004, message: "No ping" },
|
||||
FLOODING: { code: 4005, message: "Flooding (exceeded maximum messages per batch)" },
|
||||
ALREADY_AUTHENTICATED: { code: 4006, message: "Already authenticated" },
|
||||
PAYLOAD_TOO_LARGE: { code: 4007, message: "Payload too large" },
|
||||
TOO_MANY_SESSIONS: { code: 4008, message: "Too many sessions" },
|
||||
};
|
||||
|
|
|
@ -7,12 +7,17 @@ import { gatewayErrors } from "../errors";
|
|||
import { GatewayPayload } from "../types/gatewaypayload";
|
||||
import { GatewayPayloadType } from "./gatewaypayloadtype";
|
||||
|
||||
const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL;
|
||||
const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL;
|
||||
const GATEWAY_BATCH_INTERVAL = 50000;
|
||||
const GATEWAY_PING_INTERVAL = 40000;
|
||||
const MAX_CLIENT_MESSAGES_PER_BATCH = 6; // TODO: how well does this work for weak connections?
|
||||
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
|
||||
|
||||
// mapping between a dispatch id and a websocket client
|
||||
const dispatchChannels = new Map<string, Set<WebSocket>>();
|
||||
|
||||
// mapping between a user id and the websocket sessions it has
|
||||
const sessionsByUserId = new Map<number, Set<WebSocket>>();
|
||||
|
||||
function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
|
||||
ws.state.dispatchChannels.add(dispatchChannel);
|
||||
if (!dispatchChannels.get(dispatchChannel)) {
|
||||
|
@ -128,6 +133,7 @@ export default function(server: Server) {
|
|||
if (!e.state.alive) {
|
||||
return closeWithError(e, gatewayErrors.NO_PING);
|
||||
}
|
||||
e.state.messagesSinceLastCheck = 0;
|
||||
}
|
||||
});
|
||||
}, GATEWAY_BATCH_INTERVAL);
|
||||
|
@ -144,7 +150,8 @@ export default function(server: Server) {
|
|||
alive: false,
|
||||
ready: false,
|
||||
lastAliveCheck: performance.now(),
|
||||
dispatchChannels: new Set()
|
||||
dispatchChannels: new Set(),
|
||||
messagesSinceLastCheck: 0
|
||||
};
|
||||
|
||||
sendPayload(ws, {
|
||||
|
@ -156,6 +163,15 @@ export default function(server: Server) {
|
|||
|
||||
ws.on("close", () => {
|
||||
clientUnsubscribeAll(ws);
|
||||
if (ws.state.user && ws.state.user.id) {
|
||||
const sessions = sessionsByUserId.get(ws.state.user.id);
|
||||
if (sessions) {
|
||||
sessions.delete(ws);
|
||||
if (sessions.size < 1) {
|
||||
sessionsByUserId.delete(ws.state.user.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("message", async (rawData, isBinary) => {
|
||||
|
@ -163,13 +179,27 @@ export default function(server: Server) {
|
|||
return closeWithBadPayload(ws, "Binary messages are not supported");
|
||||
}
|
||||
|
||||
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString()));
|
||||
ws.state.messagesSinceLastCheck++;
|
||||
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
|
||||
return closeWithError(ws, gatewayErrors.FLOODING);
|
||||
}
|
||||
|
||||
const stringData = rawData.toString();
|
||||
if (stringData.length > 2048) {
|
||||
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
|
||||
}
|
||||
|
||||
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
|
||||
if (!payload) {
|
||||
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
|
||||
}
|
||||
|
||||
switch (payload.t) {
|
||||
case GatewayPayloadType.Authenticate: {
|
||||
if (ws.state.ready) {
|
||||
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
|
||||
}
|
||||
|
||||
const token = payload.d;
|
||||
if (typeof token !== "string") {
|
||||
return closeWithBadPayload(ws, "d: expected string");
|
||||
|
@ -178,7 +208,19 @@ export default function(server: Server) {
|
|||
if (!user) {
|
||||
return closeWithError(ws, gatewayErrors.BAD_AUTH);
|
||||
}
|
||||
// each user should have their own list of channels that they join
|
||||
|
||||
let sessions = sessionsByUserId.get(user.id);
|
||||
if (sessions) {
|
||||
if ((sessions.size + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
|
||||
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
|
||||
}
|
||||
} else {
|
||||
sessions = new Set();
|
||||
sessionsByUserId.set(user.id, sessions);
|
||||
}
|
||||
sessions.add(ws);
|
||||
|
||||
// TODO: each user should have their own list of channels that they join
|
||||
const channels = await query("SELECT id, name, owner_id FROM channels");
|
||||
|
||||
clientSubscribe(ws, "*");
|
||||
|
@ -199,7 +241,11 @@ export default function(server: Server) {
|
|||
break;
|
||||
}
|
||||
case GatewayPayloadType.Ping: {
|
||||
// TODO: also check session here and ensure packet is sent at the right time
|
||||
if (payload.d !== 0) {
|
||||
return closeWithBadPayload(ws, "d: expected numeric '0'");
|
||||
}
|
||||
|
||||
// TODO: also check session here
|
||||
ws.state.alive = true;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
import { Application, json } from "express";
|
||||
import express, { Application, json } from "express";
|
||||
import usersRouter from "./routes/api/v1/users";
|
||||
import channelsRouter from "./routes/api/v1/channels";
|
||||
import messagesRouter from "./routes/api/v1/messages";
|
||||
|
||||
export default function(app: Application) {
|
||||
app.get("/", (req, res) => res.send("hello!"));
|
||||
|
||||
app.use(json());
|
||||
app.use("/api/v1/users", usersRouter);
|
||||
app.use("/api/v1/channels", channelsRouter);
|
||||
app.use("/api/v1/messages", messagesRouter);
|
||||
app.use("/", express.static("frontend/public"));
|
||||
};
|
||||
|
|
3
src/types/gatewayclientstate.d.ts
vendored
3
src/types/gatewayclientstate.d.ts
vendored
|
@ -3,5 +3,6 @@ interface GatewayClientState {
|
|||
ready: boolean,
|
||||
alive: boolean,
|
||||
lastAliveCheck: number,
|
||||
dispatchChannels: Set<string>
|
||||
dispatchChannels: Set<string>,
|
||||
messagesSinceLastCheck: number
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue