import asyncio import string import secrets import json import websockets clients = { "FD3ZJFHQ7r": { "doorOpen": False, "lockdown": False, "lastOpened": 1, #unix timestamp "secLevel": 1, "openedBy": "user" } } def rand_id(): return ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16)) class WsClient: def __init__(self, ws): self.ws = ws self.module_info = {} self.pcid = None def send(self, message): self.ws.send(json.dumps(message)) class WebSocketHandler: def __init__(self): self.clients = {} def on_message(self, client, message): message_type = message.get('type') # Route messages based on type using match-case match message_type: case "CAuth": client.pcid = message["pcid"] case "CLog": print(f"[CC] [{message["level"]}] {message["msg"]}") case "CModule": match message["module"]: case "keypad": self.handle_keypad_msg(client, message["message"]); def handle_keypad_msg(self, client: WsClient, message): match message["type"]: case "CDoorOpened": print(f"[CC][KEYPAD] Door was opened on {message["time"]} {"manually" if message["manual"] else ""}") case "CGetUserStat": client.send({ "type": "SUserStat", # user info }) case "CGetDoorStat": stats = {} for pc in self.clients: i = pc.module_info["keypad"] stats[pc.uid] = i if i else {} client.send({ "type": "CDoorStat", "stats": stats }) async def new_client(self, ws): uid = rand_id() self.clients[uid] = WsClient(ws) self.clients[uid].send({ "type" : "SAuthReq" }) async for msg_t in ws: msg = json.loads(msg_t) self.on_message(self.clients[uid], msg) def stop(self): for c in self.clients: c.ws.close() async def main(): wsh = WebSocketHandler() async with websockets.serve(wsh.new_client, "localhost", 8765): await asyncio.get_running_loop().create_future() asyncio.run(main())