From 1626cd99e52130e2307c6545cb2c4b82c72f7e23 Mon Sep 17 00:00:00 2001 From: hippoz Date: Thu, 4 Nov 2021 02:41:34 +0200 Subject: [PATCH] organize server code into multiple files --- .gitignore | 1 + InputController.py | 80 +++++++++++++++++++++++++++ MessageParser.py | 40 ++++++++++++++ capybara.py | 135 ++------------------------------------------- constants.py | 13 +++++ 5 files changed, 138 insertions(+), 131 deletions(-) create mode 100644 InputController.py create mode 100644 MessageParser.py create mode 100644 constants.py diff --git a/.gitignore b/.gitignore index 39ffe5e..0c66e71 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ frontend/node_modules/ frontend/dist/ +__pycache__/ diff --git a/InputController.py b/InputController.py new file mode 100644 index 0000000..7ababb9 --- /dev/null +++ b/InputController.py @@ -0,0 +1,80 @@ +from pynput.mouse import Controller as MouseController +from pynput.keyboard import Controller as KeyboardController + +import constants as c +from MessageParser import MessageParser + +class InputController(): + def __init__(self): + self.mouse_controller = MouseController() + self.keyboard_controller = KeyboardController() + self.parser = MessageParser() + + # Keyboard key press + self.parser.add_handler("k", { + "key": "str" + }) + # Relative mouse movement + self.parser.add_handler("r", { + "x": "int", + "y": "int" + }) + # Mouse relative scroll + self.parser.add_handler("s", { + "x": "float", + "y": "float" + }) + # Mouse button down + self.parser.add_handler("d", { + "button": "int" + }) + # Mouse button up + self.parser.add_handler("u", { + "button": "int" + }) + # Mouse button click + self.parser.add_handler("c", { + "button": "int" + }) + def button_code_to_object(self, button_code: int): + # HACK + obj = None + try: + obj = c.button_code_lookup[button_code] + except IndexError: + return c.button_code_lookup[0] + return obj + def deserialize_key(self, key: str): + obj = None + try: + obj = c.keyboard_lookup[key] + except KeyError: + if len(key) != 1: + return None + return key + return obj + def process_message(self, message: str) -> bool: + code, args = self.parser.parse(message) + + if code == None: + print("error while parsing message:", args) + return False + elif code == "r": + self.mouse_controller.move(args["x"], args["y"]) + elif code == "d": + self.mouse_controller.press(self.button_code_to_object(args["button"])) + elif code == "u": + self.mouse_controller.release(self.button_code_to_object(args["button"])) + elif code == "c": + self.mouse_controller.click(self.button_code_to_object(args["button"])) + elif code == "s": + self.mouse_controller.scroll(args["x"], args["y"]) + elif code == "k": + key = self.deserialize_key(args["key"]) + if key: + self.keyboard_controller.tap(key) + else: + print("got invalid code from parser (is this a bug with the MessageParser?)") + return False + + return True \ No newline at end of file diff --git a/MessageParser.py b/MessageParser.py new file mode 100644 index 0000000..5ef79ec --- /dev/null +++ b/MessageParser.py @@ -0,0 +1,40 @@ +class MessageParser(): + def __init__(self): + self.handlers = {} + def add_handler(self, code: str, args={}): + self.handlers[code] = { + "args": args + } + def parse(self, message: str): + if len(message) < 1: + return None, "Message is empty" + message_code = message[0] + if message_code not in self.handlers: + return None, "Message code is not handled" + message_arguments = message[1:].split(";") + handler = self.handlers[message_code] + decoded_arguments = {} + if len(handler["args"]) != len(message_arguments): + return None, "Got message with invalid argument count" + for i, argument_name in enumerate(handler["args"]): + argument_type = handler["args"][argument_name] + if argument_type == "int": + try: + decoded_arguments[argument_name] = int(message_arguments[i]) + except ValueError: + return None, "Error parsing int argument for message (is it really an int?)" + elif argument_type == "float": + try: + decoded_arguments[argument_name] = float(message_arguments[i]) + except ValueError: + return None, "Error parsing float argument for message (is it really a float?)" + elif argument_type == "str": + decoded_arguments[argument_name] = message_arguments[i] + elif argument_type == "char": + if len(message_arguments[i]) != 1: + return None, "Error parsing char argument due to invalid size" + decoded_arguments[argument_name] = message_arguments[i] + else: + raise ValueError("parse(): Message handler references an invalid argument type") + + return message_code, decoded_arguments diff --git a/capybara.py b/capybara.py index 1c82d0c..7709e28 100755 --- a/capybara.py +++ b/capybara.py @@ -1,148 +1,21 @@ -from pynput.mouse import Button, Controller -from pynput.keyboard import Controller as KeyboardController -from pynput.keyboard import Key from sanic import Sanic from sanic.response import file -class MessageParser(): - def __init__(self): - self.handlers = {} - def add_handler(self, code: str, args={}): - self.handlers[code] = { - "args": args - } - def parse(self, message: str): - if len(message) < 1: - return None, "Message is empty" - message_code = message[0] - if message_code not in self.handlers: - return None, "Message code is not handled" - message_arguments = message[1:].split(";") - handler = self.handlers[message_code] - decoded_arguments = {} - if len(handler["args"]) != len(message_arguments): - return None, "Got message with invalid argument count" - for i, argument_name in enumerate(handler["args"]): - argument_type = handler["args"][argument_name] - if argument_type == "int": - try: - decoded_arguments[argument_name] = int(message_arguments[i]) - except ValueError: - return None, "Error parsing int argument for message (is it really an int?)" - elif argument_type == "float": - try: - decoded_arguments[argument_name] = float(message_arguments[i]) - except ValueError: - return None, "Error parsing float argument for message (is it really a float?)" - elif argument_type == "str": - decoded_arguments[argument_name] = message_arguments[i] - elif argument_type == "char": - if len(message_arguments[i]) != 1: - return None, "Error parsing char argument due to invalid size" - decoded_arguments[argument_name] = message_arguments[i] - else: - raise ValueError("parse(): Message handler references an invalid argument type") - - return message_code, decoded_arguments - - -button_code_lookup = [ - Button.left, - Button.right -] - -keyboard_lookup = { - "{space}": Key.space, - "{ent}": Key.enter, - "{backspace}": Key.backspace -} - -class InputController(): - def __init__(self): - self.mouse_controller = Controller() - self.keyboard_controller = KeyboardController() - self.parser = MessageParser() - - # Keyboard key press - self.parser.add_handler("k", { - "key": "str" - }) - # Relative mouse movement - self.parser.add_handler("r", { - "x": "int", - "y": "int" - }) - # Mouse relative scroll - self.parser.add_handler("s", { - "x": "float", - "y": "float" - }) - # Mouse button down - self.parser.add_handler("d", { - "button": "int" - }) - # Mouse button up - self.parser.add_handler("u", { - "button": "int" - }) - # Mouse button click - self.parser.add_handler("c", { - "button": "int" - }) - def button_code_to_object(self, button_code: int): - # HACK - obj = None - try: - obj = button_code_lookup[button_code] - except IndexError: - return button_code_lookup[0] - return obj - def deserialize_key(self, key: str): - obj = None - try: - obj = keyboard_lookup[key] - except KeyError: - if len(key) != 1: - return None - return key - return obj - def process_message(self, message: str) -> bool: - code, args = self.parser.parse(message) - - if code == None: - print("error while parsing message:", args) - return False - elif code == "r": - self.mouse_controller.move(args["x"], args["y"]) - elif code == "d": - self.mouse_controller.press(self.button_code_to_object(args["button"])) - elif code == "u": - self.mouse_controller.release(self.button_code_to_object(args["button"])) - elif code == "c": - self.mouse_controller.click(self.button_code_to_object(args["button"])) - elif code == "s": - self.mouse_controller.scroll(args["x"], args["y"]) - elif code == "k": - key = self.deserialize_key(args["key"]) - if key: - self.keyboard_controller.tap(key) - else: - print("got invalid code from parser (is this a bug with the MessageParser?)") - return False - - return True +from InputController import InputController app = Sanic("capybara") -app.ctx.input_controller = InputController() +app.ctx.input_controller = InputController() app.static("app", "frontend/dist/", resource_type="dir") + @app.websocket("/gateway") async def gateway(req, ws): while True: app.ctx.input_controller.process_message(await ws.recv()) + def main(): app.run(host='0.0.0.0', port=4003, access_log=False) diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..debbfc3 --- /dev/null +++ b/constants.py @@ -0,0 +1,13 @@ +from pynput.keyboard import Key +from pynput.mouse import Button + +button_code_lookup = [ + Button.left, + Button.right +] + +keyboard_lookup = { + "{space}": Key.space, + "{ent}": Key.enter, + "{backspace}": Key.backspace +}