implement server logic

This commit is contained in:
2025-05-02 14:10:44 +02:00
parent e90dfa0b0b
commit 22ff485190
9 changed files with 117 additions and 8 deletions

1
Cargo.lock generated
View File

@@ -840,6 +840,7 @@ dependencies = [
"hyper",
"hyper-util",
"serde",
"serde_json",
"tokio",
"toml",
"url",

View File

@@ -1,3 +1,7 @@
[workspace]
resolver = "2"
members = ["lib/search_and_replace", "theseus-server"]
[profile.release]
strip = true
lto = true

View File

@@ -15,6 +15,6 @@ run: dst/dev.toml $(TARGETS_CLIENT)
clean: client_clean
rm -rf dst
dst/release/theseus-server: $(SRCS_RUST_THESEUS_SERVER) $(TARGETS_CLIENT)
dst/release/theseus-server: $(SRCS_RUST_THESEUS_SERVER) $(TARGETS_CLIENT) dst/prod.toml
cargo build --package theseus-server --release
touch $@

View File

@@ -6,7 +6,7 @@ max_question_body_size = 25
memory_limit = 50
[maintenance]
interval = 60
interval = 10
[push]
endpoint = "[::1]:8081"
endpoint = "http://[::1]:8081/push"

View File

@@ -1,3 +1,7 @@
dst/dev.toml: config/dev.toml
@mkdir -p $(@D)
ln -f $< $@
dst/prod.toml: config/prod.toml
@mkdir -p $(@D)
ln -f $< $@

12
config/prod.toml Normal file
View File

@@ -0,0 +1,12 @@
[server]
bind_to = "[::1]:8080"
max_question_body_size = 2048
[performance]
memory_limit = 536870912
[maintenance]
interval = 60
[push]
endpoint = "http://[::1]:9091/api.php?cmd=newmodmessages"

View File

@@ -10,6 +10,7 @@ http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["full"] }
hyper-util = { version = "0.1.11", features = ["full"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
tokio = { version = "1.44.2", features = ["full"] }
toml = "0.8.22"
url = "2.5.4"

View File

@@ -33,7 +33,7 @@ def_config! {
interval = u64, // in seconds
[push Push]
endpoint = std::net::SocketAddr, // recommended format: "<host>:<port>"
endpoint = String, // recommended format: "<host>:<port>"
}

View File

@@ -1,12 +1,12 @@
use std::{collections::HashSet, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
use std::{collections::HashSet, sync::Arc};
use askama::Template;
use http_body_util::{combinators::BoxBody, BodyExt, Collected, Full};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{
body::{Body, Bytes, Incoming},
header::HeaderValue, server::conn::http1, service::service_fn, Error, Method, Request, Response
};
use hyper_util::rt::TokioIo;
use tokio::{net::TcpListener, sync::Mutex};
use tokio::{net::{TcpListener, TcpStream}, sync::Mutex};
mod logger;
mod args;
@@ -191,15 +191,102 @@ async fn new_question(
Ok(response!(main_page req, "info", "Your question was successfully added."))
}
struct PushEndpoint<'a> {
uri: &'a hyper::Uri,
host: &'a str,
port: u16,
addr: String,
authority: String,
}
impl<'a> PushEndpoint<'a> {
fn new(uri: &'a hyper::Uri) -> Result<PushEndpoint<'a>, &'static str> {
let host = uri.host().ok_or("no host provided")?;
let port = uri.port_u16().ok_or("no port provided")?;
let addr = format!("{}:{}", host, port);
let authority = uri.authority().ok_or("no authority provided")?.to_string();
Ok(Self { uri, host, port, addr, authority })
}
}
async fn maintenance(state: Arc<SharedState>) {
let Ok(uri): Result<hyper::Uri, _> = state.config.push.endpoint.parse() else {
log!(fatal "could not parse uri: {:?}", state.config.push.endpoint);
return;
};
let uri = match PushEndpoint::new(&uri) {
Ok(v) => v,
Err(e) => {
log!(fatal "could not parse endpoint address: {:?}", e);
return;
}
};
let interval = std::time::Duration::from_secs(state.config.maintenance.interval);
let mut questions: HashSet<String> = HashSet::default();
log!(debug "started maintenance routine with {:?} interval", interval);
loop {
log!(debug "running maintenance");
log!(debug "----- MAINTENANCE -----");
let questions_new = state.questions.lock().await.consume_to_push();
questions.extend(questions_new);
log!(debug "pushing {} questions", questions.len());
match push_questions(&questions, &uri).await {
Ok(()) => questions = HashSet::default(),
Err(()) => { log!(debug "push failed - will try again"); },
};
log!(debug "----- /MAINTENANCE -----");
tokio::time::sleep(interval).await;
}
}
fn make_push_request<'a>(
questions: &HashSet<String>, uri: &PushEndpoint<'a>
) -> Result<hyper::Request<Full<Bytes>>, ()> {
let Ok(body) = serde_json::to_string(questions) else { return Err(()); };
hyper::Request::builder()
.uri(uri.uri)
.header(hyper::header::HOST, &uri.authority)
.header(hyper::header::CONTENT_TYPE, "application/json")
.body(Full::new(Bytes::from(body))).map_err(|_| ())
}
async fn connect_to_push_endpoint(
addr: &String,
) -> Result<hyper::client::conn::http1::SendRequest<Full<Bytes>>, ()> {
let stream = TcpStream::connect(addr).await.map_err(|_| ())?;
let (sender, conn) = hyper::client::conn::http1::handshake(
TokioIo::new(stream)
).await.map_err(|_| ())?;
tokio::task::spawn(async move {
conn.await
});
Ok(sender)
}
async fn push_questions<'a>(questions: &HashSet<String>, uri: &PushEndpoint<'a>) -> Result<(), ()> {
if questions.len() == 0 {
log!(debug "skipping push - no new questions");
return Ok(());
}
let Ok(mut conn) = connect_to_push_endpoint(&uri.addr).await else {
log!(err "could not connect to push endpoint");
return Err(());
};
let Ok(req) = make_push_request(questions, uri) else {
log!(err "could not construct push questions request");
return Err(());
};
let Ok(res) = conn.send_request(req).await else {
log!(err "could not send questions request to push endpoint");
return Err(());
};
if res.status() != hyper::StatusCode::OK {
log!(err "got non-200 response from push endpoint");
return Err(());
}
log!(info "successfully pushed new questions");
Ok(())
}
fn load_config(path: &str) -> Result<config::Config, String> {
let Ok(file_contents) = std::fs::read_to_string(path) else {
return Err("could not read the config file".to_string());