implement server logic
This commit is contained in:
@@ -10,6 +10,7 @@ http-body-util = "0.1.3"
|
||||
hyper = { version = "1.6.0", features = ["full"] }
|
||||
hyper-util = { version = "0.1.11", features = ["full"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
tokio = { version = "1.44.2", features = ["full"] }
|
||||
toml = "0.8.22"
|
||||
url = "2.5.4"
|
||||
|
@@ -33,7 +33,7 @@ def_config! {
|
||||
interval = u64, // in seconds
|
||||
|
||||
[push Push]
|
||||
endpoint = std::net::SocketAddr, // recommended format: "<host>:<port>"
|
||||
endpoint = String, // recommended format: "<proto>://<host>:<port>/<path>"
|
||||
|
||||
}
|
||||
|
||||
|
@@ -1,12 +1,12 @@
|
||||
use std::{collections::HashSet, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
use askama::Template;
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Collected, Full};
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Full};
|
||||
use hyper::{
|
||||
body::{Body, Bytes, Incoming},
|
||||
header::HeaderValue, server::conn::http1, service::service_fn, Error, Method, Request, Response
|
||||
};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use tokio::{net::TcpListener, sync::Mutex};
|
||||
use tokio::{net::{TcpListener, TcpStream}, sync::Mutex};
|
||||
|
||||
mod logger;
|
||||
mod args;
|
||||
@@ -191,15 +191,102 @@ async fn new_question(
|
||||
Ok(response!(main_page req, "info", "Your question was successfully added."))
|
||||
}
|
||||
|
||||
struct PushEndpoint<'a> {
|
||||
uri: &'a hyper::Uri,
|
||||
host: &'a str,
|
||||
port: u16,
|
||||
addr: String,
|
||||
authority: String,
|
||||
}
|
||||
|
||||
impl<'a> PushEndpoint<'a> {
|
||||
fn new(uri: &'a hyper::Uri) -> Result<PushEndpoint<'a>, &'static str> {
|
||||
let host = uri.host().ok_or("no host provided")?;
|
||||
let port = uri.port_u16().ok_or("no port provided")?;
|
||||
let addr = format!("{}:{}", host, port);
|
||||
let authority = uri.authority().ok_or("no authority provided")?.to_string();
|
||||
Ok(Self { uri, host, port, addr, authority })
|
||||
}
|
||||
}
|
||||
|
||||
async fn maintenance(state: Arc<SharedState>) {
|
||||
let Ok(uri): Result<hyper::Uri, _> = state.config.push.endpoint.parse() else {
|
||||
log!(fatal "could not parse uri: {:?}", state.config.push.endpoint);
|
||||
return;
|
||||
};
|
||||
let uri = match PushEndpoint::new(&uri) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
log!(fatal "could not parse endpoint address: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let interval = std::time::Duration::from_secs(state.config.maintenance.interval);
|
||||
let mut questions: HashSet<String> = HashSet::default();
|
||||
log!(debug "started maintenance routine with {:?} interval", interval);
|
||||
loop {
|
||||
log!(debug "running maintenance");
|
||||
log!(debug "----- MAINTENANCE -----");
|
||||
let questions_new = state.questions.lock().await.consume_to_push();
|
||||
questions.extend(questions_new);
|
||||
log!(debug "pushing {} questions", questions.len());
|
||||
match push_questions(&questions, &uri).await {
|
||||
Ok(()) => questions = HashSet::default(),
|
||||
Err(()) => { log!(debug "push failed - will try again"); },
|
||||
};
|
||||
log!(debug "----- /MAINTENANCE -----");
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn make_push_request<'a>(
|
||||
questions: &HashSet<String>, uri: &PushEndpoint<'a>
|
||||
) -> Result<hyper::Request<Full<Bytes>>, ()> {
|
||||
let Ok(body) = serde_json::to_string(questions) else { return Err(()); };
|
||||
hyper::Request::builder()
|
||||
.uri(uri.uri)
|
||||
.header(hyper::header::HOST, &uri.authority)
|
||||
.header(hyper::header::CONTENT_TYPE, "application/json")
|
||||
.body(Full::new(Bytes::from(body))).map_err(|_| ())
|
||||
}
|
||||
|
||||
async fn connect_to_push_endpoint(
|
||||
addr: &String,
|
||||
) -> Result<hyper::client::conn::http1::SendRequest<Full<Bytes>>, ()> {
|
||||
let stream = TcpStream::connect(addr).await.map_err(|_| ())?;
|
||||
let (sender, conn) = hyper::client::conn::http1::handshake(
|
||||
TokioIo::new(stream)
|
||||
).await.map_err(|_| ())?;
|
||||
tokio::task::spawn(async move {
|
||||
conn.await
|
||||
});
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
async fn push_questions<'a>(questions: &HashSet<String>, uri: &PushEndpoint<'a>) -> Result<(), ()> {
|
||||
if questions.len() == 0 {
|
||||
log!(debug "skipping push - no new questions");
|
||||
return Ok(());
|
||||
}
|
||||
let Ok(mut conn) = connect_to_push_endpoint(&uri.addr).await else {
|
||||
log!(err "could not connect to push endpoint");
|
||||
return Err(());
|
||||
};
|
||||
let Ok(req) = make_push_request(questions, uri) else {
|
||||
log!(err "could not construct push questions request");
|
||||
return Err(());
|
||||
};
|
||||
let Ok(res) = conn.send_request(req).await else {
|
||||
log!(err "could not send questions request to push endpoint");
|
||||
return Err(());
|
||||
};
|
||||
if res.status() != hyper::StatusCode::OK {
|
||||
log!(err "got non-200 response from push endpoint");
|
||||
return Err(());
|
||||
}
|
||||
log!(info "successfully pushed new questions");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_config(path: &str) -> Result<config::Config, String> {
|
||||
let Ok(file_contents) = std::fs::read_to_string(path) else {
|
||||
return Err("could not read the config file".to_string());
|
||||
|
Reference in New Issue
Block a user