From 5a5588f1e3f0c12f61b7f856e3aedfec3d87a41c Mon Sep 17 00:00:00 2001 From: hippoz <10706925-hippoz@users.noreply.gitlab.com> Date: Fri, 17 Mar 2023 22:46:49 +0200 Subject: [PATCH] fix and improve matrix layer --- src/routes/matrix/index.ts | 109 ++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 49 deletions(-) diff --git a/src/routes/matrix/index.ts b/src/routes/matrix/index.ts index d932c1e..9e5ff07 100644 --- a/src/routes/matrix/index.ts +++ b/src/routes/matrix/index.ts @@ -12,6 +12,9 @@ const router = express.Router(); router.use(cors()); +const isInt = (val: any) => typeof val === "number" && Number.isSafeInteger(val); +const isUint = (val: any) => (isInt(val) && val >= 0); + const matrixProtocol = process.env.MATRIX_PROTOCOL ? process.env.MATRIX_PROTOCOL : "http"; const matrixHomeserverBaseUrl = process.env.MATRIX_HOMESERVER_BASE_URL ? process.env.MATRIX_HOMESERVER_BASE_URL : "localhost:3000"; const matrixWaffleAppUrl = process.env.MATRIX_WAFFLE_APP_URL ? process.env.MATRIX_WAFFLE_APP_URL : "localhost:3000"; @@ -47,22 +50,60 @@ const matrixUserProfile = (matrixUserId: string) => { }; }; +const cursorType = "waffletrackv1"; + interface MatrixSyncCursors { - [channel_id: number]: number; + type: string, + chan: {[channel_id: number]: number}; }; -async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOutstandingEvents: boolean, client: PoolClient, channels: Channel[]) { +function parseCursor(cursor: string | null): MatrixSyncCursors | null { + if (!cursor) { + return null; + } + let json; + try { + json = JSON.parse(cursor); + } catch(O_o) { + return null; + } + + if (json.type !== "waffletrackv1") { + return null; + } + + if (typeof json.chan !== "object") { + return null; + } + + for (const [key, value] of Object.entries(json.chan)) { + if (!isUint(parseInt(key)) || !isUint(value)) { + return null; + } + } + + return json; +} + +function cursorNew(): MatrixSyncCursors { + return { + type: cursorType, + chan: {} + } +} + +async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOutstandingEvents: boolean, channels: Channel[]) { const joinedChannels: any[any] = {}; - let nextBatchCursors = ""; + const nextBatchCursors = cursorNew(); for (let i = 0; i < channels.length; i++) { const channel = channels[i]; const roomId = `!${channel.id}:${matrixHomeserverBaseUrl}`; let channelMessagesResult; - if (cursors[channel.id]) { - channelMessagesResult = await client.query(getMessagesByChannelAfterPage(50), [cursors[channel.id], channel.id]); + if (cursors.chan[channel.id]) { + channelMessagesResult = await query(getMessagesByChannelAfterPage(50), [cursors.chan[channel.id], channel.id]); } else { - channelMessagesResult = await client.query(getMessagesByChannelFirstPage(50), [channel.id]); + channelMessagesResult = await query(getMessagesByChannelFirstPage(50), [channel.id]); } const messages = channelMessagesResult && channelMessagesResult.rows ? channelMessagesResult.rows.reverse() : []; const messagesTimeline = messages.map(e => ({ @@ -78,9 +119,9 @@ async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOuts })); if (messages.length > 0) { - nextBatchCursors += `${channel.id}:${messages[messages.length - 1].id};`; - } else if (cursors[channel.id]) { - nextBatchCursors += `${channel.id}:${cursors[channel.id]};`; + nextBatchCursors.chan[channel.id] = messages[messages.length - 1].id; + } else if (cursors.chan[channel.id]) { + nextBatchCursors.chan[channel.id] = cursors.chan[channel.id]; } if (messages.length < 1 && onlyOutstandingEvents) { @@ -139,7 +180,7 @@ async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOuts } return { - next_batch: nextBatchCursors, + next_batch: JSON.stringify(nextBatchCursors), rooms: { invite: {}, join: joinedChannels, @@ -335,46 +376,16 @@ router.get( isInitial = false; } - await withClient((client: PoolClient) => { - return new Promise(async (resolve, reject) => { - const channelsResult = await client.query("SELECT id, name, owner_id FROM channels"); - if (!channelsResult) return; - const channels = channelsResult.rows || []; + const channelsResult = await query("SELECT id, name, owner_id FROM channels"); + if (!channelsResult) return; + const channels = channelsResult.rows; - const sendPayload = async () => { - const cursors: MatrixSyncCursors = {}; - if (since) { - const sinceParts = since.split(";"); - if (sinceParts.length < 100) { - sinceParts.forEach((part) => { - const def = part.split(":"); - if (def.length !== 2) return; - const channelId = parseInt(def[0]); - const page = parseInt(def[1]); - if (!isNaN(channelId) && isFinite(channelId) && !isNaN(page) && isFinite(page)) { - cursors[channelId] = page; - } - }); - } - } - - res.json(await buildSyncPayload({ - id: 3, - username: "test", - is_superuser: true, - avatar: null - }, cursors, !isInitial, client, channels)); - }; - - if (timeout) { - let dispatchChannels = ["*"]; - channels.forEach(channel => dispatchChannels.push(`channel:${channel.id}`)); - await waitForEvent(dispatchChannels, timeout); - } - await sendPayload(); - resolve(true); - }); - }); + if (timeout) { + const dispatchChannels = channels.map(channel => `channel:${channel.id}`); + dispatchChannels.push("*"); + await waitForEvent(dispatchChannels, timeout); + } + res.json(await buildSyncPayload(req.publicUser, parseCursor(since) ?? cursorNew(), !isInitial, channels)); } );