organize server code into multiple files

This commit is contained in:
hippoz 2021-11-04 02:41:34 +02:00
parent c18ae0a1ab
commit 1626cd99e5
No known key found for this signature in database
GPG key ID: 7C52899193467641
5 changed files with 138 additions and 131 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
frontend/node_modules/
frontend/dist/
__pycache__/

80
InputController.py Normal file
View file

@ -0,0 +1,80 @@
from pynput.mouse import Controller as MouseController
from pynput.keyboard import Controller as KeyboardController
import constants as c
from MessageParser import MessageParser
class InputController():
def __init__(self):
self.mouse_controller = MouseController()
self.keyboard_controller = KeyboardController()
self.parser = MessageParser()
# Keyboard key press
self.parser.add_handler("k", {
"key": "str"
})
# Relative mouse movement
self.parser.add_handler("r", {
"x": "int",
"y": "int"
})
# Mouse relative scroll
self.parser.add_handler("s", {
"x": "float",
"y": "float"
})
# Mouse button down
self.parser.add_handler("d", {
"button": "int"
})
# Mouse button up
self.parser.add_handler("u", {
"button": "int"
})
# Mouse button click
self.parser.add_handler("c", {
"button": "int"
})
def button_code_to_object(self, button_code: int):
# HACK
obj = None
try:
obj = c.button_code_lookup[button_code]
except IndexError:
return c.button_code_lookup[0]
return obj
def deserialize_key(self, key: str):
obj = None
try:
obj = c.keyboard_lookup[key]
except KeyError:
if len(key) != 1:
return None
return key
return obj
def process_message(self, message: str) -> bool:
code, args = self.parser.parse(message)
if code == None:
print("error while parsing message:", args)
return False
elif code == "r":
self.mouse_controller.move(args["x"], args["y"])
elif code == "d":
self.mouse_controller.press(self.button_code_to_object(args["button"]))
elif code == "u":
self.mouse_controller.release(self.button_code_to_object(args["button"]))
elif code == "c":
self.mouse_controller.click(self.button_code_to_object(args["button"]))
elif code == "s":
self.mouse_controller.scroll(args["x"], args["y"])
elif code == "k":
key = self.deserialize_key(args["key"])
if key:
self.keyboard_controller.tap(key)
else:
print("got invalid code from parser (is this a bug with the MessageParser?)")
return False
return True

40
MessageParser.py Normal file
View file

@ -0,0 +1,40 @@
class MessageParser():
def __init__(self):
self.handlers = {}
def add_handler(self, code: str, args={}):
self.handlers[code] = {
"args": args
}
def parse(self, message: str):
if len(message) < 1:
return None, "Message is empty"
message_code = message[0]
if message_code not in self.handlers:
return None, "Message code is not handled"
message_arguments = message[1:].split(";")
handler = self.handlers[message_code]
decoded_arguments = {}
if len(handler["args"]) != len(message_arguments):
return None, "Got message with invalid argument count"
for i, argument_name in enumerate(handler["args"]):
argument_type = handler["args"][argument_name]
if argument_type == "int":
try:
decoded_arguments[argument_name] = int(message_arguments[i])
except ValueError:
return None, "Error parsing int argument for message (is it really an int?)"
elif argument_type == "float":
try:
decoded_arguments[argument_name] = float(message_arguments[i])
except ValueError:
return None, "Error parsing float argument for message (is it really a float?)"
elif argument_type == "str":
decoded_arguments[argument_name] = message_arguments[i]
elif argument_type == "char":
if len(message_arguments[i]) != 1:
return None, "Error parsing char argument due to invalid size"
decoded_arguments[argument_name] = message_arguments[i]
else:
raise ValueError("parse(): Message handler references an invalid argument type")
return message_code, decoded_arguments

View file

