add initial implementation of server signal broadcast

This commit is contained in:
hippoz 2023-01-15 04:32:51 +02:00
parent 6d9f9c365c
commit adc299c847
Signed by: hippoz
GPG key ID: 56C4E02A85F2FBED
3 changed files with 180 additions and 4 deletions

View file

@ -314,9 +314,8 @@ int bus_broadcast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, ch
TRYST(wire_compose_unicast_reply(reply_ctx, ctx, msg, sender_unique_name)); TRYST(wire_compose_unicast_reply(reply_ctx, ctx, msg, sender_unique_name));
TRYST(send(c->fd, reply_ctx->data, reply_ctx->byte_cursor, 0)); TRYST(send(c->fd, reply_ctx->data, reply_ctx->byte_cursor, 0));
ctx->byte_cursor = previous_cursor; ctx->byte_cursor = previous_cursor;
if (match_left) {
memset(reply_ctx->data, 0, reply_ctx->data_len); memset(reply_ctx->data, 0, reply_ctx->data_len);
} break;
} }
} }
} }
@ -327,6 +326,35 @@ int bus_broadcast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, ch
return 0; return 0;
} }
int bus_broadcast_signal(bus_t *s, wire_context_t *ctx, wire_message_t *msg)
{
int left = s->clients_count;
for (int i = 0; i < BUS_MAX_CLIENTS && left > 0; i++) {
if (s->clients[i].match_count <= 0) {
continue;
}
left--;
bus_client_t *c = &s->clients[i];
int match_left = c->match_count;
for (int j = 0; j < BUS_MAX_MATCH && match_left > 0; j++) {
if (!c->matches[j]) {
continue;
}
match_left--;
if (match_rule_check(c->matches[j], msg, ctx) >= 0) {
uint32_t previous_cursor = ctx->byte_cursor;
TRYST(send(c->fd, ctx->data, ctx->byte_cursor, 0));
ctx->byte_cursor = previous_cursor;
break;
}
}
}
return 0;
}
int bus_unicast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, char *target_name, char *sender_unique_name, wire_context_t *reply_ctx) int bus_unicast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, char *target_name, char *sender_unique_name, wire_context_t *reply_ctx)
{ {
bus_client_t *target = TRYPTR(bus_name_find_client(s, target_name)); bus_client_t *target = TRYPTR(bus_name_find_client(s, target_name));
@ -335,6 +363,25 @@ int bus_unicast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, char
return 0; return 0;
} }
#define _signal_begin(M_sig, M_member) \
uint32_t *signal_body_length = NULL; \
uint32_t signal_body_start = 0; \
uint8_t signal_reply_data[8192]; \
memset(signal_reply_data, 0, 8192); \
wire_context_t signal_reply_ctx = { \
.byte_cursor = 0, \
.data = signal_reply_data, \
.data_len = 8192, \
}; \
wire_message_t signal_reply_msg = {}; \
TRYST(wire_compose_signal(&signal_reply_ctx, &signal_reply_msg, (M_sig), (M_member), &signal_body_length)); \
signal_body_start = signal_reply_ctx.byte_cursor;
#define _signal_end() \
*signal_body_length = signal_reply_ctx.byte_cursor - signal_body_start; \
TRYST(bus_broadcast_signal(s, &signal_reply_ctx, &signal_reply_msg)); \
#define _reply_begin(M_sig) \ #define _reply_begin(M_sig) \
uint32_t *body_length = NULL; \ uint32_t *body_length = NULL; \
uint32_t body_start = 0; \ uint32_t body_start = 0; \
@ -347,7 +394,7 @@ int bus_unicast_message(bus_t *s, wire_message_t *msg, wire_context_t *ctx, char
if (send(s->fds[i].fd, reply_ctx->data, reply_ctx->byte_cursor, 0) != reply_ctx->byte_cursor) { \ if (send(s->fds[i].fd, reply_ctx->data, reply_ctx->byte_cursor, 0) != reply_ctx->byte_cursor) { \
return -1; \ return -1; \
} \ } \
printf("send: sent %d bytes!\n", reply_ctx->byte_cursor); \ printf("reply_end: send: sent %d bytes!\n", reply_ctx->byte_cursor); \
} /* if (!(msg.flags & DBUS_FLAG_NO_REPLY_EXPECTED)) */ } /* if (!(msg.flags & DBUS_FLAG_NO_REPLY_EXPECTED)) */
#define _reply_error(message) \ #define _reply_error(message) \
@ -374,7 +421,13 @@ int handle_hello(bus_t *s, int i, wire_message_t *msg, wire_context_t *ctx, wire
} }
int handle_request_name(bus_t *s, int i, wire_message_t *msg, wire_context_t *ctx, wire_context_t *reply_ctx) { int handle_request_name(bus_t *s, int i, wire_message_t *msg, wire_context_t *ctx, wire_context_t *reply_ctx) {
if (s->clients[i].unique_name_index < 0) {
return -1;
}
char *name = TRYPTR(wire_get_name_string(ctx)); char *name = TRYPTR(wire_get_name_string(ctx));
TRYPTR(wire_get_u32(ctx)); // unused flags
char *name_str = TRYPTR(string_dup(name)); char *name_str = TRYPTR(string_dup(name));
int status_code = DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER; int status_code = DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER;
@ -390,6 +443,15 @@ int handle_request_name(bus_t *s, int i, wire_message_t *msg, wire_context_t *ct
} _reply_end() } _reply_end()
if (status_code == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { if (status_code == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
_signal_begin("sss", "NameOwnerChanged") {
/* Name with a new owner */
TRYPTR(wire_set_string(&signal_reply_ctx, name_str));
/* Old owner or empty string if none */
TRYPTR(wire_set_string(&signal_reply_ctx, ""));
/* New owner or empty string if none */
TRYPTR(wire_set_string(&signal_reply_ctx, s->names[s->clients[i].unique_name_index].name));
} _signal_end();
printf("client '%s' (index=%d) now owns name '%s'\n", s->names[s->clients[i].unique_name_index].name, i, name_str); printf("client '%s' (index=%d) now owns name '%s'\n", s->names[s->clients[i].unique_name_index].name, i, name_str);
} }
return 0; return 0;

113
wire.c
View file

@ -174,6 +174,119 @@ int wire_parse_message(wire_context_t *c, wire_message_t *msg)
return 0; return 0;
} }
int wire_compose_signal(wire_context_t *c, wire_message_t *out_msg, const char *signature, const char *member, uint32_t **out_body_length)
{
out_msg->endianness = 'l';
TRYPTR(wire_set_u8(c, 'l')); /* endianness */
out_msg->type = DBUS_MESSAGE_SIGNAL;
TRYPTR(wire_set_u8(c, DBUS_MESSAGE_SIGNAL)); /* type */
out_msg->flags = 0 | DBUS_FLAG_NO_REPLY_EXPECTED;
TRYPTR(wire_set_u8(c, out_msg->flags)); /* flags */
out_msg->protocol_version = DBUS_PROTOCOL_VERSION;
TRYPTR(wire_set_u8(c, DBUS_PROTOCOL_VERSION)); /* protocol_version */
uint32_t *body_length = TRYPTR(wire_set_u32(c, 0)); /* body length */
TRYPTR(wire_set_u32(c, 1)); /* serial */
uint32_t *header_fields_length = TRYPTR(wire_set_u32(c, 0)); /* header_fields_length */
uint32_t header_fields_start = c->byte_cursor;
/* header fields */
{
/* SENDER */
{
out_msg->fields[DBUS_HEADER_FIELD_SENDER].t.str = "org.freedesktop.DBus";
out_msg->fields[DBUS_HEADER_FIELD_SENDER].present = true;
/* byte (field code) */
TRYPTR(wire_set_u8(c, DBUS_HEADER_FIELD_SENDER));
/* variant */
/* signature */
TRYPTR(wire_set_signature(c, "s"));
/* string */
TRYPTR(wire_set_string(c, "org.freedesktop.DBus"));
// need to align to 8 byte boundry after each array element?
TRYPTR(wire_write_align(c, 8));
}
/* INTERFACE */
{
out_msg->fields[DBUS_HEADER_FIELD_INTERFACE].t.str = "org.freedesktop.DBus";
out_msg->fields[DBUS_HEADER_FIELD_INTERFACE].present = true;
/* byte (field code) */
TRYPTR(wire_set_u8(c, DBUS_HEADER_FIELD_INTERFACE));
/* variant */
/* signature */
TRYPTR(wire_set_signature(c, "s"));
/* string */
TRYPTR(wire_set_string(c, "org.freedesktop.DBus"));
// need to align to 8 byte boundry after each array element?
TRYPTR(wire_write_align(c, 8));
}
/* PATH */
{
out_msg->fields[DBUS_HEADER_FIELD_PATH].t.str = "/org/freedesktop/DBus";
out_msg->fields[DBUS_HEADER_FIELD_PATH].present = true;
/* byte (field code) */
TRYPTR(wire_set_u8(c, DBUS_HEADER_FIELD_PATH));
/* variant */
/* signature */
TRYPTR(wire_set_signature(c, "o"));
/* string */
TRYPTR(wire_set_string(c, "/org/freedesktop/DBus"));
// need to align to 8 byte boundry after each array element?
TRYPTR(wire_write_align(c, 8));
}
/* MEMBER */
{
out_msg->fields[DBUS_HEADER_FIELD_MEMBER].t.str = (char*)member;
out_msg->fields[DBUS_HEADER_FIELD_MEMBER].present = true;
/* byte (field code) */
TRYPTR(wire_set_u8(c, DBUS_HEADER_FIELD_MEMBER));
/* variant */
/* signature */
TRYPTR(wire_set_signature(c, "s"));
/* string */
TRYPTR(wire_set_string(c, member));
// need to align to 8 byte boundry after each array element?
TRYPTR(wire_write_align(c, 8));
}
/* SIGNATURE */
{
out_msg->fields[DBUS_HEADER_FIELD_SIGNATURE].t.str = (char*)signature;
out_msg->fields[DBUS_HEADER_FIELD_SIGNATURE].present = true;
/* byte (field code) */
TRYPTR(wire_set_u8(c, DBUS_HEADER_FIELD_SIGNATURE));
/* variant */
/* signature */
TRYPTR(wire_set_signature(c, "g"));
/* signature */
TRYPTR(wire_set_signature(c, signature));
}
}
*header_fields_length = c->byte_cursor - header_fields_start;
// header ends on an 8 byte alignment
TRYPTR(wire_write_align(c, 8));
if (out_body_length) {
*out_body_length = body_length;
}
return 0;
}
int wire_compose_reply(wire_context_t *c, wire_message_t *msg, const char *signature, uint32_t **out_body_length) int wire_compose_reply(wire_context_t *c, wire_message_t *msg, const char *signature, uint32_t **out_body_length)
{ {

1
wire.h
View file

@ -128,6 +128,7 @@ int wire_parse_message(wire_context_t *c, wire_message_t *msg);
int wire_compose_reply(wire_context_t *c, wire_message_t *msg, const char *signature, uint32_t **out_body_length); int wire_compose_reply(wire_context_t *c, wire_message_t *msg, const char *signature, uint32_t **out_body_length);
int wire_compose_error(wire_context_t *c, wire_message_t *msg, const char *error_name); int wire_compose_error(wire_context_t *c, wire_message_t *msg, const char *error_name);
int wire_compose_unicast_reply(wire_context_t *c, wire_context_t *msg_c, wire_message_t *msg, char *sender_unique_name); int wire_compose_unicast_reply(wire_context_t *c, wire_context_t *msg_c, wire_message_t *msg, char *sender_unique_name);
int wire_compose_signal(wire_context_t *c, wire_message_t *out_msg, const char *signature, const char *member, uint32_t **out_body_length);
int wire_collect_strings(wire_context_t *c, wire_message_t *msg, wire_message_body_string_t *strings, int strings_count); int wire_collect_strings(wire_context_t *c, wire_message_t *msg, wire_message_body_string_t *strings, int strings_count);
static inline char *wire_get_signature(wire_context_t *c) { static inline char *wire_get_signature(wire_context_t *c) {