fix and improve matrix layer

This commit is contained in:
hippoz 2023-03-17 22:46:49 +02:00
parent afb046b3b6
commit 5a5588f1e3
Signed by: hippoz
GPG key ID: 56C4E02A85F2FBED

View file

@ -12,6 +12,9 @@ const router = express.Router();
router.use(cors());
const isInt = (val: any) => typeof val === "number" && Number.isSafeInteger(val);
const isUint = (val: any) => (isInt(val) && val >= 0);
const matrixProtocol = process.env.MATRIX_PROTOCOL ? process.env.MATRIX_PROTOCOL : "http";
const matrixHomeserverBaseUrl = process.env.MATRIX_HOMESERVER_BASE_URL ? process.env.MATRIX_HOMESERVER_BASE_URL : "localhost:3000";
const matrixWaffleAppUrl = process.env.MATRIX_WAFFLE_APP_URL ? process.env.MATRIX_WAFFLE_APP_URL : "localhost:3000";
@ -47,22 +50,60 @@ const matrixUserProfile = (matrixUserId: string) => {
};
};
const cursorType = "waffletrackv1";
interface MatrixSyncCursors {
[channel_id: number]: number;
type: string,
chan: {[channel_id: number]: number};
};
async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOutstandingEvents: boolean, client: PoolClient, channels: Channel[]) {
function parseCursor(cursor: string | null): MatrixSyncCursors | null {
if (!cursor) {
return null;
}
let json;
try {
json = JSON.parse(cursor);
} catch(O_o) {
return null;
}
if (json.type !== "waffletrackv1") {
return null;
}
if (typeof json.chan !== "object") {
return null;
}
for (const [key, value] of Object.entries(json.chan)) {
if (!isUint(parseInt(key)) || !isUint(value)) {
return null;
}
}
return json;
}
function cursorNew(): MatrixSyncCursors {
return {
type: cursorType,
chan: {}
}
}
async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOutstandingEvents: boolean, channels: Channel[]) {
const joinedChannels: any[any] = {};
let nextBatchCursors = "";
const nextBatchCursors = cursorNew();
for (let i = 0; i < channels.length; i++) {
const channel = channels[i];
const roomId = `!${channel.id}:${matrixHomeserverBaseUrl}`;
let channelMessagesResult;
if (cursors[channel.id]) {
channelMessagesResult = await client.query(getMessagesByChannelAfterPage(50), [cursors[channel.id], channel.id]);
if (cursors.chan[channel.id]) {
channelMessagesResult = await query(getMessagesByChannelAfterPage(50), [cursors.chan[channel.id], channel.id]);
} else {
channelMessagesResult = await client.query(getMessagesByChannelFirstPage(50), [channel.id]);
channelMessagesResult = await query(getMessagesByChannelFirstPage(50), [channel.id]);
}
const messages = channelMessagesResult && channelMessagesResult.rows ? channelMessagesResult.rows.reverse() : [];
const messagesTimeline = messages.map(e => ({
@ -78,9 +119,9 @@ async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOuts
}));
if (messages.length > 0) {
nextBatchCursors += `${channel.id}:${messages[messages.length - 1].id};`;
} else if (cursors[channel.id]) {
nextBatchCursors += `${channel.id}:${cursors[channel.id]};`;
nextBatchCursors.chan[channel.id] = messages[messages.length - 1].id;
} else if (cursors.chan[channel.id]) {
nextBatchCursors.chan[channel.id] = cursors.chan[channel.id];
}
if (messages.length < 1 && onlyOutstandingEvents) {
@ -139,7 +180,7 @@ async function buildSyncPayload(user: User, cursors: MatrixSyncCursors, onlyOuts
}
return {
next_batch: nextBatchCursors,
next_batch: JSON.stringify(nextBatchCursors),
rooms: {
invite: {},
join: joinedChannels,
@ -335,46 +376,16 @@ router.get(
isInitial = false;
}
await withClient((client: PoolClient) => {
return new Promise(async (resolve, reject) => {
const channelsResult = await client.query("SELECT id, name, owner_id FROM channels");
const channelsResult = await query("SELECT id, name, owner_id FROM channels");
if (!channelsResult) return;
const channels = channelsResult.rows || [];
const sendPayload = async () => {
const cursors: MatrixSyncCursors = {};
if (since) {
const sinceParts = since.split(";");
if (sinceParts.length < 100) {
sinceParts.forEach((part) => {
const def = part.split(":");
if (def.length !== 2) return;
const channelId = parseInt(def[0]);
const page = parseInt(def[1]);
if (!isNaN(channelId) && isFinite(channelId) && !isNaN(page) && isFinite(page)) {
cursors[channelId] = page;
}
});
}
}
res.json(await buildSyncPayload({
id: 3,
username: "test",
is_superuser: true,
avatar: null
}, cursors, !isInitial, client, channels));
};
const channels = channelsResult.rows;
if (timeout) {
let dispatchChannels = ["*"];
channels.forEach(channel => dispatchChannels.push(`channel:${channel.id}`));
const dispatchChannels = channels.map(channel => `channel:${channel.id}`);
dispatchChannels.push("*");
await waitForEvent(dispatchChannels, timeout);
}
await sendPayload();
resolve(true);
});
});
res.json(await buildSyncPayload(req.publicUser, parseCursor(since) ?? cursorNew(), !isInitial, channels));
}
);