From ff8da81aeface9c88894a8c81fa6274473d9ac75 Mon Sep 17 00:00:00 2001 From: hippoz <10706925-hippoz@users.noreply.gitlab.com> Date: Fri, 16 Sep 2022 20:32:53 +0300 Subject: [PATCH] implement basic channel system --- main.c | 225 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 188 insertions(+), 37 deletions(-) diff --git a/main.c b/main.c index 79b26d2..8150cdd 100644 --- a/main.c +++ b/main.c @@ -1,32 +1,56 @@ #include #include +#include #include #include #include #include +#include -#define MAX_CLIENTS 512 +#define MAX_CHANNELS 128 +#define MAX_CLIENTS 128 +#define MAX_CLIENTS_PER_CHANNEL 64 +#define CHANNEL_NAME_LENGTH 28 #define BACKLOG 10 #define DIE(info) ({perror((info)); exit(EXIT_FAILURE);}) static const char *socket_path = "/tmp/plumb-unix0"; -struct plumb_client { +struct client { int fd; }; -struct plumb_server { - int socket_fd; +struct channel { + char name[CHANNEL_NAME_LENGTH]; + int clients[MAX_CLIENTS_PER_CHANNEL]; + int clients_count; +}; + +struct server { + int sock_fd; int fd_num; - struct plumb_client clients[MAX_CLIENTS]; + struct channel channels[MAX_CHANNELS]; + struct client clients[MAX_CLIENTS]; struct pollfd fds[MAX_CLIENTS + 1]; }; -int plumb_server_client_add(struct plumb_server *s, int fd) +// https://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function#FNV-1a_hash +uint64_t hashmap_hash(const char *bytes, size_t bytes_n, size_t map_len) { - printf("adding new connection, fd=%d\n", fd); + uint64_t hash = 0xcbf29ce484222325; + for (size_t i = 0; i < bytes_n; i++) { + hash *= 0x100000001b3; + hash ^= bytes[i]; + } + + return (hash % map_len); +} + + +int server_client_add(struct server *s, int fd) +{ for (int i = 0; i < MAX_CLIENTS; i++) { if (s->clients[i].fd < 0) { s->clients[i].fd = fd; @@ -38,13 +62,8 @@ int plumb_server_client_add(struct plumb_server *s, int fd) return -1; } -void plumb_server_client_remove(struct plumb_server *s, int i) +void server_client_remove(struct server *s, int i) { - if (i >= MAX_CLIENTS) - return; - - printf("removing client i=%d\n", i); - if (s->clients[i].fd >= 0) { close(s->clients[i].fd); } @@ -52,26 +71,120 @@ void plumb_server_client_remove(struct plumb_server *s, int i) s->fds[i].fd = -1; s->fds[i].events = 0; s->fds[i].revents = 0; + + for (int channel = 0; channel < MAX_CHANNELS; channel++) { + for (int client = 0; client < MAX_CLIENTS_PER_CHANNEL; client++) { + if (s->channels[channel].clients[client] == i) { + s->channels[channel].clients[client] = -1; + s->channels[channel].clients_count--; + } + } + } } +void server_client_error(struct server *s, int i, const char *msg) +{ + send(s->clients[i].fd, msg, strlen(msg), 0); + server_client_remove(s, i); +} -void plumb_server_free(struct plumb_server *s) +int server_channel_create(struct server *s, const char name[CHANNEL_NAME_LENGTH]) +{ + uint64_t bucket = hashmap_hash(name, CHANNEL_NAME_LENGTH, MAX_CHANNELS); + + for (uint64_t i = bucket; i < bucket + 12 && i < MAX_CHANNELS; i++) { + if (s->channels[i].clients_count == 0) { + strncpy(s->channels[i].name, name, CHANNEL_NAME_LENGTH); + return i; + } + } + + return -1; +} + +int server_channel_find(struct server *s, const char name[CHANNEL_NAME_LENGTH]) +{ + uint64_t bucket = hashmap_hash(name, CHANNEL_NAME_LENGTH, MAX_CHANNELS); + + for (uint64_t i = bucket; i < bucket + 12 && i < MAX_CHANNELS; i++) { + if (strncmp(s->channels[i].name, name, CHANNEL_NAME_LENGTH) == 0) { + return i; + } + } + + return -1; +} + +int server_channel_find_or_create(struct server *s, const char name[CHANNEL_NAME_LENGTH]) +{ + int found = server_channel_find(s, name); + return found == -1 ? server_channel_create(s, name) : found; +} + +int server_channel_add_client(struct server *s, int channel_index, int client_index) +{ + int *clients = s->channels[channel_index].clients; + for (int i = 0; i < MAX_CLIENTS_PER_CHANNEL; i++) { + if (clients[i] == client_index) { + return -1; + } else if (clients[i] == -1) { // empty slot + clients[i] = client_index; + s->channels[channel_index].clients_count++; + return 0; + } + } + + return -1; +} + +int server_channel_remove_client(struct server *s, int channel_index, int client_index) +{ + int *clients = s->channels[channel_index].clients; + for (int i = 0; i < MAX_CLIENTS_PER_CHANNEL; i++) { + if (clients[i] == client_index) { + clients[i] = -1; + s->channels[channel_index].clients_count--; + return 0; + } + } + + return -1; +} + +int server_channel_send(struct server *s, int ci, const void *buf, size_t buf_size) { + int *clients = s->channels[ci].clients; + int remaining = s->channels[ci].clients_count; + + for (int i = 0; i < MAX_CLIENTS_PER_CHANNEL; i++) { + if (remaining <= 0) { + break; + } + if (clients[i] >= 0) { + send(s->clients[clients[i]].fd, buf, buf_size, 0); + remaining--; + } + } + + return 0; +} + +void server_free(struct server *s) { if (!s) return; - if (s->socket_fd) close(s->socket_fd); + if (s->sock_fd) close(s->sock_fd); free(s); } -struct plumb_server *plumb_server_create() +struct server *server_create() { - struct plumb_server *s = malloc(sizeof(struct plumb_server)); + struct server *s = malloc(sizeof(struct server)); if (s == NULL) { DIE("malloc"); return NULL; } - s->socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (s->socket_fd == -1) { + s->sock_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (s->sock_fd == -1) { free(s); DIE("socket"); return NULL; @@ -82,15 +195,15 @@ struct plumb_server *plumb_server_create() name.sun_family = AF_UNIX; strncpy(name.sun_path, socket_path, sizeof(name.sun_path) - 1); - if (bind(s->socket_fd, (const struct sockaddr *)&name, sizeof(name)) == -1) { + if (bind(s->sock_fd, (const struct sockaddr *)&name, sizeof(name)) == -1) { free(s); - close(s->socket_fd); + close(s->sock_fd); DIE("bind"); return NULL; } - if (listen(s->socket_fd, BACKLOG) == -1) { + if (listen(s->sock_fd, BACKLOG) == -1) { free(s); - close(s->socket_fd); + close(s->sock_fd); DIE("listen"); return NULL; } @@ -102,31 +215,40 @@ struct plumb_server *plumb_server_create() s->fds[i].revents = 0; } - s->fds[MAX_CLIENTS].fd = s->socket_fd; - s->fds[MAX_CLIENTS].events = POLLIN; + for (int i = 0; i < MAX_CHANNELS; i++) { + memset(s->channels[i].name, 0, sizeof(s->channels[i].name)); + memset(s->channels[i].clients, -1, sizeof(s->channels[i].clients)); + s->channels[i].clients_count = 0; + } + s->fds[MAX_CLIENTS].fd = s->sock_fd; + s->fds[MAX_CLIENTS].events = POLLIN; s->fd_num = MAX_CLIENTS + 1; return s; } -int plumb_server_turn(struct plumb_server *s) +int server_turn(struct server *s) { if (poll(s->fds, s->fd_num, -1) < 0) { DIE("poll"); return -1; } +#ifndef _client_check +#define _client_check(expr, msg) ({ if((expr)){server_client_error((s),(i),(msg));continue;} }) +#endif + for (int i = 0; i < s->fd_num; i++) { if (s->fds[i].revents & POLLIN) { // file descriptor ready for reading - if (s->fds[i].fd == s->socket_fd) { // new connection - int fd = accept(s->socket_fd, NULL, NULL); + if (s->fds[i].fd == s->sock_fd) { // new connection + int fd = accept(s->sock_fd, NULL, NULL); if (fd == -1) { DIE("accept"); return -1; } - if (plumb_server_client_add(s, fd) == -1) { + if (server_client_add(s, fd) == -1) { close(fd); continue; } @@ -138,34 +260,63 @@ int plumb_server_turn(struct plumb_server *s) DIE("recv"); return -1; } else if (bytes == 0) { // client disconnected - plumb_server_client_remove(s, i); + server_client_remove(s, i); } else { - send(s->fds[i].fd, data, sizeof(data), 0); + const char *command = strtok(data, " "); + if (strcmp(command, "listen") == 0) { + char channel_name[CHANNEL_NAME_LENGTH]; + strncpy(channel_name, strtok(NULL, " "), CHANNEL_NAME_LENGTH); + _client_check(*channel_name == '\0', "Invalid channel name"); + + int channel_id = server_channel_find_or_create(s, channel_name); + _client_check(channel_id < 0, "Could not find or create channel"); + + _client_check(server_channel_add_client(s, channel_id, i) < 0, "Could not join channel"); + } else if (strcmp(command, "send") == 0) { + char channel_name[CHANNEL_NAME_LENGTH]; + strncpy(channel_name, strtok(NULL, " "), CHANNEL_NAME_LENGTH); + _client_check(*channel_name == '\0', "Invalid channel name"); + + const char *payload = strtok(NULL, " "); + _client_check(payload == NULL || *channel_name == '\0', "Invalid channel name"); + + int channel_id = server_channel_find(s, channel_name); + _client_check(channel_id < 0, "Could not find channel"); + + _client_check(server_channel_send(s, channel_id, payload, strlen(payload)) < 0, "Could not send message"); + } else { + server_client_error(s, i, "Unknown command"); + continue; + } } } } } +#ifdef _client_check +#undef _client_check +#endif + return 0; } int main() { - struct plumb_server *srv = plumb_server_create(); + struct server *srv = server_create(); if (srv == NULL) { - fprintf(stderr, "plumb_server_create failed\n"); + fprintf(stderr, "server_create failed\n"); return EXIT_FAILURE; } while (1) { - if (plumb_server_turn(srv) < 0) { - fprintf(stderr, "plumb_server_turn failed\n"); - plumb_server_free(srv); + if (server_turn(srv) < 0) { + fprintf(stderr, "server_turn failed\n"); + server_free(srv); return EXIT_FAILURE; } } - plumb_server_free(srv); + server_free(srv); return EXIT_SUCCESS; }