"""Pythagoras: a proxy service handling HTTP and WebSocket connections.

HTTP endpoints receive control commands and subtitle text, and the results
are pushed to every connected WebSocket client as length-prefixed JSON
frames.  A background task can additionally poll an upstream endpoint for
the currently selected message.
"""

import asyncio
import json
import logging
import math
import os
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import uvicorn
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse

# Upstream endpoint polled for the selected message, and the port this
# service listens on.
PEEHAITCHPEA_ENDPOINT = "http://localhost:9000/api.php?cmd=getselectedmessage"
PYTHAGORAS_PORT = 8000

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pythagoras")

# Configure FastAPI app and initial values for variables
app = FastAPI(title="Pythagoras", description="A proxy service handling HTTP and WebSocket connections")
app.state.auto_polling = False
app.state.polling_rate = 5  # seconds between iterations of the polling loop

# Define the media directory
MEDIA_DIR = Path("./media")

# Create the media directory if it doesn't exist
os.makedirs(MEDIA_DIR, exist_ok=True)


# Store for connected websocket clients
class ConnectionManager:
    """Track accepted WebSocket connections and broadcast messages to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Unregister a connection; a connection that is not registered is ignored."""
        # Guard the removal so a repeated disconnect for the same socket
        # does not raise ValueError.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")

    async def broadcast(self, message: str):
        """Send a text message to every registered connection.

        A failure on one connection is logged and does not prevent delivery
        to the remaining connections.
        """
        # Iterate over a snapshot: the list can change while we await a send.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Failed to send text message: {e}")

    async def broadcast_binary(self, data_dict: dict):
        """Broadcast a dict to all connections as a binary frame.

        The frame is a 4-byte unsigned length in network byte order followed
        by the UTF-8 encoded JSON serialization of ``data_dict``.  Send
        failures are logged per connection and do not stop the broadcast.
        """
        if not self.active_connections:
            return

        json_bytes = json.dumps(data_dict).encode("utf-8")
        json_length = len(json_bytes)
        # 4-byte unsigned integer (uint32)
        message_bytes = struct.pack("!I", json_length) + json_bytes

        # Iterate over a snapshot: the list can change while we await a send.
        for connection in list(self.active_connections):
            try:
                await connection.send_bytes(message_bytes)
                logger.debug(f"Sent binary message ({json_length} bytes) to a client")
            except Exception as e:
                logger.error(f"Failed to send binary message: {e}")


manager = ConnectionManager()


# Static files
def read_file(filepath: str) -> str:
    """Return the full contents of ``filepath`` decoded as UTF-8."""
    with open(filepath, "r", encoding="utf-8") as f:
        return f.read()


@dataclass
class StaticFiles:
    """Static assets, read from disk once when this class body is evaluated."""

    index_html: str = read_file("static/index.html")
    script_js: str = read_file("static/script.js")


@app.get("/presentation/")
async def presentation_index(_: Request):
    """Serve the presentation page."""
    return HTMLResponse(status_code=200, content=StaticFiles.index_html, media_type="text/html")


@app.get("/presentation/script.js")
async def presentation_script(_: Request):
    """Serve the presentation script with a JavaScript media type."""
    return HTMLResponse(
        status_code=200, content=StaticFiles.script_js, media_type="text/javascript"
    )


def _validate_polling_rate(value: Any) -> Any:
    """Return ``value`` if it is a finite, positive number; raise ValueError otherwise.

    The rate is passed to ``asyncio.sleep`` by the polling loop, so anything
    else would either raise there or make the loop spin without pausing.
    """
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError(f"Polling rate must be a number, got {value!r}")
    if not math.isfinite(value) or value <= 0:
        raise ValueError(f"Polling rate must be finite and positive, got {value!r}")
    return value


# Endpoints
@app.post("/control")
async def control_endpoint(request: Request):
    """Endpoint for control data.

    Expects a JSON body with a ``command`` key.  Handled commands:

    * ``getselectedmessage`` - fetch the selected message and broadcast it.
    * ``setautopolling`` (with ``state``) - store the auto-polling flag.
    * ``autopollingrate`` (with ``rate``) - store the polling interval in
      seconds; must be a finite, positive number.

    Returns a 200 JSON response on success and a 400 JSON response if the
    body cannot be processed.
    """
    try:
        data = await request.json()
        logger.info(f"Received control data: {data}")

        command = data["command"]
        if command == "getselectedmessage":
            message = await fetch_selected_message()
            await process_selected_message(message)
            logger.info(f"Received new selected message initiated from control: {message}")
        elif command == "setautopolling" and "state" in data:
            new_state = data["state"]
            app.state.auto_polling = new_state
            logger.info(f"Polling command issued, changing auto-polling to {new_state}")
        elif command == "autopollingrate" and "rate" in data:
            new_rate = _validate_polling_rate(data["rate"])
            app.state.polling_rate = new_rate
            logger.info(f"Auto-polling rate change requested: {new_rate} seconds")

        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Control data received"},
        )
    except Exception as e:
        logger.error(f"Error processing control data: {e}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."},
        )


