Compare commits

...

5 commits

Author SHA1 Message Date
hippoz
e60ecce1bb
fix selected channel saving and loading 2023-08-08 15:48:58 +03:00
hippoz
0f6a88ac36
cleanup imports and styles 2023-08-08 15:48:13 +03:00
hippoz
24e9af17d2
add message attachments 2023-08-08 15:45:22 +03:00
hippoz
52d253f2cf
greatly refactor gateway 2023-08-08 15:36:51 +03:00
hippoz
689787247e
refactor configuration 2023-08-08 15:34:22 +03:00
33 changed files with 1528 additions and 850 deletions

2
.gitignore vendored
View file

@ -1,5 +1,5 @@
node_modules/
dist/
frontend-new/
uploads/avatar/*.webp
uploads/
.env

View file

@ -1,8 +1,18 @@
<script>
import { avatarUrl } from "../storage";
import { overlayStore, OverlayType, setMessageInputEvent } from "../stores";
import { overlayStore, OverlayType, setMessageInputEvent } from "../stores";
import MessageAttachment from "./MessageAttachment.svelte";
export let message;
let waitingForAttachments = 0;
$: {
const attachmentCount = message.attachments ? message.attachments.length : 0;
const expectedCount = message.pending_attachments || 0;
let delta = expectedCount - attachmentCount;
if (delta < 0) delta = 0;
waitingForAttachments = delta;
}
const reply = () => {
let replyString = "";
@ -112,6 +122,20 @@ import { overlayStore, OverlayType, setMessageInputEvent } from "../stores";
.message:hover .message-actions {
display: flex;
}
.attachments-container {
display: flex;
flex-direction: column;
margin-top: var(--space-xxs);
}
.hint-text {
display: block;
margin-top: 6px;
color: var(--foreground-color-3);
font-size: var(--h6);
user-select: none;
}
</style>
<div class="message" class:clumped={ message._clumped } class:pinged={ message._mentions }>
@ -137,6 +161,18 @@ import { overlayStore, OverlayType, setMessageInputEvent } from "../stores";
{/if}
<span class="message-content" class:pending={ message._isPending }>{ message.content }</span>
{#if message.attachments && message.attachments.length}
<div class="attachments-container">
{#each message.attachments as attachment(attachment.id)}
<MessageAttachment attachment={attachment} />
{/each}
</div>
{/if}
{#if waitingForAttachments}
<span class="hint-text">Waiting for {waitingForAttachments} more {`attachment${waitingForAttachments !== 1 ? 's' : ''}`}</span>
{/if}
<div class="message-actions">
<button class="icon-button material-icons-outlined" on:click="{ reply }" aria-label="Reply to Message">
reply

View file

@ -0,0 +1,93 @@
<script>
import { attachmentUrl } from "../storage";
const AttachmentRenderAs = {
Invalid: 0,
Image: 1,
Video: 2,
Audio: 3,
DownloadableFile: 4,
};
export let attachment;
let renderAs;
$: {
renderAs = AttachmentRenderAs.Invalid;
if (attachment && attachment.type === "file" && attachment.file_mime && attachment.file) {
const mimeParts = attachment.file_mime.split("/");
const type = mimeParts[0];
if (type) {
switch (type) {
case "audio": {
renderAs = AttachmentRenderAs.Audio;
break;
}
case "video": {
renderAs = AttachmentRenderAs.Video;
break;
}
case "image": {
renderAs = AttachmentRenderAs.Image;
break;
}
default: {
renderAs = AttachmentRenderAs.DownloadableFile
break;
}
}
}
}
}
</script>
<style>
.attachment-card {
display: flex;
flex-direction: row;
align-items: center;
padding: var(--space-norm);
border-radius: 9999px;
background-color: var(--background-color-0);
}
.attachment {
margin-bottom: var(--space-xs);
}
.media {
max-width: 90%;
}
img {
background-color: var(--background-color-2);
border-radius: 6px;
height: auto;
}
.attachment-filename {
color: var(--foreground-color-3);
margin-right: 6px;
}
.attachment-card .icon-button {
margin-left: auto;
}
</style>
{#if renderAs === AttachmentRenderAs.Image}
<img loading="lazy" decoding="async" width="{ attachment.width }" height="{ attachment.height }" class="attachment media" alt="Attachment" src="{ attachmentUrl(attachment.file) }">
{:else if renderAs === AttachmentRenderAs.Video}
<!-- svelte-ignore a11y-media-has-caption -->
<video controls="controls" class="attachment media" src="{ attachmentUrl(attachment.file) }"></video>
{:else if renderAs === AttachmentRenderAs.DownloadableFile}
<div class="attachment attachment-card">
<div class="attachment-filename">{ attachment.file_name }</div>
<a class="icon-button material-icons-outlined" href="{ attachmentUrl(attachment.file) }" target="_blank">download</a>
</div>
{:else}
<div class="attachment attachment-card">Couldn't render attachment</div>
{/if}

View file

@ -1,14 +1,24 @@
<script>
import { onDestroy, onMount } from "svelte";
import { getItem } from "../storage";
import { overlayStore, selectedChannel, sendMessageAction, setMessageInputEvent, smallViewport, typingStore, userInfoStore, usesKeyboardNavigation } from "../stores";
import { messagesStoreProvider, overlayStore, setMessageInputEvent, smallViewport, typingStore, userInfoStore, usesKeyboardNavigation } from "../stores";
import { getErrorFromResponse, methods, remoteBlobUpload, remoteCall, responseOk } from "../request";
export let channel;
let messageInput = "";
let messageTextarea;
let typingList = "?no one?";
let typingMessage = "is typing...";
let messageTextarea;
let fileInput;
let files = [];
let loadingMessage = null;
let showFileListStrip = false;
$: {
showFileListStrip = !!files.length || !!loadingMessage;
}
$: {
const typing = $typingStore.filter(a => a.channelId === channel.id);
const ownIndex = typing.findIndex(a => a.user.id === $userInfoStore.id);
@ -37,13 +47,88 @@
}
}
const removeFile = (id) => {
const index = files.findIndex(f => f.id === id);
if (index !== -1) {
files.splice(index, 1);
files = files;
}
}
const selectFiles = async () => {
if (!fileInput || !fileInput.files) return;
for (let i = 0; i < fileInput.files.length; i++) {
const file = fileInput.files.item(i);
const fileObject = {
file,
id: Math.random()
};
files.push(fileObject);
files = files;
}
return;
};
const uploadFiles = async (messageId) => {
const filesToUpload = [ ...files ];
files.length = 0;
files = files;
if (!filesToUpload.length) return;
loadingMessage = `Uploading ${filesToUpload.length} attachment${filesToUpload.length !== 1 ? 's' : ''}`;
for (let i = 0; i < filesToUpload.length; i++) {
const file = filesToUpload[i].file;
const res = await remoteBlobUpload(methods.createMessageAttachment, file, [messageId, file.name]);
if (!responseOk(res)) {
const error = getErrorFromResponse(res);
const message = error.validationErrors && error.validationErrors.length ? error.validationErrors[0].msg : error.message;
overlayStore.toast(`Failed to upload file ${file.name}: ${message}`);
return;
}
}
loadingMessage = null;
};
const sendMessage = async () => {
messageTextarea.focus();
sendMessageAction.emit({
channelId: channel.id,
content: messageInput
});
const content = messageInput;
if (content.trim() === "" || !userInfoStore.value)
return;
// optimistically add message to store
const optimisticMessageId = Math.floor(Math.random() * 9999999);
const optimisticMessage = {
id: optimisticMessageId,
content: content,
channel_id: channel.id,
author_id: userInfoStore.value.id,
author_username: userInfoStore.value.username,
created_at: Date.now().toString(),
_isPending: true
};
const messagesStoreForChannel = messagesStoreProvider.getStore(channel.id);
messagesStoreForChannel.addMessage(optimisticMessage);
const res = await remoteCall(methods.createChannelMessage, channel.id, content, optimisticMessageId, null, files.length);
if (!responseOk(res)) {
messagesStoreForChannel.deleteMessage({
id: optimisticMessageId
});
overlayStore.toast(`Couldn't send message: ${getMessageFromResponse(res)}`);
return;
}
messageInput = "";
await uploadFiles(res.data.id);
};
const onKeydown = async (e) => {
@ -109,23 +194,31 @@
.message-input-container.small {
padding-top: var(--space-sm);
}
.message-input.small {
padding: var(--space-xsplus);
padding-left: var(--space-sm);
border-radius: 1.4em;
.input-row {
display: flex;
flex-direction: row;
background-color: var(--background-color-3);
border-radius: var(--radius-md);
contain: content;
width: 100%;
padding: var(--space-sm);
}
.input-row.small {
border-radius: 9999px;
}
.message-input {
margin-left: calc(0.67 * var(--space-unit));
flex-grow: 1;
width: 100%;
background-color: var(--background-color-3);
background-color: transparent;
border: none;
color: currentColor;
border-radius: var(--radius-md);
padding: var(--space-sm);
border-radius: 0;
font-size: inherit;
resize: none;
contain: strict;
}
/* TODO: is this good? */
@ -176,22 +269,79 @@
.typing-message {
color: var(--foreground-color-2);
}
.strip {
display: flex;
flex-direction: row;
align-items: center;
border-top-left-radius: var(--radius-md);
border-top-right-radius: var(--radius-md);
background-color: var(--background-color-1);
color: var(--foreground-color-2);
padding: var(--space-sm);
}
.file-strip-file {
display: flex;
flex-direction: row;
background-color: var(--background-color-2);
padding: var(--space-xs);
border-radius: var(--radius-sm);
margin-right: var(--space-xs);
}
.file-strip-file span {
margin-right: 5px;
}
.file-strip-file button {
display: none;
}
.file-strip-file:hover button {
display: block;
}
.loading-message {
color: var(--foreground-color-3);
margin-left: 8px;
}
</style>
<div class="message-input-container" class:small={ $smallViewport }>
<input type="file" style="display: none;" accept="*" name="attachment-upload" multiple={false} bind:this={fileInput} on:change={selectFiles}>
{#if showFileListStrip}
<div class="strip">
{#if loadingMessage}
<div class="spinner ui"></div>
<div class="loading-message">{loadingMessage}</div>
{:else}
{#each files as fileObject(fileObject.id)}
<div class="file-strip-file">
<span>{fileObject.file.name}</span>
<button class="icon-button material-icons-outlined" on:click="{ () => removeFile(fileObject.id) }" aria-label="Remove Attachment">close</button>
</div>
{/each}
{/if}
</div>
{/if}
<div class="inner-input-container">
<textarea
placeholder={$smallViewport ? `Message #${channel.name}` : `Send something interesting to #${channel.name}`}
type="text"
class="message-input"
rows="1"
on:keydown={ onKeydown }
on:input={ onInput }
bind:value={ messageInput }
bind:this={ messageTextarea }
class:small={ $smallViewport || getItem("ui:alwaysUseMobileChatBar") }
class:keyboard-nav={ $usesKeyboardNavigation }
/>
<div class="input-row" class:small={ $smallViewport }>
<button class="icon-button material-icons-outlined" on:click="{ () => fileInput.click() }" aria-label="Add Attachment">add</button>
<textarea
placeholder={$smallViewport ? `Message #${channel.name}` : `Send something interesting to #${channel.name}`}
type="text"
class="message-input"
rows="1"
on:keydown={ onKeydown }
on:input={ onInput }
bind:value={ messageInput }
bind:this={ messageTextarea }
class:small={ $smallViewport || getItem("ui:alwaysUseMobileChatBar") }
class:keyboard-nav={ $usesKeyboardNavigation }
/>
</div>
{#if $smallViewport || getItem("ui:alwaysUseMobileChatBar")}
<button class="icon-button send-button material-icons-outlined" on:click="{ sendMessage }">
arrow_upward

View file

@ -2,9 +2,7 @@
import { quadInOut } from "svelte/easing";
import { maybeFly, maybeFlyIf } from "../animations";
import { avatarUrl } from "../storage";
import { channels, gatewayStatus, overlayStore, selectedChannel, showSidebar, smallViewport, userInfoStore, unreadStore, OverlayType, communities, selectedCommunity, filteredChannelsStore } from "../stores";
import AddCommunity from "./overlays/AddCommunity.svelte";
import UserView from "./UserView.svelte";
import { gatewayStatus, overlayStore, selectedChannel, showSidebar, smallViewport, userInfoStore, unreadStore, OverlayType, communities, selectedCommunity, filteredChannelsStore } from "../stores";
const selectChannel = (channel) => {
if ($smallViewport) {

View file

@ -8,7 +8,7 @@
width: 52px;
height: 26px;
border-radius: 999px;
margin: 4px;
margin: 2px;
background-color: var(--background-color-3);
transition: 0.15s;
contain: strict;

View file

@ -1,7 +1,6 @@
<script>
import { overlayStore, userInfoStore, smallViewport, theme, doAnimations, OverlayType, sendTypingUpdatesItemStore } from "../../stores";
import { logOut } from "../../auth";
import { maybeModalFade, maybeModalScale } from "../../animations";
import request, { getErrorFromResponse, methods, remoteBlobUpload, responseOk } from "../../request";
import { apiRoute, getItem } from "../../storage";
import UserView from "../UserView.svelte";

View file

@ -194,10 +194,10 @@ export default {
},
subscribe(event, handler) {
if (!this.handlers.get(event)) {
this.handlers.set(event, new Set());
this.handlers.set(event, []);
}
this.handlers.get(event).add(handler);
this.handlers.get(event).push(handler);
return handler; // can later be used for unsubscribe()
},
unsubscribe(event, handler) {
@ -205,9 +205,12 @@ export default {
if (!eventHandlers)
return;
eventHandlers.delete(handler);
const index = eventHandlers.indexOf(handler);
if (index !== -1) {
eventHandlers.splice(index, 1);
}
if (eventHandlers.size < 1) {
if (eventHandlers.length < 1) {
this.handlers.delete(event);
}
},

View file

@ -27,6 +27,7 @@ export const methods = {
getCommunity: withCacheable(method(403, true)),
getCommunities: withCacheable(method(404, true)),
getCommunityChannels: withCacheable(method(405, true)),
createMessageAttachment: method(500, true),
};
export const RPCError = {
@ -64,7 +65,7 @@ export const RequestStatusToMessage = {
export function getErrorFromResponse(response) {
if (!response) return;
if (response.status === RequestStatus.OK) return;
console.log(response);
console.error("Got error response", response);
let message = RequestStatusToMessage[response.status];
if (!message) message = "Something went wrong (unknown request error)";
@ -192,8 +193,8 @@ export async function remoteSignal(method, ...args) {
}, ...args);
}
export async function remoteBlobUpload({methodId, requiresAuthentication, _isSignal=false}, blob) {
const calls = [[methodId, [0, blob.size]]];
export async function remoteBlobUpload({methodId, requiresAuthentication, _isSignal=false}, blob, args=[]) {
const calls = [[methodId, ...args, [0, blob.size]]];
if (requiresAuthentication && gateway.authenticated) {
const replies = await gateway.sendRPCRequest(calls, _isSignal, blob);

View file

@ -2,6 +2,7 @@ const defaults = {
"server:apiBase": `${window.location.origin || ""}/api/v1`,
"server:gatewayBase": `${location.protocol === "https:" ? "wss" : "ws"}://${location.host}/gateway`,
"server:avatarsBase": `${window.location.origin || ""}/uploads/avatar`,
"server:attachmentsBase": `${window.location.origin || ""}/uploads/attachment`,
"auth:token": "",
"ui:doAnimations": true,
"ui:theme": "dark",
@ -95,3 +96,8 @@ export function apiRoute(fragment) {
export function avatarUrl(avatarId, size) {
return `${getItem("server:avatarsBase")}/${avatarId}_${size}.webp`;
}
export function attachmentUrl(name) {
return `${getItem("server:attachmentsBase")}/${name}`;
}

View file

@ -102,8 +102,6 @@ class Store {
this.value = this._applyPipes(this.value);
if (!this._handlers.size) return;
storeLog(`[Update] (${this.name}) Will queue ${this._handlers.size} handlers`, "value:", this.value, "handlers:", this._handlers);
const isRootNode = storeCallbackQueue.length === 0;
@ -705,7 +703,24 @@ class SelectedChannelStore extends Store {
this.communityIdToSelectedChannel = new Map();
const applySavedMap = () => {
if (!gateway.channels) return;
this.communityIdToSelectedChannel.clear();
const savedMap = getItem("state:selectedChannels");
for (const [communityId, channelId] of Object.entries(savedMap)) {
const channel = gateway.channels.find(c => c.id === channelId);
if (channel) {
this.communityIdToSelectedChannel.set(parseInt(communityId), channel);
}
}
this.updateSavedMap();
}
filteredChannelsStore.subscribe((channels) => {
applySavedMap();
let channel = this.communityIdToSelectedChannel.get(selectedCommunity.value.id);
if (!channel && channels.length) {
channel = channels[0];
@ -714,17 +729,8 @@ class SelectedChannelStore extends Store {
this.updated();
});
gateway.subscribe(GatewayEventType.Ready, ({ channels }) => {
this.communityIdToSelectedChannel.clear();
const savedMap = getItem("state:selectedChannels");
for (const [communityId, channelId] of Object.entries(savedMap)) {
const channel = channels.find(c => c.id === channelId);
if (channel) {
this.communityIdToSelectedChannel.set(parseInt(communityId), channel);
}
}
this.updateSavedMap();
});
gateway.subscribe(GatewayEventType.Ready, () => applySavedMap());
gateway.subscribe(GatewayEventType.CommunityDelete, ({ id }) => {
if (this.communityIdToSelectedChannel.delete(id) && this.value && this.value.id === id) {
this.value = noneChannel;
@ -821,16 +827,16 @@ class SelectedCommunityStore extends Store {
class FilteredChannelsStore extends Store {
constructor() {
super([], "FilteredChannelsStore");
channels.on(() => this.update());
channelsStore.on(() => this.update());
selectedCommunity.on(() => this.update());
this.update();
}
update() {
if (selectedCommunity.value.id === -1) {
this.value = channels.value.filter(n => n.community_id === null);
this.value = channelsStore.value.filter(n => n.community_id === null);
} else {
this.value = channels.value.filter(n => n.community_id === selectedCommunity.value.id);
this.value = channelsStore.value.filter(n => n.community_id === selectedCommunity.value.id);
}
this.updated();
}
@ -838,7 +844,7 @@ class FilteredChannelsStore extends Store {
export const selectedCommunity = new SelectedCommunityStore();
export const channels = new ChannelsStore();
export const channelsStore = new ChannelsStore();
export const filteredChannelsStore = new FilteredChannelsStore();
export const selectedChannel = new SelectedChannelStore();
export const showSidebar = new Store(true, "showSidebar");
@ -862,33 +868,6 @@ export const totalUnreadsStore = new Store(0, "TotalUnreadsStore");
export const statusBarStore = new Store(null, "statusBarStore");
export const setMessageInputEvent = new Store(null, "event:setMessageInput");
export const sendMessageAction = createAction("sendMessageAction", async ({channelId, content}) => {
if (content.trim() === "" || !userInfoStore.value)
return;
// optimistically add message to store
const optimisticMessageId = Math.floor(Math.random() * 999999);
const optimisticMessage = {
id: optimisticMessageId,
content: content,
channel_id: channelId,
author_id: userInfoStore.value.id,
author_username: userInfoStore.value.username,
created_at: Date.now().toString(),
_isPending: true
};
const messagesStoreForChannel = messagesStoreProvider.getStore(channelId);
messagesStoreForChannel.addMessage(optimisticMessage);
const res = await remoteSignal(methods.createChannelMessage, channelId, content, optimisticMessageId, null);
if (!responseOk(res)) {
messagesStoreForChannel.deleteMessage({
id: optimisticMessageId
});
overlayStore.toast(`Couldn't send message: ${getMessageFromResponse(res)}`);
}
});
export const allStores = {
selectedChannel,
@ -898,7 +877,7 @@ export const allStores = {
showChannelView,
theme,
doAnimations,
channels,
channels: channelsStore,
gatewayStatus,
messagesStoreProvider,
userInfoStore,
@ -908,7 +887,6 @@ export const allStores = {
unreadStore,
pluginStore,
setMessageInputEvent,
sendMessageAction,
};
unreadStore.subscribe(() => {

View file

@ -383,6 +383,7 @@ body {
text-align: center;
border: none;
border-radius: var(--radius-md);
text-decoration: none;
}
.icon-button .material-icons,

View file

@ -17,6 +17,7 @@
"dotenv": "^16.0.1",
"express": "^4.18.1",
"express-validator": "^6.14.2",
"file-type": "^18.5.0",
"jsonwebtoken": "^8.5.1",
"pg": "^8.8.0",
"sharp": "^0.31.3",

View file

@ -2,21 +2,13 @@ import e, { NextFunction, Request, Response } from "express";
import { sign, verify } from "jsonwebtoken";
import { query } from "./database";
import { errors } from "./errors";
import serverConfig from "./serverconfig";
import serverConfig, { jwtSecret } from "./serverconfig";
import { compare } from "bcrypt";
const jwtSecret = process.env.JWT_SECRET || "[generic token]";
const tokenTypes = {
BEARER: 1
}
if (jwtSecret === "[generic token]") {
console.error("ERROR: No JWT_SECRET environment variable was specified.");
console.error("ERROR: exiting...");
process.exit(1);
}
export function getUserPermissions(user: User) {
return {
create_channel: serverConfig.superuserRequirement.createChannel ? user.is_superuser : true

View file

@ -1,4 +1,4 @@
import { query, withClient } from ".";
import { query } from ".";
export default async function databaseInit() {
const migrations = [
@ -43,7 +43,32 @@ export default async function databaseInit() {
`,
`
ALTER TABLE channels ADD COLUMN IF NOT EXISTS community_id INT NULL REFERENCES communities(id) ON DELETE CASCADE;
`,
`
CREATE TABLE IF NOT EXISTS message_attachments(
id SERIAL PRIMARY KEY,
type VARCHAR(64) NOT NULL,
owner_id SERIAL REFERENCES users ON DELETE CASCADE,
message_id SERIAL REFERENCES messages ON DELETE CASCADE,
created_at BIGINT,
file VARCHAR(256) DEFAULT NULL,
file_mime VARCHAR(256) DEFAULT NULL,
file_size_bytes BIGINT DEFAULT NULL
);
`,
`
ALTER TABLE messages ADD COLUMN IF NOT EXISTS pending_attachments INT DEFAULT NULL;
`,
`
ALTER TABLE message_attachments ADD COLUMN IF NOT EXISTS width INT DEFAULT NULL;
`,
`
ALTER TABLE message_attachments ADD COLUMN IF NOT EXISTS height INT DEFAULT NULL;
`,
`
ALTER TABLE message_attachments ADD COLUMN IF NOT EXISTS file_name VARCHAR(256) DEFAULT NULL;
`,
];
for (let i = 0; i < migrations.length; i++) {

View file

@ -1,4 +1,41 @@
export const getMessageById = "SELECT messages.id, messages.content, messages.channel_id, messages.created_at, messages.author_id, messages.nick_username, users.avatar AS author_avatar, users.username AS author_username FROM messages JOIN users ON messages.author_id = users.id WHERE messages.id = $1";
export const getMessagesByChannelFirstPage = (limit: number) => `SELECT messages.id, messages.content, messages.created_at, messages.author_id, messages.nick_username, users.avatar AS author_avatar, users.username AS author_username FROM messages JOIN users ON messages.author_id = users.id WHERE messages.channel_id = $1 ORDER BY id DESC LIMIT ${limit}`;
export const getMessagesByChannelPage = (limit: number) => `SELECT messages.id, messages.content, messages.created_at, messages.author_id, messages.nick_username, users.avatar AS author_avatar, users.username AS author_username FROM messages JOIN users ON messages.author_id = users.id WHERE messages.id < $1 AND messages.channel_id = $2 ORDER BY id DESC LIMIT ${limit}`;
export const getMessagesByChannelAfterPage = (limit: number) => `SELECT messages.id, messages.content, messages.created_at, messages.author_id, messages.nick_username, users.avatar AS author_avatar, users.username AS author_username FROM messages JOIN users ON messages.author_id = users.id WHERE messages.id > $1 AND messages.channel_id = $2 ORDER BY id DESC LIMIT ${limit}`;
export const getMessageById = `
SELECT m.id, m.content, m.channel_id, m.created_at, m.author_id, m.nick_username, m.pending_attachments, u.avatar AS author_avatar, u.username AS author_username,
(SELECT jsonb_agg(to_jsonb(ma)) FROM message_attachments ma WHERE ma.message_id = m.id) AS attachments
FROM messages m
JOIN users u ON m.author_id = u.id
WHERE m.id = $1
GROUP BY m.id, u.avatar, u.username;
`;
export const getMessagesByChannelFirstPage = (limit: number) => `
SELECT m.id, m.content, m.channel_id, m.created_at, m.author_id, m.nick_username, m.pending_attachments, u.avatar AS author_avatar, u.username AS author_username,
(SELECT jsonb_agg(to_jsonb(ma)) FROM message_attachments ma WHERE ma.message_id = m.id) AS attachments
FROM messages m
JOIN users u ON m.author_id = u.id
WHERE m.channel_id = $1
GROUP BY m.id, u.avatar, u.username
ORDER BY m.id DESC
LIMIT ${limit};
`;
export const getMessagesByChannelPage = (limit: number) => `
SELECT m.id, m.content, m.channel_id, m.created_at, m.author_id, m.nick_username, m.pending_attachments, u.avatar AS author_avatar, u.username AS author_username,
(SELECT jsonb_agg(to_jsonb(ma)) FROM message_attachments ma WHERE ma.message_id = m.id) AS attachments
FROM messages m
JOIN users u ON m.author_id = u.id
WHERE m.id < $1 AND m.channel_id = $2
GROUP BY m.id, u.avatar, u.username
ORDER BY m.id DESC
LIMIT ${limit};
`;
export const getMessagesByChannelAfterPage = (limit: number) => `
SELECT m.id, m.content, m.channel_id, m.created_at, m.author_id, m.nick_username, m.pending_attachments, u.avatar AS author_avatar, u.username AS author_username,
(SELECT jsonb_agg(to_jsonb(ma)) FROM message_attachments ma WHERE ma.message_id = m.id) AS attachments
FROM messages m
JOIN users u ON m.author_id = u.id
WHERE m.id > $1 AND m.channel_id = $2
GROUP BY m.id, u.avatar, u.username
ORDER BY m.id DESC
LIMIT ${limit};
`;

View file

@ -1,6 +1,6 @@
import { Server } from "node:http";
import { performance } from "node:perf_hooks";
import { WebSocketServer, WebSocket } from "ws";
import WebSocket, { WebSocketServer } from "ws";
import { decodeTokenOrNull, getPublicUserObject } from "../auth";
import { query } from "../database";
import { gatewayErrors } from "../errors";
@ -15,16 +15,21 @@ const GATEWAY_PING_INTERVAL = 40000;
const MAX_CLIENT_MESSAGES_PER_BATCH = 30; // TODO: how well does this work for weak connections?
const MAX_GATEWAY_SESSIONS_PER_USER = 5;
// mapping between a dispatch id and a websocket client
const dispatchChannels = new Map<string, Set<WebSocket>>();
const dispatchChannels = new Map<string, Set<GatewayClient>>();
// mapping between a user id and the websocket sessions it has
const sessionsByUserId = new Map<number, WebSocket[]>();
const sessionsByUserId = new Map<number, GatewayClient[]>();
// mapping between a dispatch id and a temporary handler
const dispatchTemporary = new Map<string, Set<(payload: GatewayPayload) => void>>();
export function handle(channels: string[], handler: (payload: GatewayPayload) => void): (() => any) {
// all clients
const gatewayClients = new Set<GatewayClient>();
function handle(channels: string[], handler: (payload: GatewayPayload) => void): (() => any) {
channels.forEach(c => {
if (!dispatchTemporary.get(c)) {
dispatchTemporary.set(c, new Set());
@ -64,65 +69,17 @@ export function waitForEvent(channels: string[], timeout: number) {
});
}
function clientSubscribe(ws: WebSocket, dispatchChannel: string) {
ws.state.dispatchChannels.add(dispatchChannel);
if (!dispatchChannels.get(dispatchChannel)) {
dispatchChannels.set(dispatchChannel, new Set());
}
dispatchChannels.get(dispatchChannel)?.add(ws);
}
function clientUnsubscribe(ws: WebSocket, dispatchChannel: string) {
if (!ws.state) return;
ws.state.dispatchChannels.delete(dispatchChannel);
const set = dispatchChannels.get(dispatchChannel);
if (!set) return;
set.delete(ws);
if (set.size < 1) {
dispatchChannels.delete(dispatchChannel);
}
}
export function dispatchChannelSubscribe(target: string, dispatchChannel: string) {
const set = dispatchChannels.get(target);
if (!set) return;
set.forEach(c => {
clientSubscribe(c, dispatchChannel);
});
}
function clientUnsubscribeAll(ws: WebSocket) {
if (!ws.state) return;
ws.state.dispatchChannels.forEach(e => {
const set = dispatchChannels.get(e);
if (!set) return;
set.delete(ws);
if (set && set.size < 1) {
dispatchChannels.delete(e);
}
});
ws.state.dispatchChannels = new Set();
}
export function dispatch(channel: string, message: GatewayPayload | ((ws: WebSocket | null) => GatewayPayload)) {
export function dispatch(channel: string, message: GatewayPayload | ((ws: GatewayClient | null) => GatewayPayload)) {
const members = dispatchChannels.get(channel);
if (!members) return;
members.forEach(e => {
if (e.state.ready) {
if (e.ready) {
let data = message;
if (typeof message === "function") {
data = message(e);
}
e.send(JSON.stringify(data));
e.send(data);
}
});
@ -139,14 +96,34 @@ export function dispatch(channel: string, message: GatewayPayload | ((ws: WebSoc
}
}
function closeWithError(ws: WebSocket, { code, message }: { code: number, message: string }) {
return ws.close(code, message);
export function dispatchChannelSubscribe(target: string, dispatchChannel: string) {
const set = dispatchChannels.get(target);
set?.forEach(c => c.subscribe(dispatchChannel));
}
function closeWithBadPayload(ws: WebSocket, hint: string) {
return ws.close(gatewayErrors.BAD_PAYLOAD.code, `${gatewayErrors.BAD_PAYLOAD.message}: ${hint}`);
}
function getInitialPresenceEntries(): GatewayPresenceEntry[] {
// The initial presence entries are sent right when the user connects.
// In the future, each user will have their own list of channels that they can join and leave.
// In that case, we will send the presence entries to a certain user only for the channels they're in.
const entries: GatewayPresenceEntry[] = [];
sessionsByUserId.forEach((clients: GatewayClient[], userId: number) => {
if (clients.length < 1)
return;
const firstClient = clients[0];
if (firstClient.ready && firstClient.user) {
const entry = firstClient.getPresenceEntry(GatewayPresenceStatus.Online);
if (entry) {
entries.push(entry);
}
}
});
return entries;
}
function parseJsonOrNull(payload: string): any {
try {
return JSON.parse(payload);
@ -155,8 +132,6 @@ function parseJsonOrNull(payload: string): any {
}
}
// The function below ensures `payload` is of the GatewayPayload
// interface payload. If it does not match, null is returned.
function ensureFormattedGatewayPayload(payload: any): GatewayPayload | null {
if (!payload) {
return null;
@ -183,288 +158,305 @@ function ensureFormattedGatewayPayload(payload: any): GatewayPayload | null {
return asPayload;
}
function sendPayload(ws: WebSocket, payload: GatewayPayload) {
ws.send(JSON.stringify(payload));
}
function getPresenceEntryForConnection(ws: WebSocket, status: GatewayPresenceStatus): GatewayPresenceEntry | null {
if (!ws.state || !ws.state.user) {
return null;
class GatewayClient {
ws: WebSocket;
user?: User;
ready: boolean;
alive: boolean;
lastAliveCheck: number;
clientDispatchChannels: Set<string>;
messagesSinceLastCheck: number;
bridgesTo?: string;
privacy?: string;
terms?: string;
constructor(ws: WebSocket) {
this.ws = ws;
this.user = undefined;
this.ready = false;
this.alive = false;
this.lastAliveCheck = performance.now();
this.clientDispatchChannels = new Set();
this.messagesSinceLastCheck = 0;
this.bridgesTo = undefined;
this.privacy = undefined;
this.terms = undefined;
gatewayClients.add(this);
this.ws.on("close", this.handleClose.bind(this));
this.ws.on("message", this.handleMessage.bind(this));
}
const entry: GatewayPresenceEntry = {
user: {
id: ws.state.user.id,
username: ws.state.user.username,
avatar: ws.state.user.avatar
},
status
};
if (typeof ws.state.bridgesTo === "string") {
entry.bridgesTo = ws.state.bridgesTo;
}
if (typeof ws.state.privacy === "string") {
entry.privacy = ws.state.privacy;
}
if (typeof ws.state.terms === "string") {
entry.terms = ws.state.terms;
greet() {
this.send({
t: GatewayPayloadType.Hello,
d: {
pingInterval: GATEWAY_PING_INTERVAL
}
});
}
return entry;
}
subscribe(channel: string) {
this.clientDispatchChannels.add(channel);
if (!dispatchChannels.get(channel)) {
dispatchChannels.set(channel, new Set());
}
dispatchChannels.get(channel)?.add(this);
}
// The initial presence entries are sent right when the user connects.
// In the future, each user will have their own list of channels that they can join and leave.
// In that case, we will send the presence entries to a certain user only for the channels they're in.
function getInitialPresenceEntries(): GatewayPresenceEntry[] {
const entries: GatewayPresenceEntry[] = [];
unsubscribeAll() {
this.clientDispatchChannels.forEach((channel) => {
const set = dispatchChannels.get(channel);
if (!set) return;
set.delete(this);
if (set && set.size < 1) {
dispatchChannels.delete(channel);
}
});
this.clientDispatchChannels.clear();
}
sessionsByUserId.forEach((wsList: WebSocket[], userId: number) => {
if (wsList.length < 1)
return;
send(payload: object) {
this.ws.send(JSON.stringify(payload));
}
const firstWs = wsList[0];
if (firstWs.state.ready && firstWs.state.user) {
const entry = getPresenceEntryForConnection(firstWs, GatewayPresenceStatus.Online);
if (entry) {
entries.push(entry);
closeWithError({ code, message }: { code: number, message: string }) {
this.ws.close(code, message);
}
closeWithBadPayload(hint: string) {
this.ws.close(gatewayErrors.BAD_PAYLOAD.code, `${gatewayErrors.BAD_PAYLOAD.message}: ${hint}`);
}
getPresenceEntry(status: GatewayPresenceStatus): GatewayPresenceEntry | null {
if (!this.user || !this.ready) {
return null;
}
return {
user: {
id: this.user.id,
username: this.user.username,
avatar: this.user.avatar
},
status,
bridgesTo: this.bridgesTo,
privacy: this.privacy,
terms: this.terms,
};
}
handleClose() {
gatewayClients.delete(this);
this.unsubscribeAll();
this.ready = false;
if (this.user) {
const sessions = sessionsByUserId.get(this.user.id);
if (sessions) {
const index = sessions.indexOf(this);
sessions.splice(index, 1);
if (!sessions.length) {
sessionsByUserId.delete(this.user.id);
// user no longer has any sessions, update presence
dispatch("*", {
t: GatewayPayloadType.PresenceUpdate,
d: [
this.getPresenceEntry(GatewayPresenceStatus.Offline)
]
});
}
}
}
});
}
return entries;
batchTick() {
const now = performance.now();
if ((now - this.lastAliveCheck) >= GATEWAY_PING_INTERVAL) {
if (!this.ready) {
return this.closeWithError(gatewayErrors.AUTHENTICATION_TIMEOUT);
}
if (!this.alive) {
return this.closeWithError(gatewayErrors.NO_PING);
}
this.messagesSinceLastCheck = 0;
}
}
async handleMessage(rawData: Buffer, isBinary: boolean) {
this.messagesSinceLastCheck++;
if (rawData.byteLength >= maxGatewayPayloadByteLength) {
return this.closeWithError(gatewayErrors.PAYLOAD_TOO_LARGE);
}
if (this.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
return this.closeWithError(gatewayErrors.FLOODING);
}
let stringData: string;
let binaryStream: Buffer | null = null;
if (isBinary) {
// Binary frames are used in order combine our text data (JSON) with binary data.
// This is especially useful for calling RPC methods that, for example, upload files.
// The format is: [json payload]\n[begin binary stream]
let jsonSlice;
let jsonOffset = -1;
for (let i = 0; i < maxGatewayJsonStringByteLength; i++) {
if (rawData.readUInt8(i) === 0x0A) {
// hit newline
jsonSlice = rawData.subarray(0, i);
jsonOffset = i + 1;
break;
}
}
if (!jsonSlice) {
return this.closeWithBadPayload("Did not find newline to delimit JSON from binary stream. JSON payload may be too large, or newline may be missing.");
}
binaryStream = rawData.subarray(jsonOffset, rawData.byteLength);
stringData = jsonSlice.toString();
} else {
stringData = rawData.toString();
}
if (stringData.length > maxGatewayJsonStringLength) {
return this.closeWithError(gatewayErrors.PAYLOAD_TOO_LARGE);
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
if (!payload) {
return this.closeWithBadPayload("Invalid JSON or message does not match schema");
}
switch (payload.t) {
case GatewayPayloadType.Authenticate: {
if (this.ready) {
return this.closeWithError(gatewayErrors.ALREADY_AUTHENTICATED);
}
const authData = payload.d;
if (typeof authData !== "object") {
return this.closeWithBadPayload("d: expected object");
}
if (typeof authData.token !== "string") {
return this.closeWithBadPayload("d: invalid field 'token'");
}
const user = await decodeTokenOrNull(authData.token);
if (!user) {
return this.closeWithError(gatewayErrors.BAD_AUTH);
}
let sessions = sessionsByUserId.get(user.id);
if (sessions) {
if ((sessions.length + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
return this.closeWithError(gatewayErrors.TOO_MANY_SESSIONS);
}
}
// TODO: each user should have their own list of channels that they join
const [channels, communities] = await Promise.all([
query("SELECT id, name, owner_id, community_id FROM channels ORDER BY id ASC"),
query("SELECT id, name, owner_id, avatar, created_at FROM communities ORDER BY id ASC"),
]);
if (!channels || !communities) {
return this.closeWithError(gatewayErrors.GOT_NO_DATABASE_DATA);
}
if (!sessions) {
sessions = [];
sessionsByUserId.set(user.id, sessions);
}
sessions.push(this);
this.subscribe("*");
for (let i = 0; i < channels.rows.length; i++) {
this.subscribe(`channel:${channels.rows[i].id}`);
}
for (let i = 0; i < communities.rows.length; i++) {
this.subscribe(`community:${communities.rows[i].id}`);
}
this.user = user;
// first session, notify others that we are online
if (sessions.length === 1) {
dispatch("*", {
t: GatewayPayloadType.PresenceUpdate,
d: [this.getPresenceEntry(GatewayPresenceStatus.Online)]
});
}
this.ready = true;
this.send({
t: GatewayPayloadType.Ready,
d: {
user: getPublicUserObject(this.user),
channels: channels.rows,
communities: communities.rows,
presence: getInitialPresenceEntries()
}
});
break;
}
case GatewayPayloadType.Ping: {
if (payload.d !== 0) {
return this.closeWithBadPayload("d: expected numeric '0'");
}
if (!this.ready) {
return this.closeWithError(gatewayErrors.NOT_AUTHENTICATED);
}
this.alive = true;
break;
}
case GatewayPayloadType.RPCSignal: /* through */
case GatewayPayloadType.RPCRequest: {
if (!this.ready || !this.user) {
return this.closeWithError(gatewayErrors.NOT_AUTHENTICATED);
}
// RPCSignal is like RPCRequest however it does not send RPC method output unless there is an error
processMethodBatch(this.user, payload.d, (payload.t === GatewayPayloadType.RPCSignal ? true : false), binaryStream).then((results) => {
this.send({
t: GatewayPayloadType.RPCResponse,
d: results,
s: payload.s
});
});
break;
}
default: {
return this.closeWithBadPayload("t: unknown type");
}
}
}
}
export default function(server: Server) {
const wss = new WebSocketServer({ server });
const batchInterval = setInterval(() => {
wss.clients.forEach((e) => {
const now = performance.now();
if (e.state && (now - e.state.lastAliveCheck) >= GATEWAY_PING_INTERVAL) {
if (!e.state.ready) {
return closeWithError(e, gatewayErrors.AUTHENTICATION_TIMEOUT);
}
if (!e.state.alive) {
return closeWithError(e, gatewayErrors.NO_PING);
}
e.state.messagesSinceLastCheck = 0;
}
});
gatewayClients.forEach(client => client.batchTick());
}, GATEWAY_BATCH_INTERVAL);
wss.on("close", () => {
console.warn("gateway: websocket server closed");
console.warn("gateway: clearing batch interval due to websocket server close");
console.error("gateway: websocket server closed");
clearInterval(batchInterval);
});
wss.on("connection", (ws) => {
ws.state = {
user: undefined,
alive: false,
ready: false,
lastAliveCheck: performance.now(),
dispatchChannels: new Set(),
messagesSinceLastCheck: 0,
bridgesTo: undefined
};
sendPayload(ws, {
t: GatewayPayloadType.Hello,
d: {
pingInterval: GATEWAY_PING_INTERVAL
}
});
ws.on("close", () => {
clientUnsubscribeAll(ws);
ws.state.ready = false;
if (ws.state.user && ws.state.user.id) {
const sessions = sessionsByUserId.get(ws.state.user.id);
if (sessions) {
const index = sessions.indexOf(ws);
sessions.splice(index, 1);
if (sessions.length < 1) {
sessionsByUserId.delete(ws.state.user.id);
// user no longer has any sessions, update presence
dispatch("*", {
t: GatewayPayloadType.PresenceUpdate,
d: [getPresenceEntryForConnection(ws, GatewayPresenceStatus.Offline)]
});
}
}
}
});
ws.on("message", async (rawData: Buffer, isBinary) => {
ws.state.messagesSinceLastCheck++;
if (ws.state.messagesSinceLastCheck > MAX_CLIENT_MESSAGES_PER_BATCH) {
return closeWithError(ws, gatewayErrors.FLOODING);
}
if (rawData.byteLength >= maxGatewayPayloadByteLength) {
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
}
let stringData: string;
let binaryStream: Buffer | null = null;
if (isBinary) {
// Binary frames are used in order combine our text data (JSON) with binary data.
// This is especially useful for calling RPC methods that, for example, upload files.
// The format is: [json payload]\n[begin binary stream]
let jsonSlice;
let jsonOffset = -1;
for (let i = 0; i < maxGatewayJsonStringByteLength; i++) {
if (rawData.readUInt8(i) === 0x0A) {
// hit newline
jsonSlice = rawData.subarray(0, i);
jsonOffset = i + 1;
break;
}
}
if (!jsonSlice) {
return closeWithBadPayload(ws, "Did not find newline to delimit JSON from binary stream. JSON payload may be too large, or newline may be missing.");
}
binaryStream = rawData.subarray(jsonOffset, rawData.byteLength);
stringData = jsonSlice.toString();
} else {
stringData = rawData.toString();
}
if (stringData.length > maxGatewayJsonStringLength) {
return closeWithError(ws, gatewayErrors.PAYLOAD_TOO_LARGE);
}
const payload = ensureFormattedGatewayPayload(parseJsonOrNull(stringData));
if (!payload) {
return closeWithBadPayload(ws, "Invalid JSON or message does not match schema");
}
switch (payload.t) {
case GatewayPayloadType.Authenticate: {
if (ws.state.ready) {
return closeWithError(ws, gatewayErrors.ALREADY_AUTHENTICATED);
}
const authData = payload.d;
if (typeof authData !== "object") {
return closeWithBadPayload(ws, "d: expected object");
}
if (typeof authData.token !== "string") {
return closeWithBadPayload(ws, "d: invalid field 'token'");
}
if (typeof authData.bridgesTo !== "undefined" && typeof authData.bridgesTo !== "string" && authData.bridgesTo.length > 40) {
return closeWithBadPayload(ws, "d: invalid field 'bridgesTo'");
}
if (typeof authData.privacy !== "undefined" && typeof authData.privacy !== "string" && authData.privacy.length > 200) {
return closeWithBadPayload(ws, "d: invalid field 'privacy'");
}
if (typeof authData.terms !== "undefined" && typeof authData.terms !== "string" && authData.terms.length > 200) {
return closeWithBadPayload(ws, "d: invalid field 'terms'");
}
const user = await decodeTokenOrNull(authData.token);
if (!user) {
return closeWithError(ws, gatewayErrors.BAD_AUTH);
}
let sessions = sessionsByUserId.get(user.id);
if (sessions) {
if ((sessions.length + 1) > MAX_GATEWAY_SESSIONS_PER_USER) {
return closeWithError(ws, gatewayErrors.TOO_MANY_SESSIONS);
}
}
// TODO: each user should have their own list of channels that they join
const [channels, communities] = await Promise.all([
query("SELECT id, name, owner_id, community_id FROM channels ORDER BY id ASC"),
query("SELECT id, name, owner_id, avatar, created_at FROM communities ORDER BY id ASC"),
]);
if (!channels || !communities) {
return closeWithError(ws, gatewayErrors.GOT_NO_DATABASE_DATA);
}
if (!sessions) {
sessions = [];
sessionsByUserId.set(user.id, sessions);
}
sessions.push(ws);
clientSubscribe(ws, "*");
for (let i = 0; i < channels.rows.length; i++) {
clientSubscribe(ws, `channel:${channels.rows[i].id}`);
}
for (let i = 0; i < communities.rows.length; i++) {
clientSubscribe(ws, `community:${communities.rows[i].id}`);
}
ws.state.user = user;
ws.state.bridgesTo = authData.bridgesTo;
ws.state.privacy = authData.privacy;
ws.state.terms = authData.terms;
// first session, notify others that we are online
if (sessions.length === 1) {
dispatch("*", {
t: GatewayPayloadType.PresenceUpdate,
d: [getPresenceEntryForConnection(ws, GatewayPresenceStatus.Online)]
});
}
ws.state.ready = true;
sendPayload(ws, {
t: GatewayPayloadType.Ready,
d: {
user: getPublicUserObject(ws.state.user),
channels: channels.rows,
communities: communities.rows,
presence: getInitialPresenceEntries()
}
});
break;
}
case GatewayPayloadType.Ping: {
if (payload.d !== 0) {
return closeWithBadPayload(ws, "d: expected numeric '0'");
}
if (!ws.state.ready) {
return closeWithError(ws, gatewayErrors.NOT_AUTHENTICATED);
}
// TODO: also check session here
ws.state.alive = true;
break;
}
case GatewayPayloadType.RPCSignal: /* through */
case GatewayPayloadType.RPCRequest: {
if (!ws.state.ready || !ws.state.user) {
return closeWithError(ws, gatewayErrors.NOT_AUTHENTICATED);
}
// RPCSignal is like RPCRequest however it does not send RPC method output unless there is an error
processMethodBatch(ws.state.user, payload.d, (payload.t === GatewayPayloadType.RPCSignal ? true : false), binaryStream).then((results) => {
sendPayload(ws, {
t: GatewayPayloadType.RPCResponse,
d: results,
s: payload.s
});
});
break;
}
default: {
return closeWithBadPayload(ws, "t: unknown type");
}
}
});
const client = new GatewayClient(ws);
client.greet();
});
};

View file

@ -2,12 +2,12 @@ import { query } from "./database";
import { dispatch } from "./gateway";
import { GatewayPayloadType } from "./gateway/gatewaypayloadtype";
export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null) {
export default async function sendMessage(user: User, channelId: number, optimisticId: number | null, content: string, nickUsername: string | null, pendingAttachments: number) {
const authorId = user.id;
const createdAt = Date.now().toString();
const result = await query("INSERT INTO messages(content, channel_id, author_id, created_at, nick_username) VALUES ($1, $2, $3, $4, $5) RETURNING id", [content, channelId, authorId, createdAt, nickUsername]);
if (!result || result.rowCount < 1) {
const result = await query("INSERT INTO messages(content, channel_id, author_id, created_at, nick_username, pending_attachments) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", [content, channelId, authorId, createdAt, nickUsername, pendingAttachments]);
if (!result || result.rowCount !== 1) {
return null;
}
@ -19,12 +19,14 @@ export default async function sendMessage(user: User, channelId: number, optimis
author_username: user.username,
author_avatar: user.avatar,
created_at: createdAt,
nick_username: nickUsername
nick_username: nickUsername,
pending_attachments: pendingAttachments,
attachments: null
};
dispatch(`channel:${channelId}`, (ws) => {
let payload: any = returnObject;
if (ws && ws.state && ws.state.user && ws.state.user.id === user.id && optimisticId) {
if (ws && ws.user && ws.user.id === user.id && optimisticId) {
payload = {
...payload,
optimistic_id: optimisticId

View file

@ -4,8 +4,8 @@ import { createServer } from "node:http";
import databaseInit from "./database/init";
import gateway from "./gateway";
import server from "./server";
import { port } from "./serverconfig";
const port = process.env.PORT || 3000;
const app = express();
server(app);

View file

@ -1,10 +1,8 @@
import express from "express";
import { body, validationResult } from "express-validator";
import { PoolClient } from "pg";
import { authenticateRoute, loginAttempt } from "../../auth";
import { query, withClient } from "../../database";
import { query } from "../../database";
import { getMessagesByChannelAfterPage, getMessagesByChannelFirstPage } from "../../database/templates";
import { handle, waitForEvent } from "../../gateway";
import { waitForEvent } from "../../gateway";
import cors from "cors";
import sendMessage from "../../impl";
@ -336,7 +334,7 @@ router.put(
error: "Message body must be a string between 1 and 2000 characters"
});
}
const message = await sendMessage(req.user, channelId, null, req.body.body, null);
const message = await sendMessage(req.user, channelId, null, req.body.body, null, 0);
if (!message) {
return res.status(500).json({
errcode: "M_UNKNOWN",

14
src/routes/uploads.ts Normal file
View file

@ -0,0 +1,14 @@
import express from "express";
import { attachmentUploadDirectory } from "../serverconfig";
const router = express.Router();
router.use("/avatar", express.static("uploads/avatar"));
router.get("/attachment/:id", (req, res) => {
res.download(req.params.id, req.params.id, {
root: attachmentUploadDirectory
});
});
export default router;

122
src/rpc/apis/attachments.ts Normal file
View file

@ -0,0 +1,122 @@
import { bufferSlice, method, string, uint } from "../rpc";
import { query } from "../../database";
import { errors } from "../../errors";
import { UploadTarget, getSafeUploadPath, sanitizeFilename, supportedImageMime } from "../../uploading";
import sharp from "sharp";
import { randomBytes } from "node:crypto";
import path from "node:path";
import { promises as fsPromises } from "node:fs";
import { getMessageById } from "../../database/templates";
import { uploadsMode } from "../../serverconfig";
import { dispatch } from "../../gateway";
import { GatewayPayloadType } from "../../gateway/gatewaypayloadtype";
const fileType = eval("import('file-type')");
method(
"createMessageAttachment",
[uint(), string(2, 128), bufferSlice()],
async (user: User, messageId: number, filenameUnsafe: string, inputBuffer: Buffer) => {
if (inputBuffer.byteLength >= 16777220) {
return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 16MiB limit." };
}
const messageCheckResult = await query(getMessageById, [messageId]);
if (!messageCheckResult || messageCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
}
if (messageCheckResult.rows[0].author_id !== user.id && !user.is_superuser) {
return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS;
}
const randomId = randomBytes(60).toString("hex");
const safeFilename = sanitizeFilename(filenameUnsafe);
const filePath = getSafeUploadPath(UploadTarget.Attachment, `${randomId}.${safeFilename}`);
if (!filePath) {
return { ...errors.BAD_REQUEST, detail: "Invalid filename." };
}
const actualFilename = path.basename(filePath);
const filetype = await (await fileType).fileTypeFromBuffer(inputBuffer);
let bufferToUpload = inputBuffer;
let width = null;
let height = null;
if (filetype && supportedImageMime.includes(filetype.mime)) {
try {
const { data, info } = await sharp(inputBuffer, { limitInputPixels: 8000 * 8000 })
.timeout({ seconds: 5 })
.webp({ quality: 80 })
.toBuffer({ resolveWithObject: true });
bufferToUpload = data;
width = info.width;
height = info.height;
} catch (O_o) {
console.error("failed to process image attachment", O_o);
return { ...errors.INTERNAL_ERROR, detail: "An unknown error occurred while processing your image attachment." };
}
}
let fd;
try {
fd = await fsPromises.open(filePath, "wx", uploadsMode);
await fd.writeFile(bufferToUpload);
} catch (O_o) {
console.error("failed to write non-image buffer attachment", O_o);
return { ...errors.INTERNAL_ERROR, detail: "Failed to save attachment." };
} finally {
fd?.close();
}
const attachmentObject = {
id: -1,
type: "file",
owner_id: user.id,
message_id: messageId,
created_at: Date.now().toString(),
file: actualFilename,
file_mime: filetype && filetype.mime ? filetype.mime : "application/octet-stream",
file_size_bytes: bufferToUpload.byteLength,
width,
height,
file_name: safeFilename
};
const createAttachmentResult = await query(
"INSERT INTO message_attachments(type, owner_id, message_id, created_at, file, file_mime, file_size_bytes, width, height, file_name) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id",
[
attachmentObject.type,
attachmentObject.owner_id,
attachmentObject.message_id,
attachmentObject.created_at,
attachmentObject.file,
attachmentObject.file_mime,
attachmentObject.file_size_bytes,
attachmentObject.width,
attachmentObject.height,
attachmentObject.file_name
]
);
if (!createAttachmentResult || createAttachmentResult.rowCount !== 1) {
return errors.GOT_NO_DATABASE_DATA;
}
attachmentObject.id = createAttachmentResult.rows[0].id;
messageCheckResult.rows[0].attachments = [
...(messageCheckResult.rows[0].attachments ?? []),
attachmentObject
];
dispatch(`channel:${messageCheckResult.rows[0].channel_id}`, {
t: GatewayPayloadType.MessageUpdate,
d: messageCheckResult.rows[0]
});
return messageCheckResult.rows[0];
}
);

View file

@ -1,4 +1,3 @@
import express from "express";
import { channelNameRegex, method, int, string, uint, withOptional, withRegexp } from "../rpc";
import { query } from "../../database";
import { getMessagesByChannelFirstPage, getMessagesByChannelPage } from "../../database/templates";
@ -111,9 +110,9 @@ method(
method(
"createChannelMessage",
[uint(), string(1, 4000), withOptional(uint()), withOptional(string(1, 64))],
async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null) => {
return await sendMessage(user, id, optimistic_id, content, nick_username);
[uint(), string(1, 4000), withOptional(uint()), withOptional(string(1, 64)), withOptional(uint())],
async (user: User, id: number, content: string, optimistic_id: number | null, nick_username: string | null, pending_attachments: number | null) => {
return await sendMessage(user, id, optimistic_id, content, nick_username, pending_attachments ?? 0);
}
);

View file

@ -4,29 +4,53 @@ import { getMessageById } from "../../database/templates";
import { errors } from "../../errors";
import { dispatch } from "../../gateway";
import { GatewayPayloadType } from "../../gateway/gatewaypayloadtype";
import { unlink } from "node:fs/promises";
import path from "node:path";
import { UploadTarget, getSafeUploadPath } from "../../uploading";
method(
"deleteMessage",
[uint()],
async (user: User, id: number) => {
const permissionCheckResult = await query("SELECT author_id, channel_id FROM messages WHERE id = $1", [id]);
if (!permissionCheckResult || permissionCheckResult.rowCount < 1) {
const messageCheckResult = await query(getMessageById, [id]);
if (!messageCheckResult || messageCheckResult.rowCount < 1) {
return errors.NOT_FOUND;
}
if (permissionCheckResult.rows[0].author_id !== user.id && !user.is_superuser) {
if (messageCheckResult.rows[0].author_id !== user.id && !user.is_superuser) {
return errors.FORBIDDEN_DUE_TO_MISSING_PERMISSIONS;
}
const message = messageCheckResult.rows[0];
if (message.attachments) {
for (let i = 0; i < message.attachments.length; i++) {
const attachment = message.attachments[i];
if (attachment.type === "file" && attachment.file) {
const fileInfo = getSafeUploadPath(UploadTarget.Attachment, attachment.file);
if (fileInfo) {
const resolved = path.resolve(fileInfo);
try {
await unlink(resolved);
} catch(o_O) {
console.error("Failed to unlink message attachment upon message deletion", o_O);
}
} else {
console.error("Failed to unlink message attachment upon message deletion: got null from getSafeUploadPath. This should not happen.");
}
}
}
}
const result = await query("DELETE FROM messages WHERE id = $1", [id]);
if (!result || result.rowCount < 1) {
return errors.GOT_NO_DATABASE_DATA;
}
dispatch(`channel:${permissionCheckResult.rows[0].channel_id}`, {
dispatch(`channel:${message.channel_id}`, {
t: GatewayPayloadType.MessageDelete,
d: {
id,
channel_id: permissionCheckResult.rows[0].channel_id
channel_id: message.channel_id
}
});

View file

@ -9,15 +9,17 @@ import { randomBytes } from "crypto";
import { unlink } from "fs/promises";
import { GatewayPayloadType } from "../../gateway/gatewaypayloadtype";
import { dispatch } from "../../gateway";
import { supportedImageMime } from "../../uploading";
import { avatarUploadDirectory, disableAccountCreation, superuserKey } from "../../serverconfig";
const fileType = eval("import('file-type')");
const superuserKey = process.env.SUPERUSER_KEY ? hashSync(process.env.SUPERUSER_KEY, 10) : null;
const avatarUploadDirectory = process.env.AVATAR_UPLOADS_DIR ?? "./uploads/avatar";
methodButWarningDoesNotAuthenticate(
"createUser",
[withRegexp(usernameRegex, string(3, 32)), string(8, 1000)],
async (username: string, password: string) => {
if (process.env.DISABLE_ACCOUNT_CREATION === "true") {
if (disableAccountCreation) {
return errors.FEATURE_DISABLED;
}
@ -83,20 +85,6 @@ method(
)
const checkMagic = (buffer: Buffer, magic: number[]) => {
for (let i = 0; i < magic.length; i++) {
try {
if (buffer.readUint8(i) !== magic[i]) {
return false;
}
} catch(O_o) {
return false;
}
}
return true;
};
const profilePictureSizes = [
16, 28, 32, 64, 80, 128, 256
];
@ -110,22 +98,9 @@ method(
return { ...errors.BAD_REQUEST, detail: "Uploaded file exceeds 3MiB limit." };
}
// TODO: maybe get rid of this entirely and give buffer directly to `sharp`?
const supportedFormatMagic = [
[0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A], // PNG
[0xFF, 0xD8, 0xFF], // JPEG
[0x52, 0x49, 0x46, 0x46] // WebP
];
let isSupported = false;
for (let i = 0; i < supportedFormatMagic.length; i++) {
if (checkMagic(buffer, supportedFormatMagic[i])) {
isSupported = true;
break;
}
}
if (!isSupported) {
return { ...errors.BAD_REQUEST, detail: "Unsupported file format. Supported file formats are: png, jpeg, webp." };
const filetype = await (await fileType).fileTypeFromBuffer(buffer);
if (!filetype || !supportedImageMime.includes(filetype.mime)) {
return { ...errors.BAD_REQUEST, detail: "Unsupported file format. Supported file formats are: png, jpeg, webp, avif." };
}
const avatarId = randomBytes(8).toString("hex");
@ -149,7 +124,7 @@ method(
try {
await unlink(path.resolve(path.join(avatarUploadDirectory, filenames[i])));
} catch(o_0) {
console.error("rpc: putUserAvatar: error while removing files (upon error)", o_0);
//console.error("rpc: putUserAvatar: error while removing files (upon error)", o_0);
}
}
return errors.INTERNAL_ERROR;

View file

@ -8,6 +8,8 @@ methodGroup(300);
import "./apis/messages";
methodGroup(400);
import "./apis/communities";
methodGroup(500);
import "./apis/attachments";
console.log("--- begin rpc method map ---")

View file

@ -1,15 +1,16 @@
import express, { Application, ErrorRequestHandler, json } from "express";
import "./rpc";
import uploadsRouter from "./routes/uploads";
import rpcRouter from "./routes/api/v1/rpc";
import matrixRouter from "./routes/matrix";
import { errors } from "./errors";
const ENABLE_MATRIX_LAYER = true;
const ENABLE_MATRIX_LAYER = false;
export default function(app: Application) {
app.use(json());
app.use("/api/v1/rpc", rpcRouter);
app.use("/uploads", express.static("uploads"));
app.use("/uploads", uploadsRouter);
app.use("/", express.static("frontend/public"));
if (ENABLE_MATRIX_LAYER) {
app.use("/", matrixRouter);

View file

@ -1,10 +1,54 @@
import { hashSync } from "bcrypt";
import { existsSync, mkdirSync, statSync } from "node:fs";
import path from "node:path";
export default {
superuserRequirement: {
createChannel: false
}
};
export let port = process.env.PORT ?? 3000;
if (typeof port === "string") {
port = parseInt(port);
}
if (isNaN(port) || !isFinite(port) || port <= 0) {
console.error(`provided PORT in process environment is not valid, exiting... (got '${port}')`);
process.exit(1);
}
export const jwtSecret = process.env.JWT_SECRET ?? "";
if (!jwtSecret || jwtSecret === "") {
console.error("JWT_SECRET not found in process environment, exiting...");
process.exit(1);
}
export const maxBufferByteLength = 8388608; // 8MiB
export const maxGatewayJsonStringLength = 4500;
export const maxGatewayJsonStringByteLength = maxGatewayJsonStringLength * 2; // https://developer.mozilla.org/en-US/docs/Web/API/TextEncoder/encodeInto#buffer_sizing
export const maxGatewayPayloadByteLength = maxBufferByteLength + maxGatewayJsonStringByteLength + 4;
export const superuserKey = process.env.SUPERUSER_KEY ? hashSync(process.env.SUPERUSER_KEY, 10) : null;
export const disableAccountCreation = (process.env.DISABLE_ACCOUNT_CREATION === "true");
export const uploadsMode = 0o600;
export const uploadsDirectoryMode = 0o700;
export const avatarUploadDirectory = path.resolve(process.env.AVATAR_UPLOADS_DIR ?? "./uploads/avatar");
export const attachmentUploadDirectory = path.resolve(process.env.ATTACHMENT_UPLOADS_DIR ?? "./uploads/attachment");
const ensureDirectory = (dir: string) => {
const stats = statSync(dir, { throwIfNoEntry: false });
if (!stats) {
mkdirSync(dir, { mode: uploadsDirectoryMode });
return;
}
if (!stats.isDirectory()) {
throw new Error(`Expected path ${dir} to point to a directory`);
}
};
const ensureDirectories = () => {
ensureDirectory(avatarUploadDirectory);
ensureDirectory(attachmentUploadDirectory);
};
ensureDirectories();

View file

@ -1,11 +0,0 @@
interface GatewayClientState {
user?: User;
ready: boolean,
alive: boolean,
lastAliveCheck: number,
dispatchChannels: Set<string>,
messagesSinceLastCheck: number,
bridgesTo?: string
privacy?: string,
terms?: string,
}

7
src/types/ws.d.ts vendored
View file

@ -1,7 +0,0 @@
import ws from 'ws';
declare module 'ws' {
export interface WebSocket extends ws {
state: GatewayClientState;
}
}

75
src/uploading.ts Normal file
View file

@ -0,0 +1,75 @@
import path from "node:path";
import { attachmentUploadDirectory, avatarUploadDirectory } from "./serverconfig";
export const supportedImageMime = [
"image/jpeg",
"image/png",
"image/webp",
"image/avif",
];
// Thanks: https://github.com/parshap/node-sanitize-filename/blob/209c39b914c8eb48ee27bcbde64b2c7822fdf3de/index.js
const replacements = [
/[\/\?<>\\:\*\|"]/g,
/[\x00-\x1f\x80-\x9f]/g,
/^\.+$/,
/^(con|prn|aux|nul|com[0-9]|lpt[0-9])(\..*)?$/i,
/[\. ]+$/,
];
export function sanitizeFilename(filename: string): string | null {
if (filename.length > 256) {
return null;
}
for (let i = 0; i < replacements.length; i++) {
filename.replace(replacements[i], "");
}
if (!filename.length) {
return null;
}
return filename;
}
export enum UploadTarget {
Avatar,
Attachment
}
export function getSafeUploadPath(target: UploadTarget, filenameUnsafe: string) {
let base;
switch (target) {
case UploadTarget.Avatar: {
base = avatarUploadDirectory;
break;
}
case UploadTarget.Attachment: {
base = attachmentUploadDirectory;
break;
}
default: {
return null;
}
}
if (typeof base !== "string" || typeof filenameUnsafe !== "string") {
return null;
}
const sanitized = sanitizeFilename(filenameUnsafe);
if (!sanitized) {
return null;
}
if (sanitized.indexOf('\0') !== -1 || sanitized.indexOf('%') !== -1 || sanitized.indexOf('/') !== -1 || sanitized.indexOf('..') !== -1) {
console.error("getSafeUploadPath: attempted path traversal or illegal path, this should not be possible.");
return null;
}
const joined = path.join(base, sanitized);
if (joined.indexOf(base) !== 0) {
console.error("getSafeUploadPath: attempted path traversal or illegal path, this should not be possible.");
return null;
}
return path.resolve(joined);
}

View file

@ -7,6 +7,7 @@
"module": "commonjs",
"esModuleInterop": true,
"strict": true,
"skipLibCheck": true,
"outDir": "./dist",
}
}

835
yarn.lock

File diff suppressed because it is too large Load diff