Files
pythagoras/main.py
2025-05-04 17:27:39 +02:00

362 lines
13 KiB
Python

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
import logging
import uvicorn
from typing import Dict, List, Any
from dataclasses import dataclass
import json
import httpx
import asyncio
import struct
import os
from pathlib import Path
# Upstream endpoint queried for the currently selected message, and the
# port this service listens on.
PEEHAITCHPEA_ENDPOINT = "http://localhost:9000/api.php?cmd=getselectedmessage"
PYTHAGORAS_PORT = 8000

# Logging: one shared format, INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("pythagoras")

# The FastAPI application. Mutable runtime settings live on ``app.state``
# so that request handlers and the background poller share them.
app = FastAPI(
    title="Pythagoras",
    description="A proxy service handling HTTP and WebSocket connections",
)
app.state.auto_polling = False  # the background poller only fetches when this is True
app.state.polling_rate = 5  # seconds slept between polling cycles
app.state.enable_libretranslate = True  # translate submitted sentences to Czech
app.state.client_state = "idle"  # screen name sent to newly connected clients
app.state.latest_message = ""  # last selected message, replayed to new clients

# Directory that media files are served from; created on import if missing.
MEDIA_DIR = Path("./media")
os.makedirs(MEDIA_DIR, exist_ok=True)
# Store for connected websocket clients
class ConnectionManager:
    """Registry of connected WebSocket clients plus helpers to message them.

    Binary messages are framed as a 4-byte unsigned length in network byte
    order followed by the UTF-8 encoded JSON document.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept *websocket*, register it and bring it up to date."""
        await websocket.accept()
        self.active_connections.append(websocket)
        # Only the newcomer receives the current screen and the cached
        # message; already-connected clients are not re-sent anything.
        await setscreen_single(websocket, app.state.client_state)
        await self.singlecast_binary(
            {"type": "selectedmessage", "message": app.state.latest_message},
            websocket,
        )
        logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Remove *websocket* from the registry.

        Raises:
            ValueError: if *websocket* is not currently registered.
        """
        self.active_connections.remove(websocket)
        logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")

    def json_to_binary(self, json_str: str):
        """Return *json_str* as UTF-8 bytes prefixed with their length (uint32)."""
        json_bytes = json_str.encode('utf-8')
        # 4-byte unsigned integer (uint32), network byte order
        length_bytes = struct.pack('!I', len(json_bytes))
        return length_bytes + json_bytes

    async def broadcast(self, message: str):
        """Send *message* as a text frame to every registered client.

        A failure on one client is logged and does not stop delivery to
        the remaining clients.
        """
        # Iterate over a snapshot: clients may connect or disconnect while
        # we are suspended in ``await``, and mutating a list that is being
        # iterated can silently skip elements.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception as e:
                logger.error(f"Failed to send text message: {str(e)}")

    async def broadcast_binary(self, data_dict: dict):
        """Send *data_dict* as one length-prefixed JSON frame to every client.

        The payload is serialised once and reused for all clients. Send
        failures are logged per client and otherwise ignored.
        """
        if not self.active_connections:
            return
        message_bytes = self.json_to_binary(json.dumps(data_dict))
        # Snapshot for the same reason as in ``broadcast``.
        for connection in list(self.active_connections):
            await self._send_frame(connection, message_bytes)

    async def singlecast_binary(self, data_dict: dict, ws: WebSocket):
        """Send *data_dict* as a length-prefixed JSON frame to *ws* only.

        A send failure is logged and otherwise ignored.
        """
        message_bytes = self.json_to_binary(json.dumps(data_dict))
        await self._send_frame(ws, message_bytes)

    async def _send_frame(self, ws, message_bytes: bytes) -> None:
        """Send already-framed bytes to one client, logging any failure."""
        try:
            await ws.send_bytes(message_bytes)
            logger.debug(f"Sent binary message ({len(message_bytes)} bytes) to a client")
        except Exception as e:
            logger.error(f"Failed to send binary message: {str(e)}")


# Single shared registry used by the endpoints and helper functions.
manager = ConnectionManager()
# Static files
def read_file(filepath: str) -> str:
    """Return the full contents of *filepath*, decoded as UTF-8."""
    return Path(filepath).read_text(encoding="utf-8")
@dataclass
class StaticFiles:
    """Contents of the presentation front-end files.

    The defaults are evaluated when this class body runs, so both files
    are read from disk at that point; the handlers access them as class
    attributes.
    """

    # Text of static/index.html
    index_html: str = read_file("static/index.html")
    # Text of static/script.js
    script_js: str = read_file("static/script.js")
@app.get("/presentation/")
async def presentation_index(_: Request):
    """Serve the presentation page's HTML (the request itself is unused)."""
    page = StaticFiles.index_html
    return HTMLResponse(content=page, status_code=200, media_type="text/html")
