add basic question uploading

This commit is contained in:
2025-05-02 13:03:30 +02:00
parent 4af8c825bb
commit e90dfa0b0b
10 changed files with 656 additions and 43 deletions

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2024"
[dependencies]
askama = "0.14.0"
chrono = "0.4.41"
http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["full"] }
@@ -11,3 +12,4 @@ hyper-util = { version = "0.1.11", features = ["full"] }
serde = { version = "1.0.219", features = ["derive"] }
tokio = { version = "1.44.2", features = ["full"] }
toml = "0.8.22"
url = "2.5.4"

View File

@@ -0,0 +1,2 @@
[general]
dirs = ["../dst/"]

View File

@@ -1,9 +1,8 @@
use std::net::SocketAddr;
use serde::Deserialize;
macro_rules! def_config {
($(config $config_id: ident
{ $([$namespace: ident $struct_name: ident]$($var_id: ident = $var_type: ty),*)*$(,)? })*
{ $([$namespace: ident $struct_name: ident]$($var_id: ident = $var_type: ty),*$(,)?)* })*
) => {
$(
#[derive(Deserialize)]
@@ -24,7 +23,17 @@ def_config! {
config Config {
[server Server]
bind_to = SocketAddr, // where to bind the server tcp socket in format: "<host>:<port>"
bind_to = std::net::SocketAddr, // recommended format: "<host>:<port>"
max_question_body_size = u64 // in bytes
[performance Performance]
memory_limit = usize,
[maintenance Maintenance]
interval = u64, // in seconds
[push Push]
endpoint = std::net::SocketAddr, // recommended format: "<host>:<port>"
}

View File

@@ -17,4 +17,9 @@ macro_rules! log {
let now = chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, false);
println!(concat!("[{}] ERROR ", $text), now, $($($arg),*)?);
};
(fatal $text: literal$(, $($arg: expr),*$(,)?)?) => {
let now = chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, false);
println!(concat!("[{}] !!FATAL ERROR!! ", $text), now, $($($arg),*)?);
};
}

View File

