from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
import logging
import uvicorn
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json
import httpx
import asyncio

# Some useful variables
PEEHAITCHPEA_ENDPOINT = "http://localhost:9000/api.php?cmd=getselectedmessage"
PYTHAGORAS_PORT = 8000

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pythagoras")

# Configure FastAPI app and initial values for variables
app = FastAPI(title="Pythagoras", description="A proxy service handling HTTP and WebSocket connections")
app.state.auto_polling = False
app.state.polling_rate = 5


# Store for connected websocket clients
class ConnectionManager:
    """Keeps the list of accepted WebSocket clients and fans messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking ``websocket``."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a no-op if it is not tracked."""
        # broadcast() may already have dropped this client after a failed
        # send, so a second call from the endpoint handler must not raise.
        if websocket not in self.active_connections:
            return
        self.active_connections.remove(websocket)
        logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked client, dropping clients whose send fails."""
        # Iterate over a snapshot: the list can change while we are suspended
        # in ``send_text`` (clients connecting/disconnecting, or our own
        # ``disconnect`` call below).
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                # One broken client must not prevent delivery to the others.
                logger.warning(f"Dropping WebSocket client after failed send: {e}")
                self.disconnect(connection)


manager = ConnectionManager()


# Static files
def read_file(filepath: str) -> str:
    """Return the full contents of ``filepath`` decoded as UTF-8."""
    with open(filepath, "r", encoding="utf-8") as f:
        return f.read()


@dataclass
class StaticFiles:
    """Static assets; the default below is read once, when this class body is evaluated."""

    index_html: str = read_file("static/index.html")


@app.get("/presentation/")
async def presentation_index(_: Request):
    """Serve the preloaded presentation page."""
    return HTMLResponse(status_code=200, content=StaticFiles.index_html)


# Endpoints
@app.post("/control")
async def control_endpoint(request: Request):
    """Endpoint for control data.

    Expects a JSON object with a ``command`` key. Handled commands:

    * ``getselectedmessage`` - fetch the selected message now and broadcast it.
    * ``setautopolling`` (with ``state``) - store ``state`` as the auto-polling flag.
    * ``autopollingrate`` (with ``rate``) - set the polling interval in seconds;
      ``rate`` must be a positive, finite number.

    Returns a 200 JSON response on success and a 400 JSON response when the
    body cannot be processed (invalid JSON, missing ``command``, invalid rate).
    """
    try:
        data = await request.json()
        logger.info(f"Received control data: {data}")

        if data['command'] == "getselectedmessage":
            message = await fetch_selected_message()
            await process_selected_message(message)
            logger.info(f"Received new selected message initiated from control: {message}")
        elif data['command'] == "setautopolling" and 'state' in data:
            new_state = data['state']
            app.state.auto_polling = new_state
            logger.info(f"Polling command issued, changing auto-polling to {new_state}")
        elif data['command'] == "autopollingrate" and 'rate' in data:
            new_rate = data['rate']
            # The rate is handed to asyncio.sleep() by the polling task, so a
            # non-numeric, non-positive, NaN or infinite value must be refused
            # here. bool is excluded explicitly because it is an int subclass.
            # The chained comparison is False for NaN and for infinity.
            if (
                isinstance(new_rate, bool)
                or not isinstance(new_rate, (int, float))
                or not 0 < new_rate < float("inf")
            ):
                raise ValueError(f"Invalid polling rate: {new_rate!r}")
            app.state.polling_rate = new_rate
            logger.info(f"Auto-polling rate change requested: {new_rate} seconds")
        else:
            # Response stays a success as before; only make the no-op visible.
            logger.warning(f"Ignoring unrecognized or incomplete control command: {data['command']}")

        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Control data received"}
        )
    except Exception as e:
        logger.error(f"Error processing control data: {str(e)}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."}
        )


@app.post("/subtitles/update_current")
async def subtitles_update_current_endpoint(request: Request):
    """Endpoint for subtitle data - updating the current sentence as it comes."""
    return await process_subtitles(request, "update_current")


@app.post("/subtitles/submit_sentence")
async def subtitles_submit_sentence_endpoint(request: Request):
    """Endpoint for subtitle data - submitting the final version of a sentence."""
    return await process_subtitles(request, "submit_sentence")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time communication.

    Registers the client, then logs every text frame it sends until the
    connection ends; the client is unregistered on disconnect or error.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"Received message from WebSocket: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
        manager.disconnect(websocket)


# Functions
async def process_subtitles(request: Request, sub_type: str):
    """Broadcast the UTF-8 request body as a ``subtitle_<sub_type>`` message.

    Returns a 200 JSON response on success and a 400 JSON response if the
    body cannot be read, decoded or broadcast.
    """
    try:
        text_content = await request.body()
        subtitle_text = text_content.decode("utf-8")
        logger.info(f"Received subtitle text: {subtitle_text}, request type: {sub_type}")

        if manager.active_connections:
            await manager.broadcast(json.dumps({"type": f"subtitle_{sub_type}", "text": subtitle_text}))

        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Subtitle text received"}
        )
    except Exception as e:
        logger.error(f"Error processing {sub_type} subtitle data: {str(e)}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."}
        )


async def fetch_selected_message():
    """
    Fetches a selected message from the specified endpoint.
    Returns the message as a string or None if no message is available.

    None is also returned when the endpoint answers with a non-200 status
    or when the request raises; both cases are logged.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(PEEHAITCHPEA_ENDPOINT)

            if response.status_code == 200:
                message = response.text.strip()
                if message:
                    logger.info(f"Received selected message: {message}")
                    return message
                else:
                    return None
            else:
                logger.warning(f"Failed to fetch message. Status code: {response.status_code}")
                return None
    except Exception as e:
        logger.error(f"Error fetching selected message: {str(e)}")
        return None


async def process_selected_message(message: Optional[str]):
    """
    Processes the selected message.

    Broadcasts it to all connected WebSocket clients as a ``selectedmessage``
    JSON payload. ``message`` may be None (nothing fetched); it is then sent
    as JSON ``null``.
    """
    # NOTE(review): presumably clients treat a null message as "nothing
    # selected" - confirm against the front end before filtering None here.
    logger.info(f"Processing message: {message}")
    if manager.active_connections:
        await manager.broadcast(json.dumps({"type": "selectedmessage", "message": message}))


async def periodic_message_check():
    """Periodically checks for new messages.

    Runs forever: while ``app.state.auto_polling`` is exactly ``True`` it
    fetches and broadcasts the selected message, then sleeps for
    ``app.state.polling_rate`` seconds.
    """
    while True:
        if app.state.auto_polling is True:
            try:
                logger.info("Automatically polling message...")
                message = await fetch_selected_message()
                await process_selected_message(message)
            except Exception:
                # A single failed iteration must not end polling for good.
                logger.exception("Automatic polling iteration failed")
        await asyncio.sleep(app.state.polling_rate)


# Startup tasks setup
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts."""
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task can be garbage-collected mid-execution.
    app.state.polling_task = asyncio.create_task(periodic_message_check())


# Main function and app entry point
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=PYTHAGORAS_PORT, reload=True)