From 002755eb889d77e2a8f9be41a379d515a0eb6afd Mon Sep 17 00:00:00 2001
From: Pavel Kirilin <win10@list.ru>
Date: Thu, 20 Jan 2022 18:19:01 +0400
Subject: [PATCH] Initial support for the concatenation extension.
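
Adds first-pass support for the TUS concatenation extension. Clients
mark an upload as partial with the "Upload-Concat: partial" header and
later create a final upload that merges the parts by sending
"Upload-Concat: final;" followed by the partial upload URLs. The final
upload may omit Upload-Length: every referenced part must be a
completed partial upload, and the final file's length becomes the sum
of the part lengths. A rough sketch of the flow this enables (the
/files route and the ids are illustrative, not taken from this patch):

    # Two partial uploads.
    POST /files HTTP/1.1
    Upload-Length: 5
    Upload-Concat: partial

    POST /files HTTP/1.1
    Upload-Length: 6
    Upload-Concat: partial

    # Final upload referencing the parts by URL; the server
    # concatenates their contents into the newly created file.
    POST /files HTTP/1.1
    Upload-Concat: final;/files/<id1> /files/<id2>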

Signed-off-by: Pavel Kirilin <win10@list.ru>
---
 src/config.rs                          |  2 +-
 src/info_storages/models/file_info.rs  |  6 +++
 src/notifiers/models/message_format.rs |  6 +--
 src/protocol/core/routes.rs            |  3 +-
 src/protocol/creation/routes.rs        | 68 ++++++++++++++++++++++++--
 src/protocol/extensions.rs             |  2 +
 src/storages/file_storage.rs           | 30 +++++++++++-
 src/storages/models/storage.rs         | 15 ++++++
 src/utils/headers.rs                   |  8 +--
 9 files changed, 125 insertions(+), 15 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 15c1ab7..2a22ab3 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -177,7 +177,7 @@ pub struct RustusConf {
     /// Enabled extensions for TUS protocol.
     #[structopt(
         long,
-        default_value = "getting,creation,termination,creation-with-upload,creation-defer-length",
+        default_value = "getting,creation,termination,creation-with-upload,creation-defer-length,concatenation",
         env = "RUSTUS_TUS_EXTENSIONS",
         use_delimiter = true
     )]
diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs
index c593df4..44c55a1 100644
--- a/src/info_storages/models/file_info.rs
+++ b/src/info_storages/models/file_info.rs
@@ -15,6 +15,9 @@ pub struct FileInfo {
     #[serde(with = "ts_seconds")]
     pub created_at: DateTime<Utc>,
     pub deferred_size: bool,
+    pub is_partial: bool,
+    pub is_final: bool,
+    pub parts: Option<Vec<String>>,
     pub storage: String,
     pub metadata: HashMap<String, String>,
 }
@@ -55,6 +58,9 @@ impl FileInfo {
             metadata,
             deferred_size,
             offset: 0,
+            is_final: false,
+            is_partial: false,
+            parts: None,
             created_at: chrono::Utc::now(),
         }
     }
diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs
index 6c9fb9c..6d80200 100644
--- a/src/notifiers/models/message_format.rs
+++ b/src/notifiers/models/message_format.rs
@@ -63,9 +63,9 @@ impl From<FileInfo> for TusdFileInfo {
             offset: file_info.offset,
             size: file_info.length,
             size_is_deferred: deferred_size,
-            is_final: true,
-            is_partial: false,
-            partial_uploads: None,
+            is_final: file_info.is_final,
+            is_partial: file_info.is_partial,
+            partial_uploads: file_info.parts,
             metadata: file_info.metadata,
             storage: TusdStorageInfo {
                 storage_type: file_info.storage,
diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs
index 72fb39e..de51352 100644
--- a/src/protocol/core/routes.rs
+++ b/src/protocol/core/routes.rs
@@ -56,7 +56,8 @@ pub async fn write_bytes(
     state: web::Data<State>,
 ) -> actix_web::Result<HttpResponse> {
     // Checking if request has required headers.
-    if !check_header(&request, "Content-Type", "application/offset+octet-stream") {
+    let check_content_type = |val: &str| val == "application/offset+octet-stream";
+    if !check_header(&request, "Content-Type", check_content_type) {
         return Ok(HttpResponse::UnsupportedMediaType().body(""));
     }
     // Getting current offset.
diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs
index 0e8a30a..54d5181 100644
--- a/src/protocol/creation/routes.rs
+++ b/src/protocol/creation/routes.rs
@@ -49,6 +49,17 @@ fn get_metadata(request: &HttpRequest) -> Option<HashMap<String, String>> {
         })
 }
 
+fn get_upload_parts(request: &HttpRequest) -> Vec<String> {
+    let concat_header = request.headers().get("Upload-Concat").unwrap();
+    let header_str = concat_header.to_str().unwrap();
+    let urls = header_str.strip_prefix("final;").unwrap();
+
+    urls.split(' ')
+        .filter_map(|val: &str| val.split('/').last().map(String::from))
+        .filter(|val| val.trim() != "")
+        .collect()
+}
+
 /// Create file.
 ///
 /// This method allows you to create a file to start uploading.
@@ -57,6 +68,7 @@ fn get_metadata(request: &HttpRequest) -> Option<HashMap<String, String>> {
 /// you don't know actual file length and
 /// you can upload first bytes if creation-with-upload
 /// extension is enabled.
+#[allow(clippy::too_many_lines)]
 pub async fn create_file(
     state: web::Data<State>,
     request: HttpRequest,
@@ -65,7 +77,7 @@ pub async fn create_file(
     // Getting Upload-Length header value as usize.
     let length = parse_header(&request, "Upload-Length");
     // Checking Upload-Defer-Length header.
-    let defer_size = check_header(&request, "Upload-Defer-Length", "1");
+    let defer_size = check_header(&request, "Upload-Defer-Length", |val| val == "1");
 
     // Indicator that creation-defer-length is enabled.
     let defer_ext = state
@@ -73,11 +85,18 @@ pub async fn create_file(
         .extensions_vec()
         .contains(&Extensions::CreationDeferLength);
 
+    let is_final = check_header(&request, "Upload-Concat", |val| val.starts_with("final;"));
+
+    let concat_ext = state
+        .config
+        .extensions_vec()
+        .contains(&Extensions::Concatenation);
+
     // Check that Upload-Length header is provided.
     // Otherwise checking that defer-size feature is enabled
     // and header provided.
-    if length.is_none() && (defer_ext && !defer_size) {
-        return Ok(HttpResponse::BadRequest().body(""));
+    if length.is_none() && !((defer_ext && defer_size) || (concat_ext && is_final)) {
+        return Ok(HttpResponse::BadRequest().body("Upload-Length header is required"));
     }
 
     let meta = get_metadata(&request);
@@ -91,6 +110,18 @@ pub async fn create_file(
         meta.clone(),
     );
 
+    let is_partial = check_header(&request, "Upload-Concat", |val| val == "partial");
+
+    if concat_ext {
+        if is_final {
+            file_info.is_final = true;
+            file_info.parts = Some(get_upload_parts(&request));
+        }
+        if is_partial {
+            file_info.is_partial = true;
+        }
+    }
+
     if state.config.hook_is_active(Hook::PreCreate) {
         let message = state
             .config
@@ -107,6 +138,32 @@ pub async fn create_file(
     // Create the file and get its path.
     file_info.path = Some(state.data_storage.create_file(&file_info).await?);
 
+    if file_info.is_final {
+        let mut final_size = 0;
+        let mut parts_info = Vec::new();
+        for part_id in file_info.parts.clone().unwrap() {
+            let part = state.info_storage.get_info(part_id.as_str()).await?;
+            if part.length != Some(part.offset) {
+                return Ok(
+                    HttpResponse::BadRequest().body(format!("{} upload is not complete.", part.id))
+                );
+            }
+            if !part.is_partial {
+                return Ok(
+                    HttpResponse::BadRequest().body(format!("{} upload is not partial.", part.id))
+                );
+            }
+            final_size += part.length.unwrap();
+            parts_info.push(part.clone());
+        }
+        state
+            .data_storage
+            .concat_files(&file_info, parts_info)
+            .await?;
+        file_info.offset = final_size;
+        file_info.length = Some(final_size);
+    }
+
     // Create upload URL for this file.
     let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?;
 
@@ -115,8 +172,9 @@ pub async fn create_file(
         .config
         .extensions_vec()
         .contains(&Extensions::CreationWithUpload);
-    if with_upload && !bytes.is_empty() {
-        if !check_header(&request, "Content-Type", "application/offset+octet-stream") {
+    if with_upload && !bytes.is_empty() && !(concat_ext && is_final) {
+        let octet_stream = |val: &str| val == "application/offset+octet-stream";
+        if !check_header(&request, "Content-Type", octet_stream) {
             return Ok(HttpResponse::BadRequest().body(""));
         }
         // Writing first bytes.
diff --git a/src/protocol/extensions.rs b/src/protocol/extensions.rs
index 5624b83..4eba162 100644
--- a/src/protocol/extensions.rs
+++ b/src/protocol/extensions.rs
@@ -14,6 +14,8 @@ pub enum Extensions {
     Creation,
     #[display(fmt = "termination")]
     Termination,
+    #[display(fmt = "concatenation")]
+    Concatenation,
     #[display(fmt = "getting")]
     Getting,
 }
diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs
index 8d8c989..12d8b15 100644
--- a/src/storages/file_storage.rs
+++ b/src/storages/file_storage.rs
@@ -1,7 +1,8 @@
 use std::path::PathBuf;
 
 use actix_files::NamedFile;
-use async_std::fs::{remove_file, DirBuilder, OpenOptions};
+use async_std::fs::{remove_file, DirBuilder, File, OpenOptions};
+use async_std::io::copy;
 use async_std::prelude::*;
 use async_trait::async_trait;
 use log::error;
@@ -128,6 +129,33 @@ impl Storage for FileStorage {
         Ok(file_path.display().to_string())
     }
 
+    async fn concat_files(
+        &self,
+        file_info: &FileInfo,
+        parts_info: Vec<FileInfo>,
+    ) -> RustusResult<()> {
+        let mut file = OpenOptions::new()
+            .write(true)
+            .append(true)
+            .create(false)
+            .create_new(false)
+            .open(file_info.path.as_ref().unwrap().clone())
+            .await
+            .map_err(|err| {
+                error!("{:?}", err);
+                RustusError::UnableToWrite(err.to_string())
+            })?;
+        for part in parts_info {
+            if part.path.is_none() {
+                return Err(RustusError::FileNotFound);
+            }
+            let mut part_file = File::open(part.path.as_ref().unwrap()).await?;
+            copy(&mut part_file, &mut file).await?;
+        }
+        file.sync_data().await?;
+        Ok(())
+    }
+
     async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
         // Let's remove the file itself.
         let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone());
diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs
index e129443..0d790b9 100644
--- a/src/storages/models/storage.rs
+++ b/src/storages/models/storage.rs
@@ -53,6 +53,21 @@ pub trait Storage: Display {
     /// `file_info` - info about current file.
     async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String>;
 
+    /// Concatenate files.
+    ///
+    /// This method merges multiple files into a single one.
+    ///
+    /// It is used by the concatenation extension of the protocol.
+    ///
+    /// # Params
+    /// `file_info` - info about current file.
+    /// `parts_info` - info about the files to merge.
+    async fn concat_files(
+        &self,
+        file_info: &FileInfo,
+        parts_info: Vec<FileInfo>,
+    ) -> RustusResult<()>;
+
     /// Remove file from storage
     ///
     /// This method removes file and all associated
diff --git a/src/utils/headers.rs b/src/utils/headers.rs
index 4a5b678..f2b0c42 100644
--- a/src/utils/headers.rs
+++ b/src/utils/headers.rs
@@ -28,15 +28,15 @@ pub fn parse_header<T: FromStr>(request: &HttpRequest, header_name: &str) -> Opt
             })
 }
 
-/// Check that header's value is equal to some value.
+/// Check that a header's value satisfies some predicate.
 ///
-/// Returns false if header is not present or values don't match.
-pub fn check_header(request: &HttpRequest, header_name: &str, value: &str) -> bool {
+/// Passes the header value to `expr` if the header is present; returns false otherwise.
+pub fn check_header(request: &HttpRequest, header_name: &str, expr: fn(&str) -> bool) -> bool {
     request
         .headers()
         .get(header_name)
         .and_then(|header_val| match header_val.to_str() {
-            Ok(val) => Some(val == value),
+            Ok(val) => Some(expr(val)),
             Err(_) => None,
         })
         .unwrap_or(false)
-- 
GitLab