From b640f742ba931aa3272f969818ae01a609f140a1 Mon Sep 17 00:00:00 2001 From: hippoz <10706925-hippoz@users.noreply.gitlab.com> Date: Tue, 8 Aug 2023 19:33:26 +0300 Subject: [PATCH] improve gateway rpc signals --- frontend/src/components/MessageInput.svelte | 6 +-- frontend/src/gateway.js | 6 ++- frontend/src/styles/global.css | 12 ++--- src/gateway/index.ts | 52 +++++++++++++++++++-- src/impl.ts | 5 +- src/routes/matrix/index.ts | 7 ++- src/rpc/apis/attachments.ts | 6 +-- src/rpc/apis/channels.ts | 22 ++++----- src/rpc/apis/communities.ts | 14 +++--- src/rpc/apis/messages.ts | 10 ++-- src/rpc/apis/users.ts | 6 +-- src/rpc/rpc.ts | 21 +++++++-- 12 files changed, 114 insertions(+), 53 deletions(-) diff --git a/frontend/src/components/MessageInput.svelte b/frontend/src/components/MessageInput.svelte index 96e97ef..b040f18 100644 --- a/frontend/src/components/MessageInput.svelte +++ b/frontend/src/components/MessageInput.svelte @@ -2,7 +2,7 @@ import { onDestroy, onMount } from "svelte"; import { getItem } from "../storage"; import { messagesStoreProvider, overlayStore, setMessageInputEvent, smallViewport, typingStore, userInfoStore, usesKeyboardNavigation } from "../stores"; - import { getErrorFromResponse, methods, remoteBlobUpload, remoteCall, responseOk } from "../request"; + import { getErrorFromResponse, methods, remoteBlobUpload, remoteCall, remoteSignal, responseOk } from "../request"; export let channel; let messageInput = ""; @@ -84,7 +84,7 @@ for (let i = 0; i < filesToUpload.length; i++) { const file = filesToUpload[i].file; - const res = await remoteBlobUpload(methods.createMessageAttachment, file, [messageId, file.name]); + const res = await remoteBlobUpload({...methods.createMessageAttachment, _isSignal: true}, file, [messageId, file.name]); if (!responseOk(res)) { const error = getErrorFromResponse(res); const message = error.validationErrors && error.validationErrors.length ? error.validationErrors[0].msg : error.message; @@ -117,7 +117,7 @@ const messagesStoreForChannel = messagesStoreProvider.getStore(channel.id); messagesStoreForChannel.addMessage(optimisticMessage); - const res = await remoteCall(methods.createChannelMessage, channel.id, content, optimisticMessageId, null, files.length); + const res = await remoteSignal(methods.createChannelMessage, channel.id, content, optimisticMessageId, null, files.length); if (!responseOk(res)) { messagesStoreForChannel.deleteMessage({ diff --git a/frontend/src/gateway.js b/frontend/src/gateway.js index a2e9cd7..be0b567 100644 --- a/frontend/src/gateway.js +++ b/frontend/src/gateway.js @@ -125,9 +125,11 @@ export default { log("ready"); break; } - case GatewayPayloadType.RPCResponse: { + case GatewayPayloadType.RPCResponse: /* through */ + default: { if (this.waitingSerials.get(payload.s)) { - (this.waitingSerials.get(payload.s))(payload.d); + // Any payload with `s` (sequence) can be an RPC response for a single RPC call + (this.waitingSerials.get(payload.s))(payload.t === GatewayPayloadType.RPCResponse ? payload.d : [payload.d]); this.waitingSerials.delete(payload.s); } break; diff --git a/frontend/src/styles/global.css b/frontend/src/styles/global.css index 36a2098..ed2d38c 100644 --- a/frontend/src/styles/global.css +++ b/frontend/src/styles/global.css @@ -146,7 +146,7 @@ body { color: var(--foreground-color-1); background-color: var(--background-color-1); font-size: 100%; - font-family: "Open Sans Variable", "Iosevka Waffle Web", system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif; + font-family: "Open Sans Variable", system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Oxygen, Ubuntu, Cantarell, "Open Sans", "Helvetica Neue", sans-serif; line-height: 26px; letter-spacing: 0.01em; @@ -537,19 +537,15 @@ body { visibility: visible; } -.sidebar-button:hover { - background-color: var(--background-color-2); -} - +.sidebar-button:hover, .sidebar-button.selected { - color: var(--foreground-color-1); background-color: var(--background-color-2); } - -.sidebar-button.selected .icon-button { +.sidebar-button.selected .sidebar-button-text { color: var(--foreground-color-1); } + .material-icons-outlined, .material-icons { user-select: none; color: var(--foreground-special-color-1); diff --git a/src/gateway/index.ts b/src/gateway/index.ts index 27a1d91..7148120 100644 --- a/src/gateway/index.ts +++ b/src/gateway/index.ts @@ -7,7 +7,7 @@ import { gatewayErrors } from "../errors"; import { GatewayPayload } from "../types/gatewaypayload"; import { GatewayPayloadType, GatewayPresenceStatus } from "./gatewaypayloadtype"; import { GatewayPresenceEntry } from "../types/gatewaypresence"; -import { processMethodBatch } from "../rpc/rpc"; +import { RPCContext, processMethodBatch } from "../rpc/rpc"; import { maxGatewayJsonStringByteLength, maxGatewayJsonStringLength, maxGatewayPayloadByteLength } from "../serverconfig"; const GATEWAY_BATCH_INTERVAL = 50000; @@ -159,7 +159,7 @@ function ensureFormattedGatewayPayload(payload: any): GatewayPayload | null { } -class GatewayClient { +export class GatewayClient { ws: WebSocket; user?: User; ready: boolean; @@ -427,7 +427,33 @@ class GatewayClient { } // RPCSignal is like RPCRequest however it does not send RPC method output unless there is an error - processMethodBatch(this.user, payload.d, (payload.t === GatewayPayloadType.RPCSignal ? true : false), binaryStream).then((results) => { + const isSignal = payload.t === GatewayPayloadType.RPCSignal; + let hasAlreadyRepliedToSelfViaDispatch = false; + + let context: RPCContext | undefined = undefined; + if (isSignal && Array.isArray(payload.d) && payload.d.length === 1 && typeof payload.s === "number") { + const seq = payload.s; + context = { + isRealtime: true, + gatewayDispatch: (chan, message) => { + if (this._dispatchButSignalReplyToSelf(seq, chan, message)) { + hasAlreadyRepliedToSelfViaDispatch = true; + } + } + }; + } + + processMethodBatch(this.user, payload.d, isSignal, binaryStream, context).then((results) => { + if (isSignal && hasAlreadyRepliedToSelfViaDispatch && Array.isArray(results) && results.length === 1 && !(typeof results[0] === "object" && !!results[0] && results[0].code)) { + // There are many signals that dispatch to all users when something was performed. + // Often times, we send a signal to the client, and the server sends *both* the signal success RPCResponse, + // as well as the message that was dispatched to all users from the signal's code. + // For some reason, this really bothers me. So, I added this hacky solution that hooks into the + // gatewayDispatch of the signal, and, if the dispatch happens to reach us, we send the sequence + // of the RPC request, thereby letting the client know "hey, we've handled this signal, + // and this is what was dispatched", which removes the need for the RPCResponse. + return; + } this.send({ t: GatewayPayloadType.RPCResponse, d: results, @@ -441,6 +467,26 @@ class GatewayClient { } } } + + _dispatchButSignalReplyToSelf(seq: number, channel: string, message: GatewayPayload | ((ws: GatewayClient | null) => GatewayPayload)) { + let hasRepliedToSelf = false; + dispatch(channel, (other) => { + let effectivePayload: GatewayPayload; + if (typeof message === "function") effectivePayload = message(other); + else effectivePayload = message; + + if (other === this) { + hasRepliedToSelf = true; + return { + ...effectivePayload, + s: seq + }; + } else { + return effectivePayload; + } + }); + return hasRepliedToSelf; + } } export default function(server: Server) { diff --git a/src/impl.ts b/src/impl.ts index a9befba..0c7ddb3 100644 --- a/src/impl.ts +++ b/src/impl.ts @@ -1,8 +1,9 @@ import { query } from "./database"; import { dispatch } from "./gateway"; import { GatewayPayloadType } from "./gateway/gatewaypayloadtype"; +import { RPCContext } from "./rpc/rpc"; -export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null, pendingAttachments: number) { +export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null, pendingAttachments: number, ctx: RPCContext) { const authorId = user.id; const createdAt = Date.now().toString(); @@ -24,7 +25,7 @@ export default async function sendMessage(user: User, channelId: number, optimis attachments: null }; - dispatch(`channel:${channelId}`, (ws) => { + ctx.gatewayDispatch(`channel:${channelId}`, (ws) => { let payload: any = returnObject; if (ws && ws.user && ws.user.id === user.id && optimisticId) { payload = { diff --git a/src/routes/matrix/index.ts b/src/routes/matrix/index.ts index 7b049cf..b40d0cf 100644 --- a/src/routes/matrix/index.ts +++ b/src/routes/matrix/index.ts @@ -2,7 +2,7 @@ import express from "express"; import { authenticateRoute, loginAttempt } from "../../auth"; import { query } from "../../database"; import { getMessagesByChannelAfterPage, getMessagesByChannelFirstPage } from "../../database/templates"; -import { waitForEvent } from "../../gateway"; +import { dispatch, waitForEvent } from "../../gateway"; import cors from "cors"; import sendMessage from "../../impl"; @@ -334,7 +334,10 @@ router.put( error: "Message body must be a string between 1 and 2000 characters" }); } - const message = await sendMessage(req.user, channelId, null, req.body.body, null, 0); + const message = await sendMessage(req.user, channelId, null, req.body.body, null, 0, { + isRealtime: false, + gatewayDispatch: dispatch + }); if (!message) { return res.status(500).json({ errcode: "M_UNKNOWN", diff --git a/src/rpc/apis/attachments.ts b/src/rpc/apis/attachments.ts index 1cf34fa..736dcfb 100644 --- a/src/rpc/apis/attachments.ts +++ b/src/rpc/apis/attachments.ts @@ -1,4 +1,4 @@ -import { bufferSlice, method, string, uint } from "../rpc"; +import { RPCContext, bufferSlice, method, string, uint } from "../rpc"; import { query } from "../../database"; import { errors } from "../../errors"; import { UploadTarget, getSafeUploadPath, sanitizeFilename, supportedImageMime } from "../../uploading"; @@ -16,7 +16,7 @@ const fileType = eval("import('file-type')"); method( "createMessageAttachment", [uint(), string(2, 128), bufferSlice()], - async (user: User, messageId: number, filenameUnsafe: string, inputBuffer: Buffer) => { + async (user: User, messageId: number, filenameUnsafe: string, inputBuffer: Buffer, ctx: RPCContext) => { if (inputBuffer.byteLength >= 16777220) { return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 16MiB limit." }; } @@ -112,7 +112,7 @@ method( attachmentObject ]; - dispatch(`channel:${messageCheckResult.rows[0].channel_id}`, { + ctx.gatewayDispatch(`channel:${messageCheckResult.rows[0].channel_id}`, { t: GatewayPayloadType.MessageUpdate, d: messageCheckResult.rows[0] }); diff --git a/src/rpc/apis/channels.ts b/src/rpc/apis/channels.ts index 641cb09..90647b5 100644 --- a/src/rpc/apis/channels.ts +++ b/src/rpc/apis/channels.ts @@ -1,4 +1,4 @@ -import { channelNameRegex, method, int, string, uint, withOptional, withRegexp } from "../rpc"; +import { channelNameRegex, method, int, string, uint, withOptional, withRegexp, RPCContext } from "../rpc"; import { query } from "../../database"; import { getMessagesByChannelFirstPage, getMessagesByChannelPage } from "../../database/templates"; import { errors } from "../../errors"; @@ -10,7 +10,7 @@ import serverConfig from "../../serverconfig"; method( "createChannel", [withRegexp(channelNameRegex, string(1, 32)), withOptional(uint())], - async (user: User, name: string, communityId: number | null) => { + async (user: User, name: string, communityId: number | null, ctx: RPCContext) => { if (serverConfig.superuserRequirement.createChannel && !user.is_superuser) { return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS; } @@ -20,7 +20,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch("*", { + ctx.gatewayDispatch("*", { t: GatewayPayloadType.ChannelCreate, d: result.rows[0] }); @@ -36,7 +36,7 @@ method( method( "updateChannelName", [uint(), withRegexp(channelNameRegex, string(1, 32))], - async (user: User, id: number, name: string) => { + async (user: User, id: number, name: string, ctx: RPCContext) => { const permissionCheckResult = await query("SELECT owner_id FROM channels WHERE id = $1", [id]); if (!permissionCheckResult || permissionCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -50,7 +50,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch(`channel:${id}`, { + ctx.gatewayDispatch(`channel:${id}`, { t: GatewayPayloadType.ChannelUpdate, d: result.rows[0] }); @@ -62,7 +62,7 @@ method( method( "deleteChannel", [uint()], - async (user: User, id: number) => { + async (user: User, id: number, ctx: RPCContext) => { const permissionCheckResult = await query("SELECT owner_id FROM channels WHERE id = $1", [id]); if (!permissionCheckResult || permissionCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -76,7 +76,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch(`channel:${id}`, { + ctx.gatewayDispatch(`channel:${id}`, { t: GatewayPayloadType.ChannelDelete, d: {id} }); @@ -111,8 +111,8 @@ method( method( "createChannelMessage", [uint(), string(1, 4000), withOptional(uint()), withOptional(string(1, 64)), withOptional(uint())], - async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null, pending_attachments: number | null) => { - return await sendMessage(user, id, optimistic_id, content, nick_username, pending_attachments ?? 0); + async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null, pending_attachments: number | null, ctx: RPCContext) => { + return await sendMessage(user, id, optimistic_id, content, nick_username, pending_attachments ?? 0, ctx); } ); @@ -139,8 +139,8 @@ method( method( "putChannelTyping", [uint()], - async (user: User, channelId: number) => { - dispatch(`channel:${channelId}`, { + async (user: User, channelId: number, ctx: RPCContext) => { + ctx.gatewayDispatch(`channel:${channelId}`, { t: GatewayPayloadType.TypingStart, d: { user: { diff --git a/src/rpc/apis/communities.ts b/src/rpc/apis/communities.ts index b07f54b..70fea4c 100644 --- a/src/rpc/apis/communities.ts +++ b/src/rpc/apis/communities.ts @@ -1,4 +1,4 @@ -import { channelNameRegex, method, int, string, uint, withRegexp } from "../rpc"; +import { channelNameRegex, method, int, string, uint, withRegexp, RPCContext } from "../rpc"; import { query } from "../../database"; import { errors } from "../../errors"; import { dispatch, dispatchChannelSubscribe } from "../../gateway"; @@ -8,7 +8,7 @@ import serverConfig from "../../serverconfig"; method( "createCommunity", [withRegexp(channelNameRegex, string(1, 64))], - async (user: User, name: string) => { + async (user: User, name: string, ctx: RPCContext) => { if (serverConfig.superuserRequirement.createChannel && !user.is_superuser) { return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS; } @@ -17,7 +17,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch("*", { + ctx.gatewayDispatch("*", { t: GatewayPayloadType.CommunityCreate, d: result.rows[0] }); @@ -31,7 +31,7 @@ method( method( "updateCommunityName", [uint(), withRegexp(channelNameRegex, string(1, 32))], - async (user: User, id: number, name: string) => { + async (user: User, id: number, name: string, ctx: RPCContext) => { const permissionCheckResult = await query("SELECT owner_id FROM communities WHERE id = $1", [id]); if (!permissionCheckResult || permissionCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -45,7 +45,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch(`community:${id}`, { + ctx.gatewayDispatch(`community:${id}`, { t: GatewayPayloadType.CommunityUpdate, d: result.rows[0] }); @@ -57,7 +57,7 @@ method( method( "deleteCommunity", [uint()], - async (user: User, id: number) => { + async (user: User, id: number, ctx: RPCContext) => { const permissionCheckResult = await query("SELECT owner_id FROM communities WHERE id = $1", [id]); if (!permissionCheckResult || permissionCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -71,7 +71,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch(`community:${id}`, { + ctx.gatewayDispatch(`community:${id}`, { t: GatewayPayloadType.CommunityDelete, d: {id} }); diff --git a/src/rpc/apis/messages.ts b/src/rpc/apis/messages.ts index 91b9621..09362df 100644 --- a/src/rpc/apis/messages.ts +++ b/src/rpc/apis/messages.ts @@ -1,4 +1,4 @@ -import { method, string, uint } from "./../rpc"; +import { RPCContext, method, string, uint } from "./../rpc"; import { query } from "../../database"; import { getMessageById } from "../../database/templates"; import { errors } from "../../errors"; @@ -11,7 +11,7 @@ import { UploadTarget, getSafeUploadPath } from "../../uploading"; method( "deleteMessage", [uint()], - async (user: User, id: number) => { + async (user: User, id: number, ctx: RPCContext) => { const messageCheckResult = await query(getMessageById, [id]); if (!messageCheckResult || messageCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -46,7 +46,7 @@ method( return errors.GOT_NO_DATABASE_DATA; } - dispatch(`channel:${message.channel_id}`, { + ctx.gatewayDispatch(`channel:${message.channel_id}`, { t: GatewayPayloadType.MessageDelete, d: { id, @@ -61,7 +61,7 @@ method( method( "updateMessageContent", [uint(), string(1, 4000)], - async (user: User, id: number, content: string) => { + async (user: User, id: number, content: string, ctx: RPCContext) => { const permissionCheckResult = await query(getMessageById, [id]); if (!permissionCheckResult || permissionCheckResult.rowCount < 1) { return errors.NOT_FOUND; @@ -80,7 +80,7 @@ method( content }; - dispatch(`channel:${permissionCheckResult.rows[0].channel_id}`, { + ctx.gatewayDispatch(`channel:${permissionCheckResult.rows[0].channel_id}`, { t: GatewayPayloadType.MessageUpdate, d: returnObject }); diff --git a/src/rpc/apis/users.ts b/src/rpc/apis/users.ts index 9e98409..462a32b 100644 --- a/src/rpc/apis/users.ts +++ b/src/rpc/apis/users.ts @@ -2,7 +2,7 @@ import { errors } from "../../errors"; import { query } from "../../database"; import { compare, hash, hashSync } from "bcrypt"; import { getPublicUserObject, loginAttempt } from "../../auth"; -import { bufferSlice, method, methodButWarningDoesNotAuthenticate, string, usernameRegex, withRegexp } from "./../rpc"; +import { RPCContext, bufferSlice, method, methodButWarningDoesNotAuthenticate, string, usernameRegex, withRegexp } from "./../rpc"; import sharp from "sharp"; import path from "path"; import { randomBytes } from "crypto"; @@ -92,7 +92,7 @@ const profilePictureSizes = [ method( "putUserAvatar", [bufferSlice()], - async (user: User, buffer: Buffer) => { + async (user: User, buffer: Buffer, ctx: RPCContext) => { if (buffer.byteLength >= 3145728) { // buffer exceeds 3MiB return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 3MiB limit." }; @@ -148,7 +148,7 @@ method( user.avatar = avatarId; - dispatch("*", { + ctx.gatewayDispatch("*", { t: GatewayPayloadType.UserUpdate, d: getPublicUserObject(user), }); diff --git a/src/rpc/rpc.ts b/src/rpc/rpc.ts index d028418..47a9d55 100644 --- a/src/rpc/rpc.ts +++ b/src/rpc/rpc.ts @@ -1,5 +1,7 @@ import { errors } from "../errors"; +import { GatewayClient, dispatch } from "../gateway"; import { maxBufferByteLength } from "../serverconfig"; +import { GatewayPayload } from "../types/gatewaypayload"; export const alphanumericRegex = new RegExp(/^[a-z0-9]+$/i); export const usernameRegex = new RegExp(/^[a-z0-9_]+$/i); @@ -19,6 +21,17 @@ export const withOptional = (arg: RPCArgument): RPCArgument => ({ ...arg, isOpti const isInt = (val: any) => typeof val === "number" && Number.isSafeInteger(val); const isUint = (val: any) => (isInt(val) && val >= 0); +export interface RPCContext { + isRealtime: boolean, + gatewayDispatch: (channel: string, message: GatewayPayload | ((ws: GatewayClient | null) => GatewayPayload)) => void, +} + +const defaultRPCContext: RPCContext = { + isRealtime: false, + gatewayDispatch: (...a) => dispatch(...a) +}; + + enum RPCArgumentType { Integer, String, @@ -64,7 +77,7 @@ export const methodButWarningDoesNotAuthenticate = (name: string, args: RPCArgum return method(name, args, func, false); }; -export const userInvokeMethod = async (user: User | null, methodId: number, args: any[], buffer: Buffer | null) => { +export const userInvokeMethod = async (user: User | null, methodId: number, args: any[], buffer: Buffer | null, context: RPCContext) => { const methodData = methods.get(methodId); if (!methodData) return { ...errors.BAD_REQUEST, @@ -161,7 +174,7 @@ export const userInvokeMethod = async (user: User | null, methodId: number, args } if (user) { - return await ((methodData.func)(user, ...args)); + return await ((methodData.func)(user, ...args, context)); } else if (!user && !methodData.requiresAuthentication) { return await ((methodData.func)(...args)); } else { @@ -169,7 +182,7 @@ export const userInvokeMethod = async (user: User | null, methodId: number, args } }; -export const processMethodBatch = async (user: User | null, calls: any, ignoreNonErrors = false, buffer: Buffer | null) => { +export const processMethodBatch = async (user: User | null, calls: any, ignoreNonErrors = false, buffer: Buffer | null, context: RPCContext = defaultRPCContext) => { if (!Array.isArray(calls) || !calls.length || calls.length > 5) { return { ...errors.BAD_REQUEST, @@ -195,7 +208,7 @@ export const processMethodBatch = async (user: User | null, calls: any, ignoreNo return; } - const promise = userInvokeMethod(user, call[0], call.slice(1, call.length), buffer); + const promise = userInvokeMethod(user, call[0], call.slice(1, call.length), buffer, context); promise.then(value => { if (ignoreNonErrors && !value.code) { responses[index] = null;