from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, BackgroundTasks from fastapi.responses import JSONResponse import logging import uvicorn from typing import Dict, List, Any import json import httpx import asyncio # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("pythagoras") # Configure FastAPI app and initial values for variables app = FastAPI(title="Pythagoras", description="A proxy service handling HTTP and WebSocket connections") app.state.auto_polling = False app.state.polling_rate = 5 # Store for connected websocket clients class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}") def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}") async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() # Endpoints @app.post("/control") async def control_endpoint(request: Request): """Endpoint for control data.""" try: data = await request.json() logger.info(f"Received control data: {data}") if data['command'] == "getselectedmessage": message = await fetch_selected_message() await process_selected_message(message) logger.info(f"Received new selected message initiated from control: {message}") elif data['command'] == "setautopolling" and 'state' in data: new_state = data['state'] app.state.auto_polling = new_state logger.info(f"Polling command issued, changing auto-polling to {new_state}") elif data['command'] == "autopollingrate" and 'rate' in data: new_rate = data['rate'] app.state.polling_rate = new_rate logger.info(f"Auto-polling rate change requested: {new_rate} seconds") return JSONResponse( status_code=200, content={"status": "success", "message": "Control data received"} ) except Exception as e: logger.error(f"Error processing control data: {str(e)}") return JSONResponse( status_code=400, content={"status": "error", "message": f"Failed to process request."} ) @app.post("/subtitles") async def subtitles_endpoint(request: Request): """Endpoint for subtitle data.""" try: text_content = await request.body() subtitle_text = text_content.decode("utf-8") logger.info(f"Received subtitle text: {subtitle_text}") if manager.active_connections: await manager.broadcast(json.dumps({"type": "subtitle", "text": subtitle_text})) return JSONResponse( status_code=200, content={"status": "success", "message": "Subtitle text received"} ) except Exception as e: logger.error(f"Error processing subtitle data: {str(e)}") return JSONResponse( status_code=400, content={"status": "error", "message": f"Failed to process request."} ) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time communication.""" await manager.connect(websocket) try: while True: data = await websocket.receive_text() logger.info(f"Received message from WebSocket: {data}") except WebSocketDisconnect: manager.disconnect(websocket) except Exception as e: logger.error(f"WebSocket error: {str(e)}") manager.disconnect(websocket) # Functions async def fetch_selected_message(): """ Fetches a selected message from the specified endpoint. Returns the message as a string or None if no message is available. """ try: async with httpx.AsyncClient() as client: response = await client.get("http://localhost:9000/api.php?cmd=getselectedmessage") if response.status_code == 200: message = response.text.strip() if message: logger.info(f"Received selected message: {message}") return message else: return None else: logger.warning(f"Failed to fetch message. Status code: {response.status_code}") return None except Exception as e: logger.error(f"Error fetching selected message: {str(e)}") return None async def process_selected_message(message: str): """ Processes the selected message. """ logger.info(f"Processing message: {message}") if manager.active_connections: await manager.broadcast(json.dumps({"type": "selectedmessage", "message": message})) async def periodic_message_check(): """Periodically checks for new messages.""" while True: if app.state.auto_polling is True: logger.info("Automatically polling message...") message = await fetch_selected_message() await process_selected_message(message) await asyncio.sleep(app.state.polling_rate) # Startup tasks setup @app.on_event("startup") async def startup_event(): """Start background tasks when the application starts.""" asyncio.create_task(periodic_message_check()) # Main function and app entry point if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)