@ -1,148 +1,21 @@
from pynput.mouse import Button, Controller
from pynput.keyboard import Controller as KeyboardController
from pynput.keyboard import Key
from sanic import Sanic
from sanic.response import file
class MessageParser():
def __init__(self):
self.handlers = {}
def add_handler(self, code: str, args={}):
self.handlers[code] = {
"args": args
}
def parse(self, message: str):
if len(message) < 1:
return None, "Message is empty"
message_code = message[0]
if message_code not in self.handlers:
return None, "Message code is not handled"
message_arguments = message[1:].split(";")
handler = self.handlers[message_code]
decoded_arguments = {}
if len(handler["args"]) != len(message_arguments):
return None, "Got message with invalid argument count"
for i, argument_name in enumerate(handler["args"]):
argument_type = handler["args"][argument_name]
if argument_type == "int":
try:
decoded_arguments[argument_name] = int(message_arguments[i])
except ValueError:
return None, "Error parsing int argument for message (is it really an int?)"
elif argument_type == "float":
try:
decoded_arguments[argument_name] = float(message_arguments[i])
except ValueError:
return None, "Error parsing float argument for message (is it really a float?)"
elif argument_type == "str":
decoded_arguments[argument_name] = message_arguments[i]
elif argument_type == "char":
if len(message_arguments[i]) != 1:
return None, "Error parsing char argument due to invalid size"
decoded_arguments[argument_name] = message_arguments[i]
else:
raise ValueError("parse(): Message handler references an invalid argument type")
return message_code, decoded_arguments
button_code_lookup = [
Button.left,
Button.right
]
keyboard_lookup = {
"{space}": Key.space,
"{ent}": Key.enter,
"{backspace}": Key.backspace
}
class InputController():
def __init__(self):
self.mouse_controller = Controller()
self.keyboard_controller = KeyboardController()
self.parser = MessageParser()
# Keyboard key press
self.parser.add_handler("k", {
"key": "str"
})
# Relative mouse movement
self.parser.add_handler("r", {
"x": "int",
"y": "int"
})
# Mouse relative scroll
self.parser.add_handler("s", {
"x": "float",
"y": "float"
})
# Mouse button down
self.parser.add_handler("d", {
"button": "int"
})
# Mouse button up
self.parser.add_handler("u", {
"button": "int"
})
# Mouse button click
self.parser.add_handler("c", {
"button": "int"
})
def button_code_to_object(self, button_code: int):
# HACK
obj = None
try:
obj = button_code_lookup[button_code]
except IndexError:
return button_code_lookup[0]
return obj
def deserialize_key(self, key: str):
obj = None
try:
obj = keyboard_lookup[key]
except KeyError:
if len(key) != 1:
return None
return key
return obj
def process_message(self, message: str) -> bool:
code, args = self.parser.parse(message)
if code == None:
print("error while parsing message:", args)
return False
elif code == "r":
self.mouse_controller.move(args["x"], args["y"])
elif code == "d":
self.mouse_controller.press(self.button_code_to_object(args["button"]))
elif code == "u":
self.mouse_controller.release(self.button_code_to_object(args["button"]))
elif code == "c":
self.mouse_controller.click(self.button_code_to_object(args["button"]))
elif code == "s":
self.mouse_controller.scroll(args["x"], args["y"])
elif code == "k":
key = self.deserialize_key(args["key"])
if key:
self.keyboard_controller.tap(key)
else:
print("got invalid code from parser (is this a bug with the MessageParser?)")
return False
return True
from InputController import InputController
app = Sanic("capybara")
app.ctx.input_controller = InputController()
app.ctx.input_controller = InputController()
app.static("app", "frontend/dist/", resource_type="dir")
@app.websocket("/gateway")
async def gateway(req, ws):
while True:
app.ctx.input_controller.process_message(await ws.recv())
def main():
app.run(host='0.0.0.0', port=4003, access_log=False)

13
constants.py Normal file
View file

@ -0,0 +1,13 @@
from pynput.keyboard import Key
from pynput.mouse import Button
button_code_lookup = [
Button.left,
Button.right
]
keyboard_lookup = {
"{space}": Key.space,
"{ent}": Key.enter,
"{backspace}": Key.backspace
}