From 9315abc4883f91f8e98a76efd06ed47fc05526af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Mekina?= Date: Mon, 5 May 2025 11:26:59 +0200 Subject: [PATCH] quic and dirty fix --- client/presentationmgr.ts | 3 +++ client/pythagoras_client.ts | 19 +++++++-------- client/tools.ts | 10 +++++++- main.py | 46 ++++++++++++++++++++++++++++--------- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/client/presentationmgr.ts b/client/presentationmgr.ts index b3bf740..1d05704 100644 --- a/client/presentationmgr.ts +++ b/client/presentationmgr.ts @@ -99,6 +99,8 @@ class MainScreen implements PresentationScreen { } public async end(): Promise { + this.dom_root.style.opacity = "0"; + await sleep(500); } private async show_question(text: string): Promise { @@ -231,6 +233,7 @@ class IdleScreen implements PresentationScreen { } public async start(): Promise { + await sleep(10); this.dom_title.style.transform = "translateY(0)"; this.dom_title.style.opacity = "1"; await sleep(250); diff --git a/client/pythagoras_client.ts b/client/pythagoras_client.ts index ba0ef42..4a97340 100644 --- a/client/pythagoras_client.ts +++ b/client/pythagoras_client.ts @@ -92,7 +92,8 @@ export class PythagorasClient { private async recv_inner_bytes(): Promise { const received = await this.recv_inner(); if (received.bytes !== undefined) { return await received.bytes(); } - return utf8_encode(await received.text()); + const text = await received.text(); + return utf8_encode(text); } /** @@ -115,16 +116,12 @@ export class PythagorasClient { public async recv(): Promise { while (true) { - const advertised_length = uint_bytes_to_num(await this.recv_length(4)); - try { - const payload = utf8_decode(await this.recv_length(advertised_length)); - const parsed = JSON.parse(payload); - console.log(parsed); - return parsed; - } catch { - this.buf = new Uint8Array(0); - await this.reconnect(); - } + const payload = await this.recv_inner(); + const text = payload.bytes === undefined ? + (await payload.text()).slice(4) : (utf8_decode((await payload.bytes()).slice(4))); + console.log("payload:", text); + const parsed = JSON.parse(text); + return parsed; } } } diff --git a/client/tools.ts b/client/tools.ts index 7cf4bb6..a4cf0e1 100644 --- a/client/tools.ts +++ b/client/tools.ts @@ -9,7 +9,15 @@ export function uint_bytes_to_num(value: Uint8Array): number { export function utf8_encode(value: string): Uint8Array { const encoder = new TextEncoder(); - return encoder.encode(value); + let res: Uint8Array = new Uint8Array(0); + for (const ch of value) { + const cur = encoder.encode(ch)[0]; + const prev = res; + res = new Uint8Array(prev.length + 1); + res.set(prev, 0); + res.set([cur], prev.length); + } + return res; } export function utf8_decode(value: Uint8Array): string { diff --git a/main.py b/main.py index 6c19e4c..b9a9667 100644 --- a/main.py +++ b/main.py @@ -10,6 +10,7 @@ import asyncio import struct import os from pathlib import Path +from asyncio import Lock # Some useful variables PEEHAITCHPEA_ENDPOINT = "http://localhost:9000/api.php?cmd=getselectedmessage" @@ -38,27 +39,49 @@ os.makedirs(MEDIA_DIR, exist_ok=True) # Store for connected websocket clients +class WSConnection: + inner: WebSocket + lock: Lock + + def __init__(self, sock: WebSocket): + self.inner = sock + self.lock = Lock() + + async def accept(self): + async with self.lock: + await self.inner.accept() + + async def send_bytes(self, data: bytes): + async with self.lock: + await self.inner.send_bytes(data) + + async def send_text(self, data: str): + async with self.lock: + await self.inner.send_text(data) + class ConnectionManager: def __init__(self): - self.active_connections: List[WebSocket] = [] + self.active_connections: List[WSConnection] = [] - async def connect(self, websocket: WebSocket): + async def connect(self, websocket: WSConnection): await websocket.accept() self.active_connections.append(websocket) await setscreen_single(websocket, app.state.client_state) # Send the latest client state to any new client await self.singlecast_binary({"type": "selectedmessage", "message": app.state.latest_message}, websocket) # Broadcast latest message to all clients logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}") - def disconnect(self, websocket: WebSocket): + def disconnect(self, websocket: WSConnection): self.active_connections.remove(websocket) logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}") def json_to_binary(self, json_str: str): json_bytes = json_str.encode('utf-8') - json_length = len(json_bytes) & 0xFFFFFFFF + json_length = len(json_bytes) # 4-byte unsigned integer (uint32) length_bytes = struct.pack('!I', json_length) + if len(length_bytes) != 4: + raise Exception("invalid packed length") return length_bytes + json_bytes @@ -84,7 +107,7 @@ class ConnectionManager: except Exception as e: logger.error(f"Failed to send binary message: {str(e)}") - async def singlecast_binary(self, data_dict: dict, ws: WebSocket): + async def singlecast_binary(self, data_dict: dict, ws: WSConnection): """ I love code duplication """ @@ -205,16 +228,17 @@ async def subtitles_submit_sentence_endpoint(request: Request): @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time communication.""" - await manager.connect(websocket) + conn = WSConnection(websocket) + await manager.connect(conn) try: while True: data = await websocket.receive_text() logger.info(f"Received message from WebSocket: {data}") except WebSocketDisconnect: - manager.disconnect(websocket) + manager.disconnect(conn) except Exception as e: logger.error(f"WebSocket error: {str(e)}") - manager.disconnect(websocket) + manager.disconnect(conn) @app.get("/media/{file_path:path}") async def get_media(file_path: str): @@ -300,13 +324,13 @@ async def translate_to_cs_libre(text: str): logger.error(f"Translation error: {str(e)}") return text -async def setscreen_single(ws: WebSocket, screen: str): +async def setscreen_single(ws: WSConnection, screen: str): return await manager.singlecast_binary({"type": "setscreen", "screen": screen}, ws) async def setscreen(screen: str): return await manager.broadcast_binary({"type": "setscreen", "screen": screen}) -async def playvideo(filename: str, subtitles: str, seconds_from_start: int): +async def playvideo(filename: str, subtitles: str | None, seconds_from_start: int): return await manager.broadcast_binary({"type": "playvideo", "filename": filename, "subtitles": subtitles, "seconds_from_start": seconds_from_start}) async def seekvideo(timestamp: int): @@ -337,7 +361,7 @@ async def fetch_selected_message(): logger.error(f"Error fetching selected message: {str(e)}") return None -async def process_selected_message(message: str): +async def process_selected_message(message: str | None): """ Processes the selected message and saves it to cache. """