@app.get("/presentation/script.js")
async def presentation_script(_: Request):
    """Serve the presentation page's JavaScript (the request itself is unused)."""
    script = StaticFiles.script_js
    return HTMLResponse(content=script, status_code=200, media_type="text/javascript")
# Endpoints
@app.post("/control")
async def control_endpoint(request: Request):
    """Endpoint for control data.

    Expects a JSON object with a ``command`` key and, depending on the
    command, one extra key:

    * ``getselectedmessage`` - fetch the selected message and publish it
    * ``setautopolling`` + ``state`` - switch automatic polling on/off
    * ``setlibretranslate`` + ``state`` - switch subtitle translation on/off
    * ``autopollingrate`` + ``rate`` - seconds between polls (positive number)
    * ``playvideo`` + ``filename`` - tell clients to play a video
    * ``seekvideo`` + ``timestamp`` - tell clients to seek the current video
    * ``setscreen_main`` / ``setscreen_video`` / ``setscreen_idle`` -
      switch the client screen

    Returns a 200 JSON response on success; a command that matches none of
    the above is acknowledged with 200 as well. Any exception while
    handling the request (invalid JSON, missing ``command``, invalid rate)
    is logged and answered with a 400 JSON response.
    """
    try:
        data = await request.json()
        logger.info(f"Received control data: {data}")
        command = data['command']
        if command == "getselectedmessage":
            message = await fetch_selected_message()
            await process_selected_message(message)
            logger.info(f"Received new selected message initiated from control: {message}")
        elif command == "setautopolling" and 'state' in data:
            new_state = data['state']
            app.state.auto_polling = new_state
            logger.info(f"Polling command issued, changing auto-polling to {new_state}")
        elif command == "setlibretranslate" and 'state' in data:
            new_state = data['state']
            # This branch used to overwrite auto_polling, so translation
            # could never be toggled and polling was toggled by accident.
            app.state.enable_libretranslate = new_state
            logger.info(f"LibreTranslate command issued, changing state to {new_state}")
        elif command == "autopollingrate" and 'rate' in data:
            new_rate = data['rate']
            # The rate is later handed to asyncio.sleep() by the background
            # poller; reject anything that is not a positive finite number
            # (bool is excluded explicitly because it is a subclass of int).
            if (
                isinstance(new_rate, bool)
                or not isinstance(new_rate, (int, float))
                or not 0 < new_rate < float("inf")
            ):
                raise ValueError(f"Invalid auto-polling rate: {new_rate!r}")
            app.state.polling_rate = new_rate
            logger.info(f"Auto-polling rate change requested: {new_rate} seconds")
        elif command == "playvideo" and 'filename' in data:
            filename = data['filename']
            await playvideo(filename)
            logger.info(f"Video playback requested: {filename}")
        elif command == "seekvideo" and 'timestamp' in data:
            timestamp = data['timestamp']
            await seekvideo(timestamp)
            logger.info(f"Seeking of the currently playing video requested to {timestamp} seconds")
        elif command == "setscreen_main":
            await setscreen("main")
            app.state.client_state = "main"
            logger.info("Setting the client screen to main view.")
        elif command == "setscreen_video":
            await setscreen("video")
            app.state.client_state = "video"
            logger.info("Setting the client screen to video playback.")
        elif command == "setscreen_idle":
            await setscreen("idle")
            app.state.client_state = "idle"
            logger.info("Setting the client screen to idle.")
        return JSONResponse(
            status_code=200,
            content={"status": "success", "message": "Control data received"}
        )
    except Exception as e:
        logger.error(f"Error processing control data: {str(e)}")
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": "Failed to process request."}
        )
