add broken webrtc media server and experiment system

This commit is contained in:
hippoz 2021-05-21 01:33:47 +03:00
parent 72c0b3f62e
commit dffba219d6
No known key found for this signature in database
GPG key ID: 7C52899193467641
6 changed files with 108 additions and 6 deletions

View file

@ -1,7 +1,9 @@
const websockets = require("ws"); const websockets = require("ws");
const EventEmitter = require("events"); const EventEmitter = require("events");
const uuid = require("uuid"); const uuid = require("uuid");
const werift = require("werift");
const { experiments } = require("../../../experiments");
const User = require("../../../models/User"); const User = require("../../../models/User");
const Channel = require("../../../models/Channel"); const Channel = require("../../../models/Channel");
const { parseMessage, opcodeSeparator, getOpcodeByName } = require("./messageparser"); const { parseMessage, opcodeSeparator, getOpcodeByName } = require("./messageparser");
@ -9,6 +11,31 @@ const { checkToken } = require("../../../common/auth/authfunctions");
const pingCheckDelay = 10000; const pingCheckDelay = 10000;
class GatewayRTCConnection {
constructor(ws, server) {
this.ws = ws;
this.server = server;
this.connection = new werift.RTCPeerConnection();
this.connection.onnegotiationneeded.subscribe(() => this.renegotiate);
}
async answer(offer) {
await this.connection.setRemoteDescription(offer);
await this.connection.setLocalDescription(await this.connection.createAnswer());
return this.connection.localDescription;
}
async addTrack(track) {
if (!track) return;
this.connection.addTrack(track);
}
async renegotiate() {
this.ws.send(this.server.packet("EVENT_RENEGOTIATE_REQUIRED", {}));
}
}
class GatewayServer extends EventEmitter { class GatewayServer extends EventEmitter {
constructor({ server }) { constructor({ server }) {
super(); super();
@ -63,7 +90,7 @@ class GatewayServer extends EventEmitter {
ws.channels = channels.map(x => x._id); ws.channels = channels.map(x => x._id);
ws.send(this.packet("YOO_ACK", { session_id: ws.session.sessionId, channels, user: { username: user.username, _id: user._id } })); ws.send(this.packet("YOO_ACK", { session_id: ws.session.sessionId, channels, user: { username: user.username, _id: user._id }, __global_experiments: experiments }));
console.log(`gateway: user ${user.username}: handshake complete`); console.log(`gateway: user ${user.username}: handshake complete`);
} catch (e) { } catch (e) {
console.log("gateway:", e); console.log("gateway:", e);
@ -97,6 +124,56 @@ class GatewayServer extends EventEmitter {
break; break;
} }
case "ACTION_VOICE_REQUEST_SESSION": {
if (!experiments.voiceSFUTesting) return;
// just send ourselves as the voice server lol
ws.send(this.packet("EVENT_VOICE_ASSIGN_SERVER", {
reportTo: "/gateway",
channel: message.data.channel
}));
break;
}
case "ACTION_VOICE_CONNECTION_REQUEST": {
if (!this.authMessage(ws)) return;
if (!experiments.voiceSFUTesting) return;
const offer = message.data.offer;
const channel = message.data.channel;
if (!channel || !channel._id) throw new Error("Client did not send a valid channel in ACTION_VOICE_CONNECTION_REQUEST");
if (!offer) throw new Error("Client did not send any offer in ACTION_VOICE_CONNECTION_REQUEST");
let isNewConnection = true;
if (ws.rtc) {
isNewConnection = false;
} else {
ws.rtc = new GatewayRTCConnection(ws, this);
}
ws.send(this.packet("EVENT_VOICE_CONNECTION_ANSWER", {
answer: await ws.rtc.answer(offer)
}));
if (isNewConnection) {
this.inChannel(channel._id, (otherWs) => {
//if (!this.clientReady(otherWs)) return;
if (!this.clientRTCReady(otherWs)) return;
if (otherWs.session.user._id === ws.session.user._id) return; // Don't perform the actions below on ourselves
const otherReceivers = otherWs.rtc.connection.getReceivers();
for (let i = 0; i < otherReceivers.length; i++) {
const otherRecevier = otherReceivers[i];
ws.rtc.addTrack(otherRecevier.tracks[0]);
}
const myReceviers = ws.rtc.connection.getReceivers();
for (let i = 0; i < myReceviers.length; i++) {
const myReceiver = myReceviers[i];
otherWs.rtc.addTrack(myReceiver.tracks[0]);
}
otherWs.rtc.renegotiate();
});
ws.rtc.renegotiate();
}
}
} }
} catch(e) { } catch(e) {
console.error("gateway:", e); console.error("gateway:", e);
@ -113,10 +190,20 @@ GatewayServer.prototype.broadcast = function(channelId, data) {
}); });
}; };
GatewayServer.prototype.inChannel = function(channelId, func) {
this.wss.clients.forEach((client) => {
if (this.clientReady(client) && client.channels.includes(channelId)) func(client);
});
};
GatewayServer.prototype.clientReady = function(ws) { GatewayServer.prototype.clientReady = function(ws) {
return ws.readyState === websockets.OPEN && ws.session && ws.session.authenticated; return ws.readyState === websockets.OPEN && ws.session && ws.session.authenticated;
}; };
GatewayServer.prototype.clientRTCReady = function(ws) {
return ws.rtc; // TODO: add more checks
};
GatewayServer.prototype.authMessage = function(ws) { GatewayServer.prototype.authMessage = function(ws) {
if (!this.clientReady(ws)) { if (!this.clientReady(ws)) {
ws.close(4007, "Not authenticated."); ws.close(4007, "Not authenticated.");

View file

@ -3,7 +3,12 @@ const opcodes = {
1: { name: "YOO", data: "JSON" }, 1: { name: "YOO", data: "JSON" },
2: { name: "YOO_ACK", data: "JSON" }, 2: { name: "YOO_ACK", data: "JSON" },
3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" }, 3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" },
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" } 4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
24: { name: "EVENT_VOICE_CONNECTION_ANSWER", data: "JSON" },
25: { name: "EVENT_RENEGOTIATE_REQUIRED", data: "JSON" }
}; };
const opcodeSeparator = "@"; const opcodeSeparator = "@";

View file

@ -3,7 +3,11 @@ const opcodes = {
1: { name: "YOO", data: "JSON" }, 1: { name: "YOO", data: "JSON" },
2: { name: "YOO_ACK", data: "JSON" }, 2: { name: "YOO_ACK", data: "JSON" },
3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" }, 3: { name: "ACTION_CREATE_MESSAGE", data: "JSON" },
4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" } 4: { name: "EVENT_CREATE_MESSAGE", data: "JSON" },
21: { name: "ACTION_VOICE_REQUEST_SESSION", data: "JSON" },
22: { name: "EVENT_VOICE_ASSIGN_SERVER", data: "JSON" },
23: { name: "ACTION_VOICE_CONNECTION_REQUEST", data: "JSON" },
24: { name: "EVENT_VOICE_CONNECTION_ANSWER", data: "JSON" }
}; };
const opcodeSeparator = "@"; const opcodeSeparator = "@";

6
brainlet/experiments.js Normal file
View file

@ -0,0 +1,6 @@
module.exports = {
experiments: {
voiceSFUTesting: false,
userListTest: false
}
};

View file

@ -1,5 +1,5 @@
module.exports = { module.exports = {
jwtPrivateKey: "KEY" jwtPrivateKey: "KjEY",
}; };
// Set default values // Set default values

View file

@ -168,7 +168,7 @@ JSON data format:
Voice server signaling is done through a websocket gateway. This gateway is specified by the `reportTo` property in EVENT_VOICE_ASSIGN_SERVER. Voice server signaling is done through a websocket gateway. This gateway is specified by the `reportTo` property in EVENT_VOICE_ASSIGN_SERVER.
## 50:ACTION_VOICE_CONNECTION_REQUEST ## 23:ACTION_VOICE_CONNECTION_REQUEST
*Client to server* *Client to server*
@ -178,7 +178,7 @@ Voice server signaling is done through a websocket gateway. This gateway is spec
| channel | An object that contains "_id", the id of the channel to connect to | | channel | An object that contains "_id", the id of the channel to connect to |
| offer | An SDP payload; the webrtc offer from the client | | offer | An SDP payload; the webrtc offer from the client |
## 51:EVENT_VOICE_CONNECTION_ANSWER ## 24:EVENT_VOICE_CONNECTION_ANSWER
*Auth required, Server to client* *Auth required, Server to client*