@app.post("/subtitles/update_current")
async def subtitles_update_current_endpoint(request: Request):
    """Endpoint for subtitle data - updating the current sentence as it comes."""
    return await process_subtitles(request, "update_current")


@app.post("/subtitles/submit_sentence")
async def subtitles_submit_sentence_endpoint(request: Request):
    """Endpoint for subtitle data - submitting the final version of a sentence."""
    return await process_subtitles(request, "submit_sentence")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time communication.

    Registers the client, then logs every text message it sends until the
    connection closes or fails, at which point the client is unregistered.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"Received message from WebSocket: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        manager.disconnect(websocket)


@app.get("/media/{file_path:path}")
async def get_media(file_path: str):
    """Serve media files from the media directory.

    Raises:
        HTTPException: 403 if the resolved path lies outside the media
            directory, 404 if it does not point to an existing file.
    """
    media_root = MEDIA_DIR.resolve()
    absolute_path = (MEDIA_DIR / file_path).resolve()

    # Security check: make sure the path is within the media directory.
    # Compare path components rather than string prefixes, otherwise a
    # sibling directory whose name merely starts with the media directory's
    # name (e.g. "media_private") would pass the check.
    try:
        absolute_path.relative_to(media_root)
    except ValueError:
        logger.warning(f"Attempted directory traversal: {file_path}")
        raise HTTPException(status_code=403, detail="Access denied") from None

    # Check if the file exists
    if not absolute_path.is_file():
        logger.warning(f"File not found: {absolute_path}")
        raise HTTPException(status_code=404, detail="File not found")

    logger.info(f"Serving media file: {file_path}")
    return FileResponse(path=absolute_path, filename=absolute_path.name)


# Functions
async def process_subtitles(request: Request, sub_type: str):
    """Decode the request body as UTF-8 subtitle text and broadcast it.

    The broadcast payload has ``type`` set to ``subtitle_<sub_type>`` and
    ``text`` set to the decoded body.  Returns a 200 JSON response on
    success and a 400 JSON response if processing fails.
    """
    try:
        text_content = await request.body()
        subtitle_text = text_content.decode("utf-8")
        logger.info(f"Received subtitle text: {subtitle_text}, request type: {sub_type}")

        if manager.active_connections:
            await manager.broadcast_binary({"type": f"subtitle_{sub_type}", "text": subtitle_text})

        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Subtitle text received"},
        )
    except Exception as e:
        logger.error(f"Error processing {sub_type} subtitle data: {e}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."},
        )


async def fetch_selected_message() -> Optional[str]:
    """Fetch the selected message from ``PEEHAITCHPEA_ENDPOINT``.

    Returns the stripped response text, or None if the response is empty,
    the status code is not 200, or the request raises.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(PEEHAITCHPEA_ENDPOINT)

        if response.status_code != 200:
            logger.warning(f"Failed to fetch message. Status code: {response.status_code}")
            return None

        message = response.text.strip()
        if not message:
            return None

        logger.info(f"Received selected message: {message}")
        return message
    except Exception as e:
        logger.error(f"Error fetching selected message: {e}")
        return None


async def process_selected_message(message: Optional[str]):
    """Log the selected message and broadcast it to connected clients.

    The message is broadcast as given, including when it is None.
    """
    logger.info(f"Processing message: {message}")

    if manager.active_connections:
        await manager.broadcast_binary({"type": "selectedmessage", "message": message})


async def periodic_message_check():
    """Periodically check for new messages while auto-polling is enabled.

    Runs until cancelled, sleeping ``app.state.polling_rate`` seconds
    between iterations.
    """
    while True:
        if app.state.auto_polling is True:
            logger.info("Automatically polling message...")
            try:
                message = await fetch_selected_message()
                await process_selected_message(message)
            except Exception:
                # Keep the loop alive: one failed iteration must not end
                # polling for the lifetime of the process.
                logger.exception("Automatic message polling failed")

        await asyncio.sleep(app.state.polling_rate)


# Startup tasks setup
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts."""
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    app.state.polling_task = asyncio.create_task(periodic_message_check())


# Main function and app entry point
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=PYTHAGORAS_PORT, reload=True)