rtmc-be/server.py
2024-09-09 04:04:18 +03:00

99 lines
2.4 KiB
Python

import asyncio
import string
import secrets
import json
import websockets
clients = {
"FD3ZJFHQ7r": {
"doorOpen": False,
"lockdown": False,
"lastOpened": 1, #unix timestamp
"secLevel": 1,
"openedBy": "user"
}
}
def rand_id():
return ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
class WsClient:
def __init__(self, ws):
self.ws = ws
self.module_info = {}
self.pcid = None
def send(self, message):
self.ws.send(json.dumps(message))
class WebSocketHandler:
def __init__(self):
self.clients = {}
def on_message(self, client, message):
message_type = message.get('type')
# Route messages based on type using match-case
match message_type:
case "CAuth":
client.pcid = message["pcid"]
case "CLog":
print(f"[CC] [{message["level"]}] {message["msg"]}")
case "CModule":
match message["module"]:
case "keypad":
self.handle_keypad_msg(client, message["message"]);
def handle_keypad_msg(self, client: WsClient, message):
match message["type"]:
case "CDoorOpened":
print(f"[CC][KEYPAD] Door was opened on {message["time"]} {"manually" if message["manual"] else ""}")
case "CGetUserStat":
client.send({
"type": "SUserStat",
# user info
})
case "CGetDoorStat":
stats = {}
for pc in self.clients:
i = pc.module_info["keypad"]
stats[pc.uid] = i if i else {}
client.send({
"type": "CDoorStat",
"stats": stats
})
async def new_client(self, ws):
uid = rand_id()
self.clients[uid] = WsClient(ws)
self.clients[uid].send({
"type" : "SAuthReq"
})
async for msg_t in ws:
msg = json.loads(msg_t)
self.on_message(self.clients[uid], msg)
def stop(self):
for c in self.clients:
c.ws.close()
async def main():
wsh = WebSocketHandler()
async with websockets.serve(wsh.new_client, "localhost", 8765):
await asyncio.get_running_loop().create_future()
asyncio.run(main())