Compare commits
9 commits
a9a4cdbb5c
...
1885653815
Author | SHA1 | Date | |
---|---|---|---|
|
1885653815 | ||
|
fdbf6c9839 | ||
|
e329c64eb7 | ||
|
bf5e4f554e | ||
|
538717cfc9 | ||
|
fec30b7ec9 | ||
|
9c9f764e6d | ||
|
2f19cb211d | ||
|
d21408ac63 |
7 changed files with 97 additions and 17 deletions
|
@ -21,16 +21,28 @@ export default {
|
||||||
heartbeatInterval: null,
|
heartbeatInterval: null,
|
||||||
user: null,
|
user: null,
|
||||||
channels: null,
|
channels: null,
|
||||||
|
reconnectDelay: 400,
|
||||||
|
reconnectTimeout: null,
|
||||||
init() {
|
init() {
|
||||||
|
const token = getAuthToken();
|
||||||
|
if (!token) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
this.ws = new WebSocket(getItem("gatewayBase"));
|
this.ws = new WebSocket(getItem("gatewayBase"));
|
||||||
this.ws.onmessage = (message) => {
|
this.ws.onopen = () => {
|
||||||
const payload = JSON.parse(message);
|
if (this.reconnectTimeout) {
|
||||||
|
clearTimeout(this.reconnectTimeout);
|
||||||
|
}
|
||||||
|
console.log("[gateway] open");
|
||||||
|
};
|
||||||
|
this.ws.onmessage = (event) => {
|
||||||
|
const payload = JSON.parse(event.data);
|
||||||
|
|
||||||
switch (payload.t) {
|
switch (payload.t) {
|
||||||
case GatewayPayloadType.Hello: {
|
case GatewayPayloadType.Hello: {
|
||||||
this.send({
|
this.send({
|
||||||
t: GatewayPayloadType.Authenticate,
|
t: GatewayPayloadType.Authenticate,
|
||||||
d: getAuthToken()
|
d: token
|
||||||
});
|
});
|
||||||
|
|
||||||
this.heartbeatInterval = setInterval(() => {
|
this.heartbeatInterval = setInterval(() => {
|
||||||
|
@ -39,23 +51,39 @@ export default {
|
||||||
d: 0
|
d: 0
|
||||||
});
|
});
|
||||||
}, payload.d.pingInterval);
|
}, payload.d.pingInterval);
|
||||||
|
|
||||||
|
console.log("[gateway] hello");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GatewayPayloadType.Ready: {
|
case GatewayPayloadType.Ready: {
|
||||||
this.user = payload.d.user;
|
this.user = payload.d.user;
|
||||||
this.channels = payload.d.channels;
|
this.channels = payload.d.channels;
|
||||||
|
|
||||||
|
this.reconnectDelay = 400;
|
||||||
|
|
||||||
|
console.log("[gateway] ready");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.ws.onclose = () => {
|
this.ws.onclose = () => {
|
||||||
|
if (this.reconnectDelay < 60000) {
|
||||||
|
this.reconnectDelay *= 2;
|
||||||
|
}
|
||||||
this.authenticated = false;
|
this.authenticated = false;
|
||||||
this.user = null;
|
this.user = null;
|
||||||
this.channels = null;
|
this.channels = null;
|
||||||
if (this.heartbeatInterval) {
|
if (this.heartbeatInterval) {
|
||||||
clearInterval(this.heartbeatInterval);
|
clearInterval(this.heartbeatInterval);
|
||||||
}
|
}
|
||||||
|
this.reconnectTimeout = setTimeout(() => {
|
||||||
|
this.init();
|
||||||
|
}, this.reconnectDelay);
|
||||||
|
|
||||||
|
console.log("[gateway] close");
|
||||||
};
|
};
|
||||||
|
|
||||||
|
return true;
|
||||||
},
|
},
|
||||||
send(data) {
|
send(data) {
|
||||||
return this.ws.send(JSON.stringify(data));
|
return this.ws.send(JSON.stringify(data));
|
||||||
|
|
|
@ -1,4 +1,9 @@
|
||||||
import App from './components/App.svelte';
|
import App from './components/App.svelte';
|
||||||
|
import gateway from './gateway';
|
||||||
|
import { initStorageDefaults } from './storage';
|
||||||
|
|
||||||
|
initStorageDefaults();
|
||||||
|
gateway.init();
|
||||||
|
|
||||||
const app = new App({
|
const app = new App({
|
||||||
target: document.body
|
target: document.body
|
||||||
|
|
|
@ -14,10 +14,7 @@ const dummyProvider = {
|
||||||
};
|
};
|
||||||
|
|
||||||
function getProvider() {
|
function getProvider() {
|
||||||
if (!window.localStorage || !window.localStorage.getItem || !window.localStorage.setItem) {
|
return window.localStorage || dummyProvider;
|
||||||
return dummyProvider;
|
|
||||||
}
|
|
||||||
return window.localStorage;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getItem(key) {
|
export function getItem(key) {
|
||||||
|
|
|
@ -12,4 +12,8 @@ export const gatewayErrors = {
|
||||||
BAD_AUTH: { code: 4002, message: "Bad authentication" },
|
BAD_AUTH: { code: 4002, message: "Bad authentication" },
|
||||||
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
|
AUTHENTICATION_TIMEOUT: { code: 4003, message: "Authentication timeout" },
|
||||||
NO_PING: { code: 4004, message: "No ping" },
|
NO_PING: { code: 4004, message: "No ping" },
|
||||||
|
FLOODING: { code: 4005, message: "Flooding (exceeded maximum messages per batch)" },
|
||||||
|
ALREADY_AUTHENTICATED: { code: 4006, message: "Already authenticated" },
|
||||||
|
PAYLOAD_TOO_LARGE: { code: 4007, message: "Payload too large" },
|
||||||
|
TOO_MANY_SESSIONS: { code: 4008, message: "Too many sessions" },
|
||||||
};
|
};
|
||||||
|
|
|
@ -7,12 +7,17 @@ import { gatewayErrors } from "../errors";
|
||||||
import { GatewayPayload } from "../types/gatewaypayload";
|
import { GatewayPayload } from "../types/gatewaypayload";
|
||||||
import { GatewayPayloadType } from "./gatewaypayloadtype";
|
import { GatewayPayloadType } from "./gatewaypayloadtype";
|
||||||
|
|
||||||
const GATEWAY_BATCH_INTERVAL = 25000 || process.env.GATEWAY_BATCH_INTERVAL;
|
const GATEWAY_BATCH_INTERVAL = 50000;
|
||||||
const GATEWAY_PING_INTERVAL = 20000 || process.env.GATEWAY_PING_INTERVAL;
|
const GATEWAY_PING_INTERVAL = 40000;
|
||||||
|
const MAX_CLIENT_MESSAGES_PER_BATCH = 6; // TODO: how well does this work for weak connections?
|
||||||
|
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
|
||||||
|
|
||||||
// mapping between a dispatch id and a websocket client
|
// mapping between a dispatch id and a websocket client
|
||||||
const dispatchChannels = new Map<string, Set<WebSocket>>();
|
const dispatchChannels = new Map<string, Set<WebSocket>>();
|
||||||
|
|
||||||
|
// mapping between a user id and the websocket sessions it has
|
||||||
|
const sessionsByUserId = new Map<number, Set<WebSocket>>();
|
||||||
|
|
||||||
function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
|
function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
|
||||||
ws.state.dispatchChannels.add(dispatchChannel);
|
ws.state.dispatchChannels.add(dispatchChannel);
|
||||||
if (!dispatchChannels.get(dispatchChannel)) {
|
if (!dispatchChannels.get(dispatchChannel)) {
|
||||||
|
@ -128,6 +133,7 @@ export default function(server: Server) {
|
||||||
if (!e.state.alive) {
|
if (!e.state.alive) {
|
||||||
return closeWithError(e, gatewayErrors.NO_PING);
|
return closeWithError(e, gatewayErrors.NO_PING);
|
||||||
}
|
}
|
||||||
|
e.state.messagesSinceLastCheck = 0;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}, GATEWAY_BATCH_INTERVAL);
|
}, GATEWAY_BATCH_INTERVAL);
|
||||||
|
@ -144,7 +150,8 @@ export default function(server: Server) {
|
||||||
alive: false,
|
alive: false,
|
||||||
ready: false,
|
ready: false,
|
||||||
lastAliveCheck: performance.now(),
|
lastAliveCheck: performance.now(),
|
||||||
dispatchChannels: new Set()
|
dispatchChannels: new Set(),
|
||||||
|
messagesSinceLastCheck: 0
|
||||||
};
|
};
|
||||||
|
|
||||||
sendPayload(ws, {
|
sendPayload(ws, {
|
||||||
|
@ -156,6 +163,15 @@ export default function(server: Server) {
|
||||||
|
|
||||||
ws.on("close", () => {
|
ws.on("close", () => {
|
||||||
clientUnsubscribeAll(ws);
|
clientUnsubscribeAll(ws);
|
||||||
|
if (ws.state.user && ws.state.user.id) {
|
||||||
|
const sessions = sessionsByUserId.get(ws.state.user.id);
|
||||||
|
if (sessions) {
|
||||||
|
sessions.delete(ws);
|
||||||
|
if (sessions.size < 1) {
|
||||||
|
sessionsByUserId.delete(ws.state.user.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on("message", async (rawData, isBinary) => {
|
ws.on("message", async (rawData, isBinary) => {
|
||||||
|
@ -163,13 +179,27 @@ export default function(server: Server) {
|
||||||
return closeWithBadPayload(ws, "Binary messages are not supported");
|
return closeWithBadPayload(ws, "Binary messages are not supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(rawData.toString()));
|
ws.state.messagesSinceLastCheck++;
|
||||||
|
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
|
||||||
|
return closeWithError(ws, gatewayErrors.FLOODING);
|
||||||
|
}
|
||||||
|
|
||||||
|
const stringData = rawData.toString();
|
||||||
|
if (stringData.length > 2048) {
|
||||||
|
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
|
||||||
if (!payload) {
|
if (!payload) {
|
||||||
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
|
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (payload.t) {
|
switch (payload.t) {
|
||||||
case GatewayPayloadType.Authenticate: {
|
case GatewayPayloadType.Authenticate: {
|
||||||
|
if (ws.state.ready) {
|
||||||
|
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
|
||||||
|
}
|
||||||
|
|
||||||
const token = payload.d;
|
const token = payload.d;
|
||||||
if (typeof token !== "string") {
|
if (typeof token !== "string") {
|
||||||
return closeWithBadPayload(ws, "d: expected string");
|
return closeWithBadPayload(ws, "d: expected string");
|
||||||
|
@ -178,7 +208,19 @@ export default function(server: Server) {
|
||||||
if (!user) {
|
if (!user) {
|
||||||
return closeWithError(ws, gatewayErrors.BAD_AUTH);
|
return closeWithError(ws, gatewayErrors.BAD_AUTH);
|
||||||
}
|
}
|
||||||
// each user should have their own list of channels that they join
|
|
||||||
|
let sessions = sessionsByUserId.get(user.id);
|
||||||
|
if (sessions) {
|
||||||
|
if ((sessions.size + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
|
||||||
|
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sessions = new Set();
|
||||||
|
sessionsByUserId.set(user.id, sessions);
|
||||||
|
}
|
||||||
|
sessions.add(ws);
|
||||||
|
|
||||||
|
// TODO: each user should have their own list of channels that they join
|
||||||
const channels = await query("SELECT id, name, owner_id FROM channels");
|
const channels = await query("SELECT id, name, owner_id FROM channels");
|
||||||
|
|
||||||
clientSubscribe(ws, "*");
|
clientSubscribe(ws, "*");
|
||||||
|
@ -199,7 +241,11 @@ export default function(server: Server) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case GatewayPayloadType.Ping: {
|
case GatewayPayloadType.Ping: {
|
||||||
// TODO: also check session here and ensure packet is sent at the right time
|
if (payload.d !== 0) {
|
||||||
|
return closeWithBadPayload(ws, "d: expected numeric '0'");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: also check session here
|
||||||
ws.state.alive = true;
|
ws.state.alive = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
import { Application, json } from "express";
|
import express, { Application, json } from "express";
|
||||||
import usersRouter from "./routes/api/v1/users";
|
import usersRouter from "./routes/api/v1/users";
|
||||||
import channelsRouter from "./routes/api/v1/channels";
|
import channelsRouter from "./routes/api/v1/channels";
|
||||||
import messagesRouter from "./routes/api/v1/messages";
|
import messagesRouter from "./routes/api/v1/messages";
|
||||||
|
|
||||||
export default function(app: Application) {
|
export default function(app: Application) {
|
||||||
app.get("/", (req, res) => res.send("hello!"));
|
|
||||||
|
|
||||||
app.use(json());
|
app.use(json());
|
||||||
app.use("/api/v1/users", usersRouter);
|
app.use("/api/v1/users", usersRouter);
|
||||||
app.use("/api/v1/channels", channelsRouter);
|
app.use("/api/v1/channels", channelsRouter);
|
||||||
app.use("/api/v1/messages", messagesRouter);
|
app.use("/api/v1/messages", messagesRouter);
|
||||||
|
app.use("/", express.static("frontend/public"));
|
||||||
};
|
};
|
||||||
|
|
3
src/types/gatewayclientstate.d.ts
vendored
3
src/types/gatewayclientstate.d.ts
vendored
|
@ -3,5 +3,6 @@ interface GatewayClientState {
|
||||||
ready: boolean,
|
ready: boolean,
|
||||||
alive: boolean,
|
alive: boolean,
|
||||||
lastAliveCheck: number,
|
lastAliveCheck: number,
|
||||||
dispatchChannels: Set<string>
|
dispatchChannels: Set<string>,
|
||||||
|
messagesSinceLastCheck: number
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue