# Files
# pythagoras/main.py
#
# 168 lines
# 5.9 KiB
# Python

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponse
import logging
import uvicorn
from typing import Dict, List, Any
import json
import httpx
import asyncio
# Some useful variables
# URL that fetch_selected_message() issues its GET request against.
# NOTE(review): this URL targets localhost:9000, which is the same port this
# application is served on (PYTHAGORAS_PORT below). If both services run on
# one host the fetch would reach this app itself - confirm the upstream port.
PEEHAITCHPEA_ENDPOINT = "http://localhost:9000/api.php?cmd=getselectedmessage"
# Port handed to uvicorn.run() in the __main__ entry point.
PYTHAGORAS_PORT = 9000
# Configure logging
# Root configuration: INFO and above, timestamped, with the logger name and
# severity ahead of the message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Named logger used by the rest of this module.
logger = logging.getLogger("pythagoras")
# Configure FastAPI app and initial values for variables
app = FastAPI(
    title="Pythagoras",
    description="A proxy service handling HTTP and WebSocket connections",
)

# Polling settings kept on app.state; both are overwritten by POST /control.
app.state.polling_rate = 5  # delay handed to asyncio.sleep() by the polling loop
app.state.auto_polling = False  # the polling loop fetches only while this is True
# Store for connected websocket clients
class ConnectionManager:
    """Registry of accepted WebSocket clients with fan-out broadcasting.

    Attributes:
        active_connections: Sockets that have been accepted and not yet
            removed, in the order they connected.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the client."""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Calling this for a socket that is not (or no longer) tracked is a
        no-op, so it is safe when ``broadcast`` has already dropped the
        client after a failed send.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed; nothing to log or clean up.
            return
        logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")

    async def broadcast(self, message: str):
        """Send ``message`` as a text frame to every tracked client.

        A client whose send raises is logged and removed; delivery to the
        remaining clients continues.
        """
        # Iterate over a snapshot: connect()/disconnect() may mutate the list
        # while this coroutine is suspended in send_text().
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                logger.warning(
                    "Failed to send to a WebSocket client; dropping it",
                    exc_info=True,
                )
                self.disconnect(connection)
# Single module-level registry used by the subtitle and WebSocket endpoints
# and by process_selected_message() to reach connected clients.
manager = ConnectionManager()
# Endpoints
@app.post("/control")
async def control_endpoint(request: Request):
    """Handle a JSON control command.

    The request body must be a JSON object with a ``command`` key.
    Commands acted upon:

    * ``getselectedmessage`` - fetch the selected message and broadcast it.
    * ``setautopolling`` with a boolean ``state`` - switch automatic polling
      on or off.
    * ``autopollingrate`` with a positive, finite numeric ``rate`` - set the
      polling interval in seconds.

    Any other command (or a known command without its argument) is
    acknowledged without further action.

    Returns:
        A 200 JSON response on success, or a 400 JSON response when the body
        cannot be parsed, lacks ``command``, or carries an invalid ``state``
        or ``rate`` value.
    """
    try:
        data = await request.json()
        logger.info(f"Received control data: {data}")
        command = data["command"]
        if command == "getselectedmessage":
            message = await fetch_selected_message()
            await process_selected_message(message)
            logger.info(f"Received new selected message initiated from control: {message}")
        elif command == "setautopolling" and "state" in data:
            new_state = data["state"]
            # The polling loop tests `auto_polling is True`; any non-boolean
            # value would be accepted here yet never enable polling.
            if not isinstance(new_state, bool):
                raise ValueError(f"'state' must be a JSON boolean, got {new_state!r}")
            app.state.auto_polling = new_state
            logger.info(f"Polling command issued, changing auto-polling to {new_state}")
        elif command == "autopollingrate" and "rate" in data:
            new_rate = data["rate"]
            # The rate is passed straight to asyncio.sleep() by the polling
            # loop, so it must be a real, positive, finite number. The chained
            # comparison also rejects NaN (all comparisons with NaN are False).
            if (
                isinstance(new_rate, bool)
                or not isinstance(new_rate, (int, float))
                or not 0 < new_rate < float("inf")
            ):
                raise ValueError(f"'rate' must be a positive number of seconds, got {new_rate!r}")
            app.state.polling_rate = new_rate
            logger.info(f"Auto-polling rate change requested: {new_rate} seconds")
        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Control data received"},
        )
    except Exception as e:
        logger.error(f"Error processing control data: {str(e)}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."},
        )
