From 37522825d16bb6cac1ac24838e6916d577ea3511 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin <win10@list.ru> Date: Sat, 18 Dec 2021 16:29:38 +0400 Subject: [PATCH] Refactored code, Fixed creation. Signed-off-by: Pavel Kirilin <win10@list.ru> --- Cargo.lock | 1 + Cargo.toml | 1 + src/config.rs | 18 +++- src/main.rs | 17 +++- src/protocol/core/routes.rs | 40 +++----- src/protocol/creation/routes.rs | 105 ++++++++++++++++---- src/protocol/creation_with_upload/mod.rs | 16 --- src/protocol/creation_with_upload/routes.rs | 69 ------------- src/protocol/mod.rs | 5 +- src/utils/headers.rs | 43 ++++++++ src/utils/mod.rs | 1 + 11 files changed, 180 insertions(+), 136 deletions(-) delete mode 100644 src/protocol/creation_with_upload/mod.rs delete mode 100644 src/protocol/creation_with_upload/routes.rs create mode 100644 src/utils/headers.rs create mode 100644 src/utils/mod.rs diff --git a/Cargo.lock b/Cargo.lock index ac0894e..d3d38a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1913,6 +1913,7 @@ dependencies = [ "actix-web", "async-std", "async-trait", + "base64", "chrono", "derive_more", "log", diff --git a/Cargo.toml b/Cargo.toml index 8e5de99..bf9322d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,6 @@ chrono = { version = "^0.4.19", features = ["serde"] } serde_json = "1" log = "^0.4.14" url = "2.2.2" +base64 = "^0.13.0" simple-logging = { version = "^2.0.2" } sqlx = { version = "0.5", features = ["runtime-async-std-native-tls", "sqlite"] } \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 69f1858..f0fddef 100644 --- a/src/config.rs +++ b/src/config.rs @@ -72,7 +72,7 @@ pub struct RustusConf { /// Enabled extensions for TUS protocol. #[structopt( long, - default_value = "creation,creation-with-upload,getting", + default_value = "getting,creation,creation-with-upload,creation-defer-length", env = "RUSTUS_EXTENSIONS" )] pub extensions: String, @@ -84,6 +84,7 @@ pub struct RustusConf { /// Enum of available Protocol Extensions #[derive(PartialEq, PartialOrd, Ord, Eq)] pub enum ProtocolExtensions { + CreationDeferLength, CreationWithUpload, Creation, Termination, @@ -100,6 +101,7 @@ impl TryFrom<String> for ProtocolExtensions { match value.as_str() { "creation" => Ok(ProtocolExtensions::Creation), "creation-with-upload" => Ok(ProtocolExtensions::CreationWithUpload), + "creation-defer-length" => Ok(ProtocolExtensions::CreationDeferLength), "termination" => Ok(ProtocolExtensions::Termination), "getting" => Ok(ProtocolExtensions::Getting), _ => Err(RustusError::UnknownExtension(value.clone())), @@ -116,6 +118,7 @@ impl From<ProtocolExtensions> for String { ProtocolExtensions::CreationWithUpload => "creation-with-upload".into(), ProtocolExtensions::Termination => "termination".into(), ProtocolExtensions::Getting => "getting".into(), + ProtocolExtensions::CreationDeferLength => "creation-defer-length".into(), } } } @@ -164,12 +167,23 @@ impl RustusConf { .split(',') .flat_map(|ext| ProtocolExtensions::try_from(String::from(ext))) .collect::<Vec<ProtocolExtensions>>(); - // creation-with-upload + + // If create-with-upload extension is enabled + // creation extension must be enabled too. if ext.contains(&ProtocolExtensions::CreationWithUpload) && !ext.contains(&ProtocolExtensions::Creation) { ext.push(ProtocolExtensions::Creation); } + + // If create-defer-length extension is enabled + // creation extension must be enabled too. + if ext.contains(&ProtocolExtensions::CreationDeferLength) + && !ext.contains(&ProtocolExtensions::Creation) + { + ext.push(ProtocolExtensions::Creation); + } + ext.sort(); ext } diff --git a/src/main.rs b/src/main.rs index 73a3f34..5fe1730 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use actix_web::{ dev::{Server, Service}, middleware, web, App, HttpServer, }; -use log::error; +use log::{error, info}; use config::RustusConf; @@ -17,6 +17,19 @@ mod errors; mod protocol; mod routes; mod storages; +mod utils; + +fn greeting(app_conf: &RustusConf) { + let extensions = app_conf + .extensions_vec() + .into_iter() + .map(String::from) + .collect::<Vec<String>>() + .join(","); + info!("Welcome to rustus!"); + info!("Base URL: {}", app_conf.base_url()); + info!("Available extensions {}", extensions); +} /// Creates Actix server. /// @@ -75,6 +88,7 @@ pub fn create_server( if let Some(workers_count) = workers { server = server.workers(workers_count); } + server = server.server_hostname("meme"); Ok(server.run()) } @@ -89,6 +103,7 @@ async fn main() -> std::io::Result<()> { error!("{}", err); return Err(err.into()); } + greeting(&app_conf); let server = create_server(storage, app_conf)?; server.await } diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index 2966b36..02da1a0 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -6,6 +6,7 @@ use actix_web::{ HttpRequest, HttpResponse, }; +use crate::utils::headers::{check_header, parse_header}; use crate::{RustusConf, Storage}; #[allow(clippy::needless_pass_by_value)] @@ -25,17 +26,20 @@ pub async fn get_file_info( storage: web::Data<Box<dyn Storage + Send + Sync>>, request: HttpRequest, ) -> actix_web::Result<HttpResponse> { - let resp = if let Some(file_id) = request.match_info().get("file_id") { + let mut builder = HttpResponseBuilder::new(StatusCode::OK); + if let Some(file_id) = request.match_info().get("file_id") { let file_info = storage.get_file_info(file_id).await?; - HttpResponseBuilder::new(StatusCode::OK) + builder .set_header("Upload-Offset", file_info.offset.to_string()) .set_header("Upload-Length", file_info.length.to_string()) - .set_header("Content-Length", file_info.offset.to_string()) - .body("") + .set_header("Content-Length", file_info.offset.to_string()); + if file_info.deferred_size { + builder.set_header("Upload-Defer-Length", "1"); + } } else { - HttpResponseBuilder::new(StatusCode::NOT_FOUND).body("") + builder.status(StatusCode::NOT_FOUND); }; - Ok(resp) + Ok(builder.body("")) } pub async fn write_bytes( @@ -43,31 +47,15 @@ pub async fn write_bytes( bytes: Bytes, storage: web::Data<Box<dyn Storage + Send + Sync>>, ) -> actix_web::Result<HttpResponse> { - let content_type = - request - .headers() - .get("Content-Type") - .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(val == "application/offset+octet-stream"), - Err(_) => None, - }); - if Some(true) != content_type { + if !check_header(&request, "Content-Type", "application/offset+octet-stream") { return Ok(HttpResponseBuilder::new(StatusCode::UNSUPPORTED_MEDIA_TYPE).body("")); } - let offset = request - .headers() - .get("Upload-Offset") - .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(String::from(val)), - Err(_) => None, - }) - .and_then(|val| match val.parse::<usize>() { - Ok(offset) => Some(offset), - Err(_) => None, - }); + let offset = parse_header(&request, "Upload-Offset"); + if offset.is_none() { return Ok(HttpResponseBuilder::new(StatusCode::UNSUPPORTED_MEDIA_TYPE).body("")); } + if let Some(file_id) = request.match_info().get("file_id") { let offset = storage .add_bytes(file_id, offset.unwrap(), bytes.bytes()) diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index beb19b7..7b97ab2 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -1,28 +1,97 @@ -use actix_web::dev::HttpResponseBuilder; -use actix_web::http::StatusCode; +use std::collections::HashMap; + +use actix_web::web::{Buf, Bytes}; use actix_web::{web, HttpRequest, HttpResponse}; -use crate::Storage; +use crate::config::ProtocolExtensions; +use crate::utils::headers::{check_header, parse_header}; +use crate::{RustusConf, Storage}; + +/// Get metadata info from request. +/// +/// Metadata is located in Upload-Metadata header. +/// Key and values are separated by spaces and +/// pairs are delimited with commas. +/// +/// E.G. +/// `Upload-Metadata: Video bWVtZXM=,Category bWVtZXM=` +/// +/// All values are encoded as base64 strings. +fn get_metadata(request: &HttpRequest) -> Option<HashMap<String, String>> { + request + .headers() + .get("Upload-Metadata") + .and_then(|her| match her.to_str() { + Ok(str_val) => Some(String::from(str_val)), + Err(_) => None, + }) + .map(|header_string| { + let mut meta_map = HashMap::new(); + for meta_pair in header_string.split(',') { + let mut split = meta_pair.split(' '); + let key = split.next(); + let b64val = split.next(); + if key.is_none() || b64val.is_none() { + continue; + } + let value = + base64::decode(b64val.unwrap()).map(|value| match String::from_utf8(value) { + Ok(val) => Some(val), + Err(_) => None, + }); + if let Ok(Some(res)) = value { + meta_map.insert(String::from(key.unwrap()), res); + } + } + meta_map + }) +} pub async fn create_file( storage: web::Data<Box<dyn Storage + Send + Sync>>, + app_conf: web::Data<RustusConf>, request: HttpRequest, + bytes: Bytes, ) -> actix_web::Result<HttpResponse> { - let length = request - .headers() - .get("Upload-Length") - .and_then(|value| match value.to_str() { - Ok(header_str) => Some(String::from(header_str)), - Err(_) => None, - }) - .and_then(|val| match val.parse::<usize>() { - Ok(num) => Some(num), - Err(_) => None, - }); - let file_id = storage.create_file(length, None).await?; - let upload_url = request.url_for("core:write_bytes", &[file_id])?; - Ok(HttpResponseBuilder::new(StatusCode::CREATED) + // Getting Upload-Length header value as usize. + let length = parse_header(&request, "Upload-Length"); + // Checking Upload-Defer-Length header. + let defer_size = check_header(&request, "Upload-Defer-Length", "1"); + + // Indicator that creation-defer-length is enabled. + let defer_ext = app_conf + .extensions_vec() + .contains(&ProtocolExtensions::CreationDeferLength); + + // Check that Upload-Length header is provided. + // Otherwise checking that defer-size feature is enabled + // and header provided. + if length.is_none() && (defer_ext && !defer_size) { + return Ok(HttpResponse::BadRequest().body("")); + } + + let meta = get_metadata(&request); + // Create file and get the id. + let file_id = storage.create_file(length, meta).await?; + + // Create upload URL for this file. + let upload_url = request.url_for("core:write_bytes", &[file_id.clone()])?; + + let mut upload_offset = 0; + + // Checking if creation-with-upload extension is enabled. + let with_upload = app_conf + .extensions_vec() + .contains(&ProtocolExtensions::CreationWithUpload); + if with_upload && !bytes.is_empty() { + // Writing first bytes. + upload_offset = storage + .add_bytes(file_id.as_str(), 0, bytes.bytes()) + .await?; + } + + Ok(HttpResponse::Created() .set_header("Location", upload_url.as_str()) - .set_header("Upload-Offset", "0") + .set_header("Upload-Offset", upload_offset.to_string()) .body("")) } diff --git a/src/protocol/creation_with_upload/mod.rs b/src/protocol/creation_with_upload/mod.rs deleted file mode 100644 index 797113a..0000000 --- a/src/protocol/creation_with_upload/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -use actix_web::{guard, web}; - -use crate::RustusConf; - -mod routes; - -pub fn add_extension(web_app: &mut web::ServiceConfig, app_conf: &RustusConf) { - web_app.service( - // Post /base - // URL for creating files. - web::resource(app_conf.base_url().as_str()) - .name("creation-with-upload:create_file") - .guard(guard::Post()) - .to(routes::create_file), - ); -} diff --git a/src/protocol/creation_with_upload/routes.rs b/src/protocol/creation_with_upload/routes.rs deleted file mode 100644 index 7b6cb95..0000000 --- a/src/protocol/creation_with_upload/routes.rs +++ /dev/null @@ -1,69 +0,0 @@ -use actix_web::dev::HttpResponseBuilder; -use actix_web::http::StatusCode; -use actix_web::web::{Buf, Bytes}; -use actix_web::{web, HttpRequest, HttpResponse}; - -use crate::Storage; - -/// Creates files with initial bytes. -/// -/// This function is similar to -/// `creation:create_file`, -/// except that it can write bytes -/// right after it created a data file. -pub async fn create_file( - storage: web::Data<Box<dyn Storage + Send + Sync>>, - request: HttpRequest, - bytes: Bytes, -) -> actix_web::Result<HttpResponse> { - let length = request - .headers() - .get("Upload-Length") - .and_then(|value| match value.to_str() { - Ok(header_str) => Some(String::from(header_str)), - Err(_) => None, - }) - .and_then(|val| match val.parse::<usize>() { - Ok(num) => Some(num), - Err(_) => None, - }); - let file_id = storage.create_file(length, None).await?; - let mut upload_offset = 0; - if !bytes.is_empty() { - // Checking if content type matches. - let content_type = request - .headers() - .get("Content-Type") - .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(val == "application/offset+octet-stream"), - Err(_) => None, - }); - if Some(true) != content_type { - return Ok(HttpResponseBuilder::new(StatusCode::UNSUPPORTED_MEDIA_TYPE).body("")); - } - // Checking if - let offset = request - .headers() - .get("Upload-Offset") - .and_then(|header_val| match header_val.to_str() { - Ok(val) => Some(String::from(val)), - Err(_) => None, - }) - .and_then(|val| match val.parse::<usize>() { - Ok(offset) => Some(offset), - Err(_) => None, - }); - if offset.is_none() { - return Ok(HttpResponseBuilder::new(StatusCode::UNSUPPORTED_MEDIA_TYPE).body("")); - } - - upload_offset = storage - .add_bytes(file_id.as_str(), offset.unwrap(), bytes.bytes()) - .await?; - } - let upload_url = request.url_for("core:write_bytes", &[file_id])?; - Ok(HttpResponseBuilder::new(StatusCode::CREATED) - .set_header("Location", upload_url.as_str()) - .set_header("Upload-Offset", upload_offset.to_string()) - .body("")) -} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 605e791..38d198c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -5,7 +5,6 @@ use crate::RustusConf; mod core; mod creation; -mod creation_with_upload; mod getting; mod termination; @@ -18,15 +17,13 @@ pub fn setup(app_conf: RustusConf) -> Box<dyn Fn(&mut web::ServiceConfig)> { for extension in app_conf.extensions_vec() { match extension { ProtocolExtensions::Creation => creation::add_extension(web_app, &app_conf), - ProtocolExtensions::CreationWithUpload => { - creation_with_upload::add_extension(web_app, &app_conf); - } ProtocolExtensions::Termination => { termination::add_extension(web_app, &app_conf); } ProtocolExtensions::Getting => { getting::add_extension(web_app, &app_conf); } + _ => {} } } core::add_extension(web_app, &app_conf); diff --git a/src/utils/headers.rs b/src/utils/headers.rs new file mode 100644 index 0000000..4a5b678 --- /dev/null +++ b/src/utils/headers.rs @@ -0,0 +1,43 @@ +use std::str::FromStr; + +use actix_web::HttpRequest; + +/// Parse header's value. +/// +/// This function will try to parse +/// header's value to some type T. +/// +/// If header is not present or value +/// can't be parsed then it returns None. +pub fn parse_header<T: FromStr>(request: &HttpRequest, header_name: &str) -> Option<T> { + request + .headers() + // Get header + .get(header_name) + .and_then(|value| + // Parsing it to string. + match value.to_str() { + Ok(header_str) => Some(String::from(header_str)), + Err(_) => None, + }) + .and_then(|val| + // Parsing to type T. + match val.parse::<T>() { + Ok(num) => Some(num), + Err(_) => None, + }) +} + +/// Check that header's value is equal to some value. +/// +/// Returns false if header is not present or values don't match. +pub fn check_header(request: &HttpRequest, header_name: &str, value: &str) -> bool { + request + .headers() + .get(header_name) + .and_then(|header_val| match header_val.to_str() { + Ok(val) => Some(val == value), + Err(_) => None, + }) + .unwrap_or(false) +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..7b67a24 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod headers; -- GitLab