@@ -1,62 +1,259 @@
use std::sync::{atomic::AtomicU32, Arc};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use std::{collections::HashSet, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
use askama::Template;
use http_body_util::{combinators::BoxBody, BodyExt, Collected, Full};
use hyper::{
body::{Bytes, Incoming},
server::conn::http1, service::service_fn, Error, Method, Request, Response
body::{Body, Bytes, Incoming},
header::HeaderValue, server::conn::http1, service::service_fn, Error, Method, Request, Response
};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::{net::TcpListener, sync::Mutex};
mod logger;
mod args;
mod config;
const CTYPE_FORM: HeaderValue = HeaderValue::from_static("application/x-www-form-urlencoded");
macro_rules! response {
($status: ident $body: expr) => {{
(@server) => { "zumepro/ask_stallman" };
(file $req: expr, $source: literal, $mime: literal) => {
response!(
$req, OK include_str!(concat!("../../dst/", $source)),
CONTENT_TYPE: $mime,
CACHE_CONTROL: "max-age=180, public",
)
};
(main_page $req: expr, $message_class: literal, $message: literal) => {
response!( $req, OK MainPage {
ntfy_class: $message_class, message: $message, prefill_question: None
}.render()?, CONTENT_TYPE: "text/html")
};
(main_page $req: expr, $message_class: literal, $message: literal$(, $prefill: expr)?) => {
response!($req, OK MainPage {
ntfy_class: $message_class, message: $message, prefill_question: Some($prefill)
}.render()?, CONTENT_TYPE: "text/html")
};
($req: expr, $status: ident $body: expr) => {{
log!(info "{} \"{} {:?}\"", hyper::StatusCode::$status, $req.method(), $req.uri());
let mut res = Response::new(Full::new(Bytes::from($body)).map_err(|n| match n {}).boxed());
res.headers_mut().append(
hyper::header::SERVER, hyper::header::HeaderValue::from_static(response!(@server))
);
*res.status_mut() = hyper::StatusCode::$status;
res
}};
($status: ident $body: expr, $($hkey: ident : $hval: literal),*$(,)?) => {{
($req: expr, $status: ident $body: expr, $($hkey: ident : $hval: literal),*$(,)?) => {{
log!(info "{} \"{} {:?}\"", hyper::StatusCode::$status, $req.method(), $req.uri());
let mut res = Response::new(Full::new(Bytes::from($body)).map_err(|n| match n {}).boxed());
*res.status_mut() = hyper::StatusCode::$status;
res.headers_mut().append(
hyper::header::SERVER, hyper::header::HeaderValue::from_static(response!(@server))
);
$(res.headers_mut()
.append(hyper::header::$hkey, hyper::header::HeaderValue::from_static($hval));)*
res
}};
}
#[derive(askama::Template, Default)]
#[template(path = "index.html")]
struct MainPage<'a> {
ntfy_class: &'a str,
message: &'a str,
prefill_question: Option<String>,
}
#[derive(Debug)]
enum RouterError {
Templating(askama::Error),
NotImplemented,
IO(Error),
}
impl std::fmt::Display for RouterError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Templating(e) => write!(f, "templating: {}", e),
Self::IO(e) => write!(f, "io: {}", e),
Self::NotImplemented => write!(f, "not implemented"),
}
}
}
impl std::error::Error for RouterError {
fn cause(&self) -> Option<&dyn std::error::Error> {
match self {
Self::Templating(e) => Some(e),
Self::IO(e) => Some(e),
_ => None,
}
}
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Templating(e) => Some(e),
Self::IO(e) => Some(e),
_ => None,
}
}
fn description(&self) -> &str {
match self {
Self::Templating(_) => "could not construct template",
Self::IO(_) => "i/o error",
Self::NotImplemented => "reached a code block not yet implemented",
}
}
}
impl From<Error> for RouterError {
fn from(value: Error) -> Self {
Self::IO(value)
}
}
impl From<askama::Error> for RouterError {
fn from(value: askama::Error) -> Self {
Self::Templating(value)
}
}
async fn router(
req: Request<Incoming>,
_: Arc<SharedState>,
) -> Result<Response<BoxBody<Bytes, Error>>, Error> {
state: Arc<SharedState>,
) -> Result<Response<BoxBody<Bytes, Error>>, RouterError> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => Ok(response!(
OK include_str!("../../dst/index.html"),
CONTENT_TYPE: "text/html",
)),
_ => Ok(response!(
// pages
(&Method::GET, "/") => Ok(response!(main_page req, "", "")),
(&Method::POST, "/") => new_question(req, state).await,
// assets
(&Method::GET, "/script.js") => Ok(response!(file req, "script.js", "text/javascript")),
(&Method::GET, "/style.css") => Ok(response!(file req, "style.css", "text/css")),
_ => Ok(response!(req,
NOT_FOUND include_str!("../../dst/not_found.html"),
CONTENT_TYPE: "text/html",
)),
}
}
fn load_config(path: &str) -> Result<config::Config, &'static str> {
let Ok(file_contents) = std::fs::read_to_string(path) else {
return Err("could not read the config file");
fn parse_form<'a>(bytes: &'a [u8]) -> Result<std::borrow::Cow<'a, str>, ()> {
let parsed = url::form_urlencoded::parse(bytes);
for field in parsed {
if field.0 == "question" {
return Ok(field.1);
}
}
Err(())
}
async fn new_question(
mut req: Request<Incoming>,
state: Arc<SharedState>,
) -> Result<Response<BoxBody<Bytes, Error>>, RouterError> {
// check size
let body_size = req.body().size_hint().upper().unwrap_or(u64::MAX);
if body_size > state.config.server.max_question_body_size {
return Ok(response!(main_page req, "error", "Question was too long to add."));
}
// check headers
if req.headers().get(hyper::header::CONTENT_TYPE) != Some(&CTYPE_FORM) {
return Ok(response!(main_page req,
"error", "Your browser sent a POST request without form data. Please try again."
));
}
// get question
let body = (&mut req).collect().await?.to_bytes();
let Ok(question) = parse_form(&body) else {
return Ok(response!(main_page req,
"error", "The question your browser sent was in invalid format. Please try again."
));
};
toml::from_str(&file_contents).map_err(|_| "invalid config file structure or fields")
let question = question.to_string();
// insert question
match state.questions.lock().await.add_new(question) {
Ok(()) => {},
Err(()) => return Ok(response!(main_page req,
"error", "We got too many questions in total. So we are not accepting new ones \
anymore. We are so sorry. :(")
)
}
Ok(response!(main_page req, "info", "Your question was successfully added."))
}
async fn maintenance(state: Arc<SharedState>) {
let interval = std::time::Duration::from_secs(state.config.maintenance.interval);
log!(debug "started maintenance routine with {:?} interval", interval);
loop {
log!(debug "running maintenance");
tokio::time::sleep(interval).await;
}
}
fn load_config(path: &str) -> Result<config::Config, String> {
let Ok(file_contents) = std::fs::read_to_string(path) else {
return Err("could not read the config file".to_string());
};
toml::from_str(&file_contents)
.map_err(|e| format!("invalid config file structure or fields: {:?}", e))
}
#[derive(Default)]
struct Questions {
total_size: usize,
max_size: usize,
to_push: HashSet<String>,
pushed: HashSet<String>,
}
impl Questions {
fn with_capacity(max_size: usize) -> Self {
Self {
max_size,
..Default::default()
}
}
fn add_new(&mut self, question: String) -> Result<(), ()> {
if self.pushed.contains(&question) || self.to_push.contains(&question) { return Ok(()); }
if self.total_size + question.len() > self.max_size {
return Err(());
}
self.total_size += question.len();
self.to_push.insert(question);
Ok(())
}
fn consume_to_push(&mut self) -> HashSet<String> {
for question in self.to_push.iter() {
self.pushed.insert(question.clone());
}
std::mem::take(&mut self.to_push)
}
}
struct SharedState {
counter: AtomicU32,
config: config::Config,
questions: Mutex<Questions>,
}
impl Default for SharedState {
fn default() -> Self {
Self { counter: 0.into() }
impl SharedState {
fn new(config: config::Config) -> Self {
let memory_limit = config.performance.memory_limit;
Self {
config,
questions: Mutex::new(Questions::with_capacity(memory_limit)),
}
}
}
@@ -73,34 +270,39 @@ async fn main() {
let config = match load_config(args.config_path()) {
Ok(v) => v,
Err(e) => {
eprintln!("{}", e);
log!(fatal "{}", e);
return;
}
};
// shared state
let state = Arc::new(SharedState::default());
let state = Arc::new(SharedState::new(config));
// server
let Ok(listener) = TcpListener::bind(config.server.bind_to).await else {
eprintln!("unable to bind to: {:?}", config.server.bind_to);
// server initialization
let Ok(listener) = TcpListener::bind(state.config.server.bind_to).await else {
log!(fatal "unable to bind to {:?}", state.config.server.bind_to);
return;
};
// server runtime
let state_maintenance = state.clone();
tokio::task::spawn(async move { maintenance(state_maintenance).await; });
log!(info "listening on {:?}", state.config.server.bind_to);
loop {
let Ok((stream, addr)) = listener.accept().await else {
eprintln!("unable to accept new connections");
log!(fatal "unable to accept new connections");
return;
};
let io = TokioIo::new(stream);
log!(debug "new connection from {:?}", addr);
let io = TokioIo::new(stream);
let state_clone = state.clone();
tokio::task::spawn(async move {
if let Err(_) = http1::Builder::new().serve_connection(io, service_fn(move |req| {
router(req, state_clone.clone())
})).await {
println!("closed connection");
log!(debug "transport error to {:?}", addr);
}
log!(debug "closing connection to {:?}", addr);
});
}
}