@app.post("/subtitles")
async def subtitles_endpoint(request: Request):
    """Receive subtitle text and relay it to connected WebSocket clients.

    The raw request body is decoded as UTF-8. When at least one client is
    connected, the text is broadcast as the JSON object
    ``{"type": "subtitle", "text": <text>}``.

    Returns:
        A 200 JSON response on success, or a 400 JSON response if reading,
        decoding or broadcasting raises.
    """
    try:
        raw_body = await request.body()
        subtitle_text = raw_body.decode("utf-8")
        logger.info(f"Received subtitle text: {subtitle_text}")

        if manager.active_connections:
            payload = json.dumps({"type": "subtitle", "text": subtitle_text})
            await manager.broadcast(payload)

        success_body = {"status": "success", "message": "Subtitle text received"}
        return JSONResponse(status_code=200, content=success_body)
    except Exception as e:
        logger.error(f"Error processing subtitle data: {str(e)}")
        failure_body = {"status": "error", "message": "Failed to process request."}
        return JSONResponse(status_code=400, content=failure_body)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time communication.

    Registers the client with the connection manager, then logs every text
    frame it sends until the connection ends. The client is always
    unregistered on exit, however the receive loop terminates.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"Received message from WebSocket: {data}")
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in `finally`.
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {str(e)}")
    finally:
        # `finally` also covers task cancellation (asyncio.CancelledError is a
        # BaseException), which the except clauses above do not catch.
        manager.disconnect(websocket)
# Functions
async def fetch_selected_message():
    """
    Request the selected message from PEEHAITCHPEA_ENDPOINT.

    Returns the whitespace-stripped response body as a string, or None when
    the body is empty, the status code is not 200, or the request raises.
    """
    try:
        async with httpx.AsyncClient() as http:
            reply = await http.get(PEEHAITCHPEA_ENDPOINT)

            # Guard: anything other than 200 counts as "no message".
            if reply.status_code != 200:
                logger.warning(f"Failed to fetch message. Status code: {reply.status_code}")
                return None

            body = reply.text.strip()
            if not body:
                return None

            logger.info(f"Received selected message: {body}")
            return body
    except Exception as e:
        logger.error(f"Error fetching selected message: {str(e)}")
        return None
async def process_selected_message(message: str):
    """
    Log the selected message and relay it to connected WebSocket clients.

    When at least one client is connected, the message is broadcast as the
    JSON object ``{"type": "selectedmessage", "message": <message>}``.
    Callers in this file pass the result of fetch_selected_message(), which
    may be None; json.dumps then emits it as JSON ``null``.
    """
    logger.info(f"Processing message: {message}")

    # Nothing to do when nobody is listening.
    if not manager.active_connections:
        return

    envelope = json.dumps({"type": "selectedmessage", "message": message})
    await manager.broadcast(envelope)
async def periodic_message_check():
    """Periodically fetch and broadcast the selected message.

    Runs forever. On each iteration, if ``app.state.auto_polling`` is True,
    the selected message is fetched and processed; the loop then sleeps for
    ``app.state.polling_rate`` seconds.

    A failure in one iteration is logged and the loop continues, so a single
    bad fetch or broadcast does not end polling permanently. If
    ``polling_rate`` is not a positive, finite number, a 5-second delay is
    used instead of letting ``asyncio.sleep`` raise or spin.
    """
    fallback_delay = 5  # same value app.state.polling_rate is initialised to

    while True:
        try:
            if app.state.auto_polling is True:
                logger.info("Automatically polling message...")
                message = await fetch_selected_message()
                await process_selected_message(message)
        except Exception:
            # asyncio.CancelledError is a BaseException and still propagates,
            # so the task remains cancellable.
            logger.exception("Automatic polling iteration failed")

        delay = app.state.polling_rate
        # The chained comparison also rejects NaN and infinity.
        if (
            isinstance(delay, bool)
            or not isinstance(delay, (int, float))
            or not 0 < delay < float("inf")
        ):
            logger.warning(
                "Invalid polling rate %r; sleeping %s seconds instead",
                delay,
                fallback_delay,
            )
            delay = fallback_delay
        await asyncio.sleep(delay)
# Startup tasks setup
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts.

    Schedules periodic_message_check() on the running event loop and stores
    the resulting task on ``app.state.polling_task``.
    """
    # The event loop keeps only weak references to tasks; without a strong
    # reference the polling task could be garbage-collected mid-execution.
    app.state.polling_task = asyncio.create_task(periodic_message_check())
# Main function and app entry point
if __name__ == "__main__":
    # Run only when this file is executed directly: hand the "main:app"
    # import string to uvicorn on PYTHAGORAS_PORT with reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=PYTHAGORAS_PORT,
        reload=True,
    )