@app.post("/subtitles/update_current")
async def subtitles_update_current_endpoint(request: Request):
    """Receive an in-progress version of the sentence currently being spoken."""
    response = await process_subtitles(request, "update_current")
    return response
@app.post("/subtitles/submit_sentence")
async def subtitles_submit_sentence_endpoint(request: Request):
    """Receive the final version of a sentence."""
    response = await process_subtitles(request, "submit_sentence")
    return response
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client and log whatever text it sends.

    The client stays registered until receiving fails, at which point it
    is removed from the connection manager.
    """
    await manager.connect(websocket)
    try:
        while True:
            incoming = await websocket.receive_text()
            logger.info(f"Received message from WebSocket: {incoming}")
    except Exception as exc:
        # A WebSocketDisconnect is the ordinary way for this loop to end,
        # so only other exceptions are reported as errors.
        if not isinstance(exc, WebSocketDisconnect):
            logger.error(f"WebSocket error: {str(exc)}")
        manager.disconnect(websocket)
@app.get("/media/{file_path:path}")
async def get_media(file_path: str):
    """
    Serve media files from the media directory.

    Args:
        file_path: Path of the requested file, relative to ``MEDIA_DIR``.

    Raises:
        HTTPException: 403 if the resolved path lies outside the media
            directory, 404 if it does not point to an existing file.
    """
    media_root = MEDIA_DIR.resolve()
    absolute_path = (MEDIA_DIR / file_path).resolve()
    # Security check: the resolved path must be inside the media directory.
    # Compare path components rather than string prefixes; a prefix test
    # would also accept a sibling such as "<root>/media_private/...".
    try:
        absolute_path.relative_to(media_root)
    except ValueError:
        logger.warning(f"Attempted directory traversal: {file_path}")
        raise HTTPException(status_code=403, detail="Access denied") from None
    # Check if the file exists
    if not absolute_path.is_file():
        logger.warning(f"File not found: {absolute_path}")
        raise HTTPException(status_code=404, detail="File not found")
    logger.info(f"Serving media file: {file_path}")
    return FileResponse(path=absolute_path, filename=absolute_path.name)
# Functions
async def process_subtitles(request: Request, sub_type: str):
    """Relay an English subtitle from the request body to all clients.

    The body is decoded as UTF-8 and broadcast as ``subtitle_en_<sub_type>``.
    For ``submit_sentence`` requests, when translation is enabled, the Czech
    translation is broadcast afterwards as ``subtitle_cs_<sub_type>``.
    Nothing is sent (or translated) when no client is connected.

    Returns a 200 JSON response on success, or a 400 JSON response if any
    step raises.
    """
    try:
        raw_body = await request.body()
        en_subtitle_text = raw_body.decode("utf-8")
        logger.info(f"Received subtitle text: {en_subtitle_text}, request type: {sub_type}")
        if manager.active_connections:
            english_payload = {"type": f"subtitle_en_{sub_type}", "text": en_subtitle_text}
            await manager.broadcast_binary(english_payload)
            wants_translation = app.state.enable_libretranslate and sub_type == "submit_sentence"
            if wants_translation:
                cs_subtitle_text = await translate_to_cs_libre(en_subtitle_text)
                czech_payload = {"type": f"subtitle_cs_{sub_type}", "text": cs_subtitle_text}
                await manager.broadcast_binary(czech_payload)
        return JSONResponse(
            content={"status": "success", "message": "Subtitle text received"},
            status_code=200,
        )
    except Exception as e:
        logger.error(f"Error processing {sub_type} subtitle data: {str(e)}")
        return JSONResponse(
            content={"status": "error", "message": "Failed to process request."},
            status_code=400,
        )
async def translate_to_cs_libre(text: str):
    """
    Translate *text* from English to Czech via the local LibreTranslate API.

    Falsy input is returned unchanged without contacting the service. On a
    non-200 response or any exception the original text is returned, so the
    caller always gets something to display.
    """
    if not text:
        return text
    try:
        request_body = {
            "q": text,
            "source": "en",
            "target": "cs",
            "format": "text",
        }
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
            response = await client.post("http://localhost:5000/translate", json=request_body)
            if response.status_code != 200:
                logger.error(f"Translation API error: {response.status_code}, {response.text}")
                return text
            # Fall back to the input when the reply carries no translation.
            translated_text = response.json().get("translatedText", text)
            logger.info("Successfully translated text to Czech")
            return translated_text
    except Exception as e:
        logger.error(f"Translation error: {str(e)}")
        return text
async def setscreen_single(ws: WebSocket, screen: str):
    """Send a ``setscreen`` message naming *screen* to the single client *ws*."""
    payload = {"type": "setscreen", "screen": screen}
    return await manager.singlecast_binary(payload, ws)
async def setscreen(screen: str):
    """Broadcast a ``setscreen`` message naming *screen* to all clients."""
    payload = {"type": "setscreen", "screen": screen}
    return await manager.broadcast_binary(payload)
async def playvideo(filename: str):
    """Broadcast a ``playvideo`` message carrying *filename* to all clients."""
    payload = {"type": "playvideo", "filename": filename}
    return await manager.broadcast_binary(payload)
async def seekvideo(timestamp: int):
    """Broadcast a ``seekvideo`` message carrying *timestamp* to all clients."""
    payload = {"type": "seekvideo", "timestamp": timestamp}
    return await manager.broadcast_binary(payload)
async def fetch_selected_message():
    """
    Fetch the selected message from ``PEEHAITCHPEA_ENDPOINT``.

    Returns the response body with surrounding whitespace stripped, or
    None when the body is empty, the status is not 200, or the request
    raises (failures are logged).
    """
    try:
        async with httpx.AsyncClient() as http:
            reply = await http.get(PEEHAITCHPEA_ENDPOINT)
            if reply.status_code != 200:
                logger.warning(f"Failed to fetch message. Status code: {reply.status_code}")
                return None
            message = reply.text.strip()
            if not message:
                return None
            logger.info(f"Received selected message: {message}")
            return message
    except Exception as e:
        logger.error(f"Error fetching selected message: {str(e)}")
        return None
async def process_selected_message(message: str):
    """
    Remember *message* as the latest one and push it to connected clients.

    The value is stored on ``app.state.latest_message`` (which is what newly
    connecting clients receive) and, if any client is connected, broadcast
    as a ``selectedmessage`` frame.
    """
    logger.info(f"Processing message: {message}")
    app.state.latest_message = message
    if not manager.active_connections:
        return
    payload = {"type": "selectedmessage", "message": message}
    await manager.broadcast_binary(payload)
# Delay used when the configured polling rate cannot be used as a sleep time.
_FALLBACK_POLLING_RATE = 5


def _is_usable_polling_rate(rate: object) -> bool:
    """Return True if *rate* is a positive, finite int or float (not a bool)."""
    if isinstance(rate, bool) or not isinstance(rate, (int, float)):
        return False
    return 0 < rate < float("inf")


async def periodic_message_check():
    """Periodically checks for new messages.

    Runs forever: while ``app.state.auto_polling`` is exactly ``True`` each
    cycle fetches the selected message and publishes it; every cycle then
    sleeps for ``app.state.polling_rate`` seconds.
    """
    while True:
        if app.state.auto_polling is True:
            logger.info("Automatically polling message...")
            message = await fetch_selected_message()
            await process_selected_message(message)
        delay = app.state.polling_rate
        # polling_rate is assigned from request data elsewhere in this file.
        # A non-numeric value would make asyncio.sleep() raise and end this
        # task for good; zero or a negative value would make the loop spin.
        if not _is_usable_polling_rate(delay):
            logger.warning(
                f"Ignoring invalid polling rate {delay!r}; "
                f"sleeping {_FALLBACK_POLLING_RATE} seconds instead"
            )
            delay = _FALLBACK_POLLING_RATE
        await asyncio.sleep(delay)
# Startup tasks setup
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts."""
    # The event loop keeps only a weak reference to tasks, so a task whose
    # result is discarded may be garbage-collected before it finishes.
    # Holding it on app.state keeps the poller alive for the app's lifetime.
    app.state.polling_task = asyncio.create_task(periodic_message_check())
# Main function and app entry point
if __name__ == "__main__":
    # Script entry point: serve the app on every network interface at the
    # configured port, with uvicorn's reload option switched on.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=PYTHAGORAS_PORT,
        reload=True,
    )