improve gateway rpc signals

This commit is contained in:
hippoz 2023-08-08 19:33:26 +03:00
parent dd4ea1cd79
commit b640f742ba
Signed by: hippoz
GPG key ID: 56C4E02A85F2FBED
12 changed files with 114 additions and 53 deletions

View file

@ -2,7 +2,7 @@
import { onDestroy, onMount } from "svelte";
import { getItem } from "../storage";
import { messagesStoreProvider, overlayStore, setMessageInputEvent, smallViewport, typingStore, userInfoStore, usesKeyboardNavigation } from "../stores";
import { getErrorFromResponse, methods, remoteBlobUpload, remoteCall, responseOk } from "../request";
import { getErrorFromResponse, methods, remoteBlobUpload, remoteCall, remoteSignal, responseOk } from "../request";
export let channel;
let messageInput = "";
@ -84,7 +84,7 @@
for (let i = 0; i < filesToUpload.length; i++) {
const file = filesToUpload[i].file;
const res = await remoteBlobUpload(methods.createMessageAttachment, file, [messageId, file.name]);
const res = await remoteBlobUpload({...methods.createMessageAttachment, _isSignal: true}, file, [messageId, file.name]);
if (!responseOk(res)) {
const error = getErrorFromResponse(res);
const message = error.validationErrors && error.validationErrors.length ? error.validationErrors[0].msg : error.message;
@ -117,7 +117,7 @@
const messagesStoreForChannel = messagesStoreProvider.getStore(channel.id);
messagesStoreForChannel.addMessage(optimisticMessage);
const res = await remoteCall(methods.createChannelMessage, channel.id, content, optimisticMessageId, null, files.length);
const res = await remoteSignal(methods.createChannelMessage, channel.id, content, optimisticMessageId, null, files.length);
if (!responseOk(res)) {
messagesStoreForChannel.deleteMessage({

View file

@ -125,9 +125,11 @@ export default {
log("ready");
break;
}
case GatewayPayloadType.RPCResponse: {
case GatewayPayloadType.RPCResponse: /* through */
default: {
if (this.waitingSerials.get(payload.s)) {
(this.waitingSerials.get(payload.s))(payload.d);
// Any payload with `s` (sequence) can be an RPC response for a single RPC call
(this.waitingSerials.get(payload.s))(payload.t === GatewayPayloadType.RPCResponse ? payload.d : [payload.d]);
this.waitingSerials.delete(payload.s);
}
break;

View file

@ -146,7 +146,7 @@ body {
color: var(--foreground-color-1);
background-color: var(--background-color-1);
font-size: 100%;
font-family: "Open Sans Variable", "Iosevka Waffle Web", system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif;
font-family: "Open Sans Variable", system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif;
line-height: 26px;
letter-spacing: 0.01em;
@ -537,19 +537,15 @@ body {
visibility: visible;
}
.sidebar-button:hover {
background-color: var(--background-color-2);
}
.sidebar-button:hover,
.sidebar-button.selected {
color: var(--foreground-color-1);
background-color: var(--background-color-2);
}
.sidebar-button.selected .icon-button {
.sidebar-button.selected .sidebar-button-text {
color: var(--foreground-color-1);
}
.material-icons-outlined, .material-icons {
user-select: none;
color: var(--foreground-special-color-1);

View file

@ -7,7 +7,7 @@ import { gatewayErrors } from "../errors";
import { GatewayPayload } from "../types/gatewaypayload";
import { GatewayPayloadType, GatewayPresenceStatus } from "./gatewaypayloadtype";
import { GatewayPresenceEntry } from "../types/gatewaypresence";
import { processMethodBatch } from "../rpc/rpc";
import { RPCContext, processMethodBatch } from "../rpc/rpc";
import { maxGatewayJsonStringByteLength, maxGatewayJsonStringLength, maxGatewayPayloadByteLength } from "../serverconfig";
const GATEWAY_BATCH_INTERVAL = 50000;
@ -159,7 +159,7 @@ function ensureFormattedGatewayPayload(payload: any): GatewayPayload | null {
}
class GatewayClient {
export class GatewayClient {
ws: WebSocket;
user?: User;
ready: boolean;
@ -427,7 +427,33 @@ class GatewayClient {
}
// RPCSignal is like RPCRequest however it does not send RPC method output unless there is an error
processMethodBatch(this.user, payload.d, (payload.t === GatewayPayloadType.RPCSignal ? true : false), binaryStream).then((results) => {
const isSignal = payload.t === GatewayPayloadType.RPCSignal;
let hasAlreadyRepliedToSelfViaDispatch = false;
let context: RPCContext | undefined = undefined;
if (isSignal && Array.isArray(payload.d) && payload.d.length === 1 && typeof payload.s === "number") {
const seq = payload.s;
context = {
isRealtime: true,
gatewayDispatch: (chan, message) => {
if (this._dispatchButSignalReplyToSelf(seq, chan, message)) {
hasAlreadyRepliedToSelfViaDispatch = true;
}
}
};
}
processMethodBatch(this.user, payload.d, isSignal, binaryStream, context).then((results) => {
if (isSignal && hasAlreadyRepliedToSelfViaDispatch && Array.isArray(results) && results.length === 1 && !(typeof results[0] === "object" && !!results[0] && results[0].code)) {
// There are many signals that dispatch to all users when something was performed.
// Often times, we send a signal to the client, and the server sends *both* the signal success RPCResponse,
// as well as the message that was dispatched to all users from the signal's code.
// For some reason, this really bothers me. So, I added this hacky solution that hooks into the
// gatewayDispatch of the signal, and, if the dispatch happens to reach us, we send the sequence
// of the RPC request, thereby letting the client know "hey, we've handled this signal,
// and this is what was dispatched", which removes the need for the RPCResponse.
return;
}
this.send({
t: GatewayPayloadType.RPCResponse,
d: results,
@ -441,6 +467,26 @@ class GatewayClient {
}
}
}
_dispatchButSignalReplyToSelf(seq: number, channel: string, message: GatewayPayload | ((ws: GatewayClient | null) => GatewayPayload)) {
let hasRepliedToSelf = false;
dispatch(channel, (other) => {
let effectivePayload: GatewayPayload;
if (typeof message === "function") effectivePayload = message(other);
else effectivePayload = message;
if (other === this) {
hasRepliedToSelf = true;
return {
...effectivePayload,
s: seq
};
} else {
return effectivePayload;
}
});
return hasRepliedToSelf;
}
}
export default function(server: Server) {

View file

@ -1,8 +1,9 @@
import { query } from "./database";
import { dispatch } from "./gateway";
import { GatewayPayloadType } from "./gateway/gatewaypayloadtype";
import { RPCContext } from "./rpc/rpc";
export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null, pendingAttachments: number) {
export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null, pendingAttachments: number, ctx: RPCContext) {
const authorId = user.id;
const createdAt = Date.now().toString();
@ -24,7 +25,7 @@ export default async function sendMessage(user: User, channelId: number, optimis
attachments: null
};
dispatch(`channel:${channelId}`, (ws) => {
ctx.gatewayDispatch(`channel:${channelId}`, (ws) => {
let payload: any = returnObject;
if (ws && ws.user && ws.user.id === user.id && optimisticId) {
payload = {

View file

@ -2,7 +2,7 @@ import express from "express";
import { authenticateRoute, loginAttempt } from "../../auth";
import { query } from "../../database";
import { getMessagesByChannelAfterPage, getMessagesByChannelFirstPage } from "../../database/templates";
import { waitForEvent } from "../../gateway";
import { dispatch, waitForEvent } from "../../gateway";
import cors from "cors";
import sendMessage from "../../impl";
@ -334,7 +334,10 @@ router.put(
error: "Message body must be a string between 1 and 2000 characters"
});
}
const message = await sendMessage(req.user, channelId, null, req.body.body, null, 0);
const message = await sendMessage(req.user, channelId, null, req.body.body, null, 0, {
isRealtime: false,
gatewayDispatch: dispatch
});
if (!message) {
return res.status(500).json({
errcode: "M_UNKNOWN",

View file

@ -1,4 +1,4 @@
import { bufferSlice, method, string, uint } from "../rpc";
import { RPCContext, bufferSlice, method, string, uint } from "../rpc";
import { query } from "../../database";
import { errors } from "../../errors";
import { UploadTarget, getSafeUploadPath, sanitizeFilename, supportedImageMime } from "../../uploading";
@ -16,7 +16,7 @@ const fileType = eval("import('file-type')");
method(
"createMessageAttachment",
[uint(), string(2, 128), bufferSlice()],
async (user: User, messageId: number, filenameUnsafe: string, inputBuffer: Buffer) => {
async (user: User, messageId: number, filenameUnsafe: string, inputBuffer: Buffer, ctx: RPCContext) => {
if (inputBuffer.byteLength >= 16777220) {
return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 16MiB limit." };
}
@ -112,7 +112,7 @@ method(
attachmentObject
];
dispatch(`channel:${messageCheckResult.rows[0].channel_id}`, {
ctx.gatewayDispatch(`channel:${messageCheckResult.rows[0].channel_id}`, {
t: GatewayPayloadType.MessageUpdate,
d: messageCheckResult.rows[0]
});

View file

@ -1,4 +1,4 @@
import { channelNameRegex, method, int, string, uint, withOptional, withRegexp } from "../rpc";
import { channelNameRegex, method, int, string, uint, withOptional, withRegexp, RPCContext } from "../rpc";
import { query } from "../../database";
import { getMessagesByChannelFirstPage, getMessagesByChannelPage } from "../../database/templates";
import { errors } from "../../errors";
@ -10,7 +10,7 @@ import serverConfig from "../../serverconfig";
method(
"createChannel",
[withRegexp(channelNameRegex, string(1, 32)), withOptional(uint())],
async (user: User, name: string, communityId: number | null) => {
async (user: User, name: string, communityId: number | null, ctx: RPCContext) => {
if (serverConfig.superuserRequirement.createChannel && !user.is_superuser) {
return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS;
}
@ -20,7 +20,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch("*", {
ctx.gatewayDispatch("*", {
t: GatewayPayloadType.ChannelCreate,
d: result.rows[0]
});
@ -36,7 +36,7 @@ method(
method(
"updateChannelName",
[uint(), withRegexp(channelNameRegex, string(1, 32))],
async (user: User, id: number, name: string) => {
async (user: User, id: number, name: string, ctx: RPCContext) => {
const permissionCheckResult = await query("SELECT owner_id FROM channels WHERE id = $1", [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -50,7 +50,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`channel:${id}`, {
ctx.gatewayDispatch(`channel:${id}`, {
t: GatewayPayloadType.ChannelUpdate,
d: result.rows[0]
});
@ -62,7 +62,7 @@ method(
method(
"deleteChannel",
[uint()],
async (user: User, id: number) => {
async (user: User, id: number, ctx: RPCContext) => {
const permissionCheckResult = await query("SELECT owner_id FROM channels WHERE id = $1", [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -76,7 +76,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`channel:${id}`, {
ctx.gatewayDispatch(`channel:${id}`, {
t: GatewayPayloadType.ChannelDelete,
d: {id}
});
@ -111,8 +111,8 @@ method(
method(
"createChannelMessage",
[uint(), string(1, 4000), withOptional(uint()), withOptional(string(1, 64)), withOptional(uint())],
async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null, pending_attachments: number | null) => {
return await sendMessage(user, id, optimistic_id, content, nick_username, pending_attachments ?? 0);
async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null, pending_attachments: number | null, ctx: RPCContext) => {
return await sendMessage(user, id, optimistic_id, content, nick_username, pending_attachments ?? 0, ctx);
}
);
@ -139,8 +139,8 @@ method(
method(
"putChannelTyping",
[uint()],
async (user: User, channelId: number) => {
dispatch(`channel:${channelId}`, {
async (user: User, channelId: number, ctx: RPCContext) => {
ctx.gatewayDispatch(`channel:${channelId}`, {
t: GatewayPayloadType.TypingStart,
d: {
user: {

View file

@ -1,4 +1,4 @@
import { channelNameRegex, method, int, string, uint, withRegexp } from "../rpc";
import { channelNameRegex, method, int, string, uint, withRegexp, RPCContext } from "../rpc";
import { query } from "../../database";
import { errors } from "../../errors";
import { dispatch, dispatchChannelSubscribe } from "../../gateway";
@ -8,7 +8,7 @@ import serverConfig from "../../serverconfig";
method(
"createCommunity",
[withRegexp(channelNameRegex, string(1, 64))],
async (user: User, name: string) => {
async (user: User, name: string, ctx: RPCContext) => {
if (serverConfig.superuserRequirement.createChannel && !user.is_superuser) {
return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS;
}
@ -17,7 +17,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch("*", {
ctx.gatewayDispatch("*", {
t: GatewayPayloadType.CommunityCreate,
d: result.rows[0]
});
@ -31,7 +31,7 @@ method(
method(
"updateCommunityName",
[uint(), withRegexp(channelNameRegex, string(1, 32))],
async (user: User, id: number, name: string) => {
async (user: User, id: number, name: string, ctx: RPCContext) => {
const permissionCheckResult = await query("SELECT owner_id FROM communities WHERE id = $1", [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -45,7 +45,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`community:${id}`, {
ctx.gatewayDispatch(`community:${id}`, {
t: GatewayPayloadType.CommunityUpdate,
d: result.rows[0]
});
@ -57,7 +57,7 @@ method(
method(
"deleteCommunity",
[uint()],
async (user: User, id: number) => {
async (user: User, id: number, ctx: RPCContext) => {
const permissionCheckResult = await query("SELECT owner_id FROM communities WHERE id = $1", [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -71,7 +71,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`community:${id}`, {
ctx.gatewayDispatch(`community:${id}`, {
t: GatewayPayloadType.CommunityDelete,
d: {id}
});

View file

@ -1,4 +1,4 @@
import { method, string, uint } from "./../rpc";
import { RPCContext, method, string, uint } from "./../rpc";
import { query } from "../../database";
import { getMessageById } from "../../database/templates";
import { errors } from "../../errors";
@ -11,7 +11,7 @@ import { UploadTarget, getSafeUploadPath } from "../../uploading";
method(
"deleteMessage",
[uint()],
async (user: User, id: number) => {
async (user: User, id: number, ctx: RPCContext) => {
const messageCheckResult = await query(getMessageById, [id]);
if (!messageCheckResult || messageCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -46,7 +46,7 @@ method(
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`channel:${message.channel_id}`, {
ctx.gatewayDispatch(`channel:${message.channel_id}`, {
t: GatewayPayloadType.MessageDelete,
d: {
id,
@ -61,7 +61,7 @@ method(
method(
"updateMessageContent",
[uint(), string(1, 4000)],
async (user: User, id: number, content: string) => {
async (user: User, id: number, content: string, ctx: RPCContext) => {
const permissionCheckResult = await query(getMessageById, [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
@ -80,7 +80,7 @@ method(
content
};
dispatch(`channel:${permissionCheckResult.rows[0].channel_id}`, {
ctx.gatewayDispatch(`channel:${permissionCheckResult.rows[0].channel_id}`, {
t: GatewayPayloadType.MessageUpdate,
d: returnObject
});

View file

@ -2,7 +2,7 @@ import { errors } from "../../errors";
import { query } from "../../database";
import { compare, hash, hashSync } from "bcrypt";
import { getPublicUserObject, loginAttempt } from "../../auth";
import { bufferSlice, method, methodButWarningDoesNotAuthenticate, string, usernameRegex, withRegexp } from "./../rpc";
import { RPCContext, bufferSlice, method, methodButWarningDoesNotAuthenticate, string, usernameRegex, withRegexp } from "./../rpc";
import sharp from "sharp";
import path from "path";
import { randomBytes } from "crypto";
@ -92,7 +92,7 @@ const profilePictureSizes = [
method(
"putUserAvatar",
[bufferSlice()],
async (user: User, buffer: Buffer) => {
async (user: User, buffer: Buffer, ctx: RPCContext) => {
if (buffer.byteLength >= 3145728) {
// buffer exceeds 3MiB
return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 3MiB limit." };
@ -148,7 +148,7 @@ method(
user.avatar = avatarId;
dispatch("*", {
ctx.gatewayDispatch("*", {
t: GatewayPayloadType.UserUpdate,
d: getPublicUserObject(user),
});

View file

@ -1,5 +1,7 @@
import { errors } from "../errors";
import { GatewayClient, dispatch } from "../gateway";
import { maxBufferByteLength } from "../serverconfig";
import { GatewayPayload } from "../types/gatewaypayload";
export const alphanumericRegex = new RegExp(/^[a-z0-9]+$/i);
export const usernameRegex = new RegExp(/^[a-z0-9_]+$/i);
@ -19,6 +21,17 @@ export const withOptional = (arg: RPCArgument): RPCArgument => ({ ...arg, isOpti
const isInt = (val: any) => typeof val === "number" && Number.isSafeInteger(val);
const isUint = (val: any) => (isInt(val) && val >= 0);
export interface RPCContext {
isRealtime: boolean,
gatewayDispatch: (channel: string, message: GatewayPayload | ((ws: GatewayClient | null) => GatewayPayload)) => void,
}
const defaultRPCContext: RPCContext = {
isRealtime: false,
gatewayDispatch: (...a) => dispatch(...a)
};
enum RPCArgumentType {
Integer,
String,
@ -64,7 +77,7 @@ export const methodButWarningDoesNotAuthenticate = (name: string, args: RPCArgum
return method(name, args, func, false);
};
export const userInvokeMethod = async (user: User | null, methodId: number, args: any[], buffer: Buffer | null) => {
export const userInvokeMethod = async (user: User | null, methodId: number, args: any[], buffer: Buffer | null, context: RPCContext) => {
const methodData = methods.get(methodId);
if (!methodData) return {
...errors.BAD_REQUEST,
@ -161,7 +174,7 @@ export const userInvokeMethod = async (user: User | null, methodId: number, args
}
if (user) {
return await ((methodData.func)(user, ...args));
return await ((methodData.func)(user, ...args, context));
} else if (!user && !methodData.requiresAuthentication) {
return await ((methodData.func)(...args));
} else {
@ -169,7 +182,7 @@ export const userInvokeMethod = async (user: User | null, methodId: number, args
}
};
export const processMethodBatch = async (user: User | null, calls: any, ignoreNonErrors = false, buffer: Buffer | null) => {
export const processMethodBatch = async (user: User | null, calls: any, ignoreNonErrors = false, buffer: Buffer | null, context: RPCContext = defaultRPCContext) => {
if (!Array.isArray(calls) || !calls.length || calls.length > 5) {
return {
...errors.BAD_REQUEST,
@ -195,7 +208,7 @@ export const processMethodBatch = async (user: User | null, calls: any, ignoreNo
return;
}
const promise = userInvokeMethod(user, call[0], call.slice(1, call.length), buffer);
const promise = userInvokeMethod(user, call[0], call.slice(1, call.length), buffer, context);
promise.then(value => {
if (ignoreNonErrors && !value.code) {
responses[index] = null;