From c9391958ace5d0608ab9a214475e64cccd316d11 Mon Sep 17 00:00:00 2001
From: Pavel Kirilin <win10@list.ru>
Date: Sun, 6 Mar 2022 21:17:07 +0400
Subject: [PATCH] Made rustus go Brrrr. (#51)

Made rustus go Brrrr.
Description:
* Optimized number of syscalls for writing by using Buffered IO;
* Rmoved redundant clone calls;
* Changed Storage add_bytes signature;
* Moved serde calls to tokio;
* Removed redundant memory allocations.
* Async runtime is changed back to actix.
* Spawning tasks with actix runtime.
* Super mega solution to speedup fs ops.

Signed-off-by: Pavel Kirilin <win10@list.ru>
---
 Cargo.lock                              |   1 +
 Cargo.toml                              |   5 +-
 src/config.rs                           |   3 +
 src/errors.rs                           |   2 +
 src/info_storages/db_info_storage.rs    |   2 +-
 src/info_storages/file_info_storage.rs  |  77 ++++++----
 src/info_storages/models/file_info.rs   |  26 +++-
 src/info_storages/redis_info_storage.rs |   2 +-
 src/main.rs                             |   2 +-
 src/notifiers/dir_notifier.rs           |   1 +
 src/protocol/core/write_bytes.rs        |  28 ++--
 src/protocol/creation/routes.rs         |  12 +-
 src/protocol/getting/routes.rs          |   4 +-
 src/protocol/termination/routes.rs      |   2 +-
 src/state.rs                            |   1 +
 src/storages/file_storage.rs            | 185 +++++++++++++-----------
 src/storages/models/available_stores.rs |   1 +
 src/storages/models/storage.rs          |   3 +-
 src/utils/headers.rs                    |   2 +-
 19 files changed, 214 insertions(+), 145 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cc866e4..decd5b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2398,6 +2398,7 @@ dependencies = [
  "actix-web",
  "async-trait",
  "base64",
+ "bytes",
  "chrono",
  "derive_more",
  "fern",
diff --git a/Cargo.toml b/Cargo.toml
index b4c2507..4af3e82 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ serde_json = "1"
 strfmt = "^0.1.6"
 thiserror = "^1.0"
 url = "2.2.2"
+bytes = "1.1.0"
 
 [dependencies.futures]
 version = "0.3.21"
@@ -82,7 +83,7 @@ features = ["derive"]
 version = "0.23"
 
 [dependencies.tokio]
-features = ["time", "process", "fs"]
+features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes"]
 version = "1.4.0"
 
 [dependencies.tokio-amqp]
@@ -114,7 +115,7 @@ httptest = "0.15.4"
 
 [profile]
 [profile.release]
-lto = true
+lto = "fat"
 panic = "abort"
 opt-level = 3
 codegen-units = 1
diff --git a/src/config.rs b/src/config.rs
index 07cd41e..f1d862c 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -27,6 +27,9 @@ pub struct StorageOptions {
 
     #[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")]
     pub dir_structure: String,
+
+    #[structopt(long, parse(from_flag))]
+    pub force_fsync: bool,
 }
 
 #[derive(StructOpt, Debug, Clone)]
diff --git a/src/errors.rs b/src/errors.rs
index 942fe39..99eaec5 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -58,6 +58,8 @@ pub enum RustusError {
     AMQPPoolError(#[from] mobc_lapin::mobc::Error<lapin::Error>),
     #[error("Std error: {0}")]
     StdError(#[from] std::io::Error),
+    #[error("Can't spawn task: {0}")]
+    TokioSpawnError(#[from] tokio::task::JoinError),
 }
 
 /// This conversion allows us to use `RustusError` in the `main` function.
diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs
index 164f2bc..baa0092 100644
--- a/src/info_storages/db_info_storage.rs
+++ b/src/info_storages/db_info_storage.rs
@@ -66,7 +66,7 @@ impl InfoStorage for DBInfoStorage {
     async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> {
         let model: Option<DbModel> = self.db.fetch_by_column("id", file_id).await?;
         if let Some(info) = model {
-            serde_json::from_str(info.info.as_str()).map_err(RustusError::from)
+            FileInfo::from_json(info.info.to_string()).await
         } else {
             Err(RustusError::FileNotFound)
         }
diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs
index 51f5567..309b47a 100644
--- a/src/info_storages/file_info_storage.rs
+++ b/src/info_storages/file_info_storage.rs
@@ -1,9 +1,11 @@
+use std::io::{Read, Write};
 use std::path::PathBuf;
 
 use async_trait::async_trait;
 use log::error;
-use tokio::fs::{read_to_string, remove_file, DirBuilder, OpenOptions};
-use tokio::io::copy;
+use std::fs::{remove_file, File, OpenOptions};
+use std::io::{BufReader, BufWriter};
+use tokio::fs::DirBuilder;
 
 use crate::errors::{RustusError, RustusResult};
 use crate::info_storages::{FileInfo, InfoStorage};
@@ -35,42 +37,57 @@ impl InfoStorage for FileInfoStorage {
     }
 
     async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> {
-        let mut file = OpenOptions::new()
-            .write(true)
-            .create(create)
-            .truncate(true)
-            .open(self.info_file_path(file_info.id.as_str()).as_path())
-            .await
-            .map_err(|err| {
-                error!("{:?}", err);
-                RustusError::UnableToWrite(err.to_string())
-            })?;
-        copy(&mut file_info.json().await?.as_bytes(), &mut file).await?;
-        tokio::task::spawn(async move { file.sync_data().await });
-        Ok(())
+        let info = file_info.clone();
+        let path = self.info_file_path(info.id.as_str());
+        actix_web::rt::task::spawn_blocking(move || {
+            let file = OpenOptions::new()
+                .write(true)
+                .create(create)
+                .truncate(true)
+                .open(path)
+                .map_err(|err| {
+                    error!("{:?}", err);
+                    RustusError::UnableToWrite(err.to_string())
+                })?;
+            let data = serde_json::to_string(&info).map_err(RustusError::from)?;
+            {
+                let mut writer = BufWriter::new(file);
+                writer.write_all(data.as_bytes())?;
+                writer.flush()?;
+            }
+            Ok(())
+        })
+        .await?
     }
 
     async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> {
         let info_path = self.info_file_path(file_id);
-        if !info_path.exists() {
-            return Err(RustusError::FileNotFound);
-        }
-        let contents = read_to_string(info_path).await.map_err(|err| {
-            error!("{:?}", err);
-            RustusError::UnableToReadInfo
-        })?;
-        serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from)
+        actix_web::rt::task::spawn_blocking(move || {
+            if !info_path.exists() {
+                return Err(RustusError::FileNotFound);
+            }
+            let info = File::open(info_path)?;
+            let mut contents = String::new();
+            let mut reader = BufReader::new(info);
+            reader.read_to_string(&mut contents)?;
+            serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from)
+        })
+        .await?
     }
 
     async fn remove_info(&self, file_id: &str) -> RustusResult<()> {
-        let info_path = self.info_file_path(file_id);
-        if !info_path.exists() {
-            return Err(RustusError::FileNotFound);
-        }
-        remove_file(info_path).await.map_err(|err| {
-            error!("{:?}", err);
-            RustusError::UnableToRemove(String::from(file_id))
+        let id = String::from(file_id);
+        let info_path = self.info_file_path(id.as_str());
+        actix_web::rt::task::spawn_blocking(move || {
+            if !info_path.exists() {
+                return Err(RustusError::FileNotFound);
+            }
+            remove_file(info_path).map_err(|err| {
+                error!("{:?}", err);
+                RustusError::UnableToRemove(id)
+            })
         })
+        .await?
     }
 }
 
diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs
index 5202706..806ef3e 100644
--- a/src/info_storages/models/file_info.rs
+++ b/src/info_storages/models/file_info.rs
@@ -93,13 +93,25 @@ impl FileInfo {
 
     pub async fn json(&self) -> RustusResult<String> {
         let info_clone = self.clone();
-        let data = tokio::task::spawn_blocking(move || serde_json::to_string(&info_clone))
-            .await
-            .map_err(|err| {
-                error!("{}", err);
-                RustusError::UnableToWrite("Can't serialize info".into())
-            })??;
-        Ok(data)
+        actix_web::rt::task::spawn_blocking(move || {
+            serde_json::to_string(&info_clone).map_err(RustusError::from)
+        })
+        .await
+        .map_err(|err| {
+            error!("{}", err);
+            RustusError::UnableToWrite("Can't serialize info".into())
+        })?
+    }
+
+    pub async fn from_json(data: String) -> RustusResult<Self> {
+        actix_web::rt::task::spawn_blocking(move || {
+            serde_json::from_str::<Self>(data.as_str()).map_err(RustusError::from)
+        })
+        .await
+        .map_err(|err| {
+            error!("{}", err);
+            RustusError::UnableToWrite("Can't serialize info".into())
+        })?
     }
 
     #[cfg(test)]
diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs
index 5e412a0..01bde22 100644
--- a/src/info_storages/redis_info_storage.rs
+++ b/src/info_storages/redis_info_storage.rs
@@ -46,7 +46,7 @@ impl InfoStorage for RedisStorage {
         if res.is_none() {
             return Err(RustusError::FileNotFound);
         }
-        serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from)
+        FileInfo::from_json(res.unwrap()).await
     }
 
     async fn remove_info(&self, file_id: &str) -> RustusResult<()> {
diff --git a/src/main.rs b/src/main.rs
index 5b5330c..99c82d3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,9 +3,9 @@
 use std::str::FromStr;
 use std::sync::Arc;
 
-use actix_web::http::Method;
 use actix_web::{
     dev::{Server, Service},
+    http::Method,
     middleware, web, App, HttpServer,
 };
 use fern::colors::{Color, ColoredLevelConfig};
diff --git a/src/notifiers/dir_notifier.rs b/src/notifiers/dir_notifier.rs
index 008d79c..73b0b9a 100644
--- a/src/notifiers/dir_notifier.rs
+++ b/src/notifiers/dir_notifier.rs
@@ -19,6 +19,7 @@ impl DirNotifier {
 
 #[async_trait]
 impl Notifier for DirNotifier {
+    #[cfg_attr(coverage, no_coverage)]
     async fn prepare(&mut self) -> RustusResult<()> {
         Ok(())
     }
diff --git a/src/protocol/core/write_bytes.rs b/src/protocol/core/write_bytes.rs
index 322b219..9adb0b4 100644
--- a/src/protocol/core/write_bytes.rs
+++ b/src/protocol/core/write_bytes.rs
@@ -31,7 +31,7 @@ pub async fn write_bytes(
     // Parses header `Upload-Length` only if the creation-defer-length extension is enabled.
     let updated_len = if state
         .config
-        .extensions_vec()
+        .tus_extensions
         .contains(&Extensions::CreationDeferLength)
     {
         parse_header(&request, "Upload-Length")
@@ -82,20 +82,19 @@ pub async fn write_bytes(
     if Some(file_info.offset) == file_info.length {
         return Err(RustusError::FrozenFile);
     }
-
+    let chunk_len = bytes.len();
     // Appending bytes to file.
-    state
-        .data_storage
-        .add_bytes(&file_info, bytes.as_ref())
-        .await?;
+    state.data_storage.add_bytes(&file_info, bytes).await?;
     // Updating offset.
-    file_info.offset += bytes.len();
+    file_info.offset += chunk_len;
     // Saving info to info storage.
     state.info_storage.set_info(&file_info, false).await?;
 
     let mut hook = Hook::PostReceive;
+    let mut keep_alive = true;
     if file_info.length == Some(file_info.offset) {
         hook = Hook::PostFinish;
+        keep_alive = false;
     }
     if state.config.hook_is_active(hook) {
         let message = state
@@ -104,16 +103,23 @@ pub async fn write_bytes(
             .hooks_format
             .format(&request, &file_info)?;
         let headers = request.headers().clone();
-        tokio::spawn(async move {
+        actix_web::rt::spawn(async move {
             state
                 .notification_manager
                 .send_message(message, hook, &headers)
                 .await
         });
     }
-    Ok(HttpResponse::NoContent()
-        .insert_header(("Upload-Offset", file_info.offset.to_string()))
-        .finish())
+    if keep_alive {
+        Ok(HttpResponse::NoContent()
+            .insert_header(("Upload-Offset", file_info.offset.to_string()))
+            .keep_alive()
+            .finish())
+    } else {
+        Ok(HttpResponse::NoContent()
+            .insert_header(("Upload-Offset", file_info.offset.to_string()))
+            .finish())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs
index 20df718..7a6828b 100644
--- a/src/protocol/creation/routes.rs
+++ b/src/protocol/creation/routes.rs
@@ -180,11 +180,11 @@ pub async fn create_file(
         let octet_stream = |val: &str| val == "application/offset+octet-stream";
         if check_header(&request, "Content-Type", octet_stream) {
             // Writing first bytes.
-            state
-                .data_storage
-                .add_bytes(&file_info, bytes.as_ref())
-                .await?;
-            file_info.offset += bytes.len();
+            let chunk_len = bytes.len();
+            // Appending bytes to file.
+            state.data_storage.add_bytes(&file_info, bytes).await?;
+            // Updating offset.
+            file_info.offset += chunk_len;
         }
     }
 
@@ -199,7 +199,7 @@ pub async fn create_file(
         let headers = request.headers().clone();
         // Adding send_message task to tokio reactor.
         // Thin function would be executed in background.
-        tokio::spawn(async move {
+        actix_web::rt::spawn(async move {
             state
                 .notification_manager
                 .send_message(message, Hook::PostCreate, &headers)
diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs
index e3e7c0a..f16b8b5 100644
--- a/src/protocol/getting/routes.rs
+++ b/src/protocol/getting/routes.rs
@@ -21,12 +21,12 @@ pub async fn get_file(request: HttpRequest, state: web::Data<State>) -> RustusRe
 }
 
 #[cfg(test)]
-#[cfg_attr(coverage, no_coverage)]
 mod test {
     use crate::{rustus_service, State};
     use actix_web::http::StatusCode;
     use actix_web::test::{call_service, init_service, TestRequest};
     use actix_web::{web, App};
+    use bytes::Bytes;
 
     #[actix_rt::test]
     async fn success() {
@@ -38,7 +38,7 @@ mod test {
         let file_info = state.create_test_file().await;
         state
             .data_storage
-            .add_bytes(&file_info, "data".as_bytes())
+            .add_bytes(&file_info, Bytes::from("testing"))
             .await
             .unwrap();
         let request = TestRequest::get()
diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs
index 82ae88e..33f11a9 100644
--- a/src/protocol/termination/routes.rs
+++ b/src/protocol/termination/routes.rs
@@ -27,7 +27,7 @@ pub async fn terminate(
                 .hooks_format
                 .format(&request, &file_info)?;
             let headers = request.headers().clone();
-            tokio::spawn(async move {
+            actix_web::rt::spawn(async move {
                 state
                     .notification_manager
                     .send_message(message, Hook::PostTerminate, &headers)
diff --git a/src/state.rs b/src/state.rs
index 07de073..45fa622 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -31,6 +31,7 @@ impl State {
             data_storage: Box::new(crate::storages::file_storage::FileStorage::new(
                 config.storage_opts.data_dir.clone(),
                 config.storage_opts.dir_structure.clone(),
+                config.storage_opts.force_fsync,
             )),
             info_storage: Box::new(
                 crate::info_storages::file_info_storage::FileInfoStorage::new(
diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs
index b2a850b..4f348d7 100644
--- a/src/storages/file_storage.rs
+++ b/src/storages/file_storage.rs
@@ -1,10 +1,12 @@
+use std::io::Write;
 use std::path::PathBuf;
 
 use actix_files::NamedFile;
 use async_trait::async_trait;
+use bytes::Bytes;
 use log::error;
-use tokio::fs::{remove_file, DirBuilder, OpenOptions};
-use tokio::io::copy;
+use std::fs::{remove_file, DirBuilder, OpenOptions};
+use std::io::{copy, BufReader, BufWriter};
 
 use crate::errors::{RustusError, RustusResult};
 use crate::info_storages::FileInfo;
@@ -17,17 +19,19 @@ use derive_more::Display;
 pub struct FileStorage {
     data_dir: PathBuf,
     dir_struct: String,
+    force_fsync: bool,
 }
 
 impl FileStorage {
-    pub fn new(data_dir: PathBuf, dir_struct: String) -> FileStorage {
+    pub fn new(data_dir: PathBuf, dir_struct: String, force_fsync: bool) -> FileStorage {
         FileStorage {
             data_dir,
             dir_struct,
+            force_fsync,
         }
     }
 
-    pub async fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> {
+    pub fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> {
         let dir = self
             .data_dir
             // We're working wit absolute paths, because tus.io says so.
@@ -40,7 +44,6 @@ impl FileStorage {
         DirBuilder::new()
             .recursive(true)
             .create(dir.as_path())
-            .await
             .map_err(|err| {
                 error!("{}", err);
                 RustusError::UnableToWrite(err.to_string())
@@ -58,7 +61,6 @@ impl Storage for FileStorage {
             DirBuilder::new()
                 .recursive(true)
                 .create(self.data_dir.as_path())
-                .await
                 .map_err(|err| RustusError::UnableToPrepareStorage(err.to_string()))?;
         }
         Ok(())
@@ -76,48 +78,61 @@ impl Storage for FileStorage {
             })
     }
 
-    async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> {
+    async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> {
         // In normal situation this `if` statement is not
         // gonna be called, but what if it is ...
-        if info.path.is_none() {
+        if file_info.path.is_none() {
             return Err(RustusError::FileNotFound);
         }
-        // Opening file in w+a mode.
-        // It means that we're going to append some
-        // bytes to the end of a file.
-        let mut file = OpenOptions::new()
-            .write(true)
-            .append(true)
-            .create(false)
-            .open(info.path.as_ref().unwrap())
-            .await
-            .map_err(|err| {
-                error!("{:?}", err);
-                RustusError::UnableToWrite(err.to_string())
-            })?;
-        #[allow(clippy::clone_double_ref)]
-        let mut buffer = bytes.clone();
-        copy(&mut buffer, &mut file).await?;
-        tokio::task::spawn(async move { file.sync_data().await });
-        Ok(())
+        let path = String::from(file_info.path.as_ref().unwrap());
+        let force_sync = self.force_fsync;
+        actix_web::rt::task::spawn_blocking(move || {
+            // Opening file in w+a mode.
+            // It means that we're going to append some
+            // bytes to the end of a file.
+            let file = OpenOptions::new()
+                .write(true)
+                .append(true)
+                .create(false)
+                .read(false)
+                .truncate(false)
+                .open(path.as_str())
+                .map_err(|err| {
+                    error!("{:?}", err);
+                    RustusError::UnableToWrite(err.to_string())
+                })?;
+            {
+                let mut writer = BufWriter::new(file);
+                writer.write_all(bytes.as_ref())?;
+                writer.flush()?;
+                if force_sync {
+                    writer.get_ref().sync_data()?;
+                }
+            }
+            Ok(())
+        })
+        .await?
     }
 
     async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> {
+        let info = file_info.clone();
         // New path to file.
-        let file_path = self.data_file_path(file_info.id.as_str()).await?;
-        // Creating new file.
-        OpenOptions::new()
-            .create(true)
-            .write(true)
-            .truncate(true)
-            .create_new(true)
-            .open(file_path.as_path())
-            .await
-            .map_err(|err| {
-                error!("{:?}", err);
-                RustusError::FileAlreadyExists(file_info.id.clone())
-            })?;
-        Ok(file_path.display().to_string())
+        let file_path = self.data_file_path(info.id.as_str())?;
+        actix_web::rt::task::spawn_blocking(move || {
+            // Creating new file.
+            OpenOptions::new()
+                .create(true)
+                .write(true)
+                .truncate(true)
+                .create_new(true)
+                .open(file_path.as_path())
+                .map_err(|err| {
+                    error!("{:?}", err);
+                    RustusError::FileAlreadyExists(info.id.clone())
+                })?;
+            Ok(file_path.display().to_string())
+        })
+        .await?
     }
 
     async fn concat_files(
@@ -125,41 +140,48 @@ impl Storage for FileStorage {
         file_info: &FileInfo,
         parts_info: Vec<FileInfo>,
     ) -> RustusResult<()> {
-        let mut file = OpenOptions::new()
-            .write(true)
-            .append(true)
-            .create(true)
-            .open(file_info.path.as_ref().unwrap().clone())
-            .await
-            .map_err(|err| {
-                error!("{:?}", err);
-                RustusError::UnableToWrite(err.to_string())
-            })?;
-        for part in parts_info {
-            if part.path.is_none() {
-                return Err(RustusError::FileNotFound);
+        let info = file_info.clone();
+        actix_web::rt::task::spawn_blocking(move || {
+            let mut file = OpenOptions::new()
+                .write(true)
+                .append(true)
+                .create(true)
+                .open(info.path.as_ref().unwrap().clone())
+                .map_err(|err| {
+                    error!("{:?}", err);
+                    RustusError::UnableToWrite(err.to_string())
+                })?;
+            for part in parts_info {
+                if part.path.is_none() {
+                    return Err(RustusError::FileNotFound);
+                }
+                let part_file = OpenOptions::new()
+                    .read(true)
+                    .open(part.path.as_ref().unwrap())?;
+                let mut reader = BufReader::new(part_file);
+                copy(&mut reader, &mut file)?;
             }
-            let mut part_file = OpenOptions::new()
-                .read(true)
-                .open(part.path.as_ref().unwrap())
-                .await?;
-            copy(&mut part_file, &mut file).await?;
-        }
-        file.sync_data().await?;
-        Ok(())
+            file.sync_data()?;
+            Ok(())
+        })
+        .await?
     }
 
     async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> {
-        // Let's remove the file itself.
-        let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone());
-        if !data_path.exists() {
-            return Err(RustusError::FileNotFound);
-        }
-        remove_file(data_path).await.map_err(|err| {
-            error!("{:?}", err);
-            RustusError::UnableToRemove(file_info.id.clone())
-        })?;
-        Ok(())
+        let info = file_info.clone();
+        actix_web::rt::task::spawn_blocking(move || {
+            // Let's remove the file itself.
+            let data_path = PathBuf::from(info.path.as_ref().unwrap().clone());
+            if !data_path.exists() {
+                return Err(RustusError::FileNotFound);
+            }
+            remove_file(data_path).map_err(|err| {
+                error!("{:?}", err);
+                RustusError::UnableToRemove(info.id.clone())
+            })?;
+            Ok(())
+        })
+        .await?
     }
 }
 
@@ -168,6 +190,7 @@ mod tests {
     use super::FileStorage;
     use crate::info_storages::FileInfo;
     use crate::Storage;
+    use bytes::Bytes;
     use std::fs::File;
     use std::io::{Read, Write};
     use std::path::PathBuf;
@@ -176,7 +199,7 @@ mod tests {
     async fn preparation() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
         let target_path = dir.into_path().join("not_exist");
-        let mut storage = FileStorage::new(target_path.clone(), "".into());
+        let mut storage = FileStorage::new(target_path.clone(), "".into(), false);
         assert_eq!(target_path.exists(), false);
         storage.prepare().await.unwrap();
         assert_eq!(target_path.exists(), true);
@@ -185,7 +208,7 @@ mod tests {
     #[actix_rt::test]
     async fn create_file() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
         let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None);
         let new_path = storage.create_file(&file_info).await.unwrap();
         assert!(PathBuf::from(new_path).exists());
@@ -195,7 +218,7 @@ mod tests {
     async fn create_file_but_it_exists() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
         let base_path = dir.into_path().clone();
-        let storage = FileStorage::new(base_path.clone(), "".into());
+        let storage = FileStorage::new(base_path.clone(), "".into(), false);
         let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None);
         File::create(base_path.join("test_id")).unwrap();
         let result = storage.create_file(&file_info).await;
@@ -205,13 +228,13 @@ mod tests {
     #[actix_rt::test]
     async fn adding_bytes() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
         let mut file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None);
         let new_path = storage.create_file(&file_info).await.unwrap();
         let test_data = "MyTestData";
         file_info.path = Some(new_path.clone());
         storage
-            .add_bytes(&file_info, test_data.as_bytes())
+            .add_bytes(&file_info, Bytes::from(test_data))
             .await
             .unwrap();
         let mut file = File::open(new_path).unwrap();
@@ -223,7 +246,7 @@ mod tests {
     #[actix_rt::test]
     async fn adding_bytes_to_unknown_file() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
         let file_info = FileInfo::new(
             "test_id",
             Some(5),
@@ -232,14 +255,14 @@ mod tests {
             None,
         );
         let test_data = "MyTestData";
-        let result = storage.add_bytes(&file_info, test_data.as_bytes()).await;
+        let result = storage.add_bytes(&file_info, Bytes::from(test_data)).await;
         assert!(result.is_err())
     }
 
     #[actix_rt::test]
     async fn get_contents_of_unknown_file() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
         let file_info = FileInfo::new(
             "test_id",
             Some(5),
@@ -254,7 +277,7 @@ mod tests {
     #[actix_rt::test]
     async fn remove_unknown_file() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
         let file_info = FileInfo::new(
             "test_id",
             Some(5),
@@ -269,7 +292,7 @@ mod tests {
     #[actix_rt::test]
     async fn success_concatenation() {
         let dir = tempdir::TempDir::new("file_storage").unwrap();
-        let storage = FileStorage::new(dir.into_path().clone(), "".into());
+        let storage = FileStorage::new(dir.into_path().clone(), "".into(), false);
 
         let mut parts = Vec::new();
         let part1_path = storage.data_dir.as_path().join("part1");
diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs
index 7bde098..25ff19e 100644
--- a/src/storages/models/available_stores.rs
+++ b/src/storages/models/available_stores.rs
@@ -26,6 +26,7 @@ impl AvailableStores {
             Self::FileStorage => Box::new(file_storage::FileStorage::new(
                 config.storage_opts.data_dir.clone(),
                 config.storage_opts.dir_structure.clone(),
+                config.storage_opts.force_fsync,
             )),
         }
     }
diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs
index 0d790b9..c14820b 100644
--- a/src/storages/models/storage.rs
+++ b/src/storages/models/storage.rs
@@ -2,6 +2,7 @@ use crate::errors::RustusResult;
 use crate::info_storages::FileInfo;
 use actix_files::NamedFile;
 use async_trait::async_trait;
+use bytes::Bytes;
 use std::fmt::Display;
 
 #[async_trait]
@@ -41,7 +42,7 @@ pub trait Storage: Display {
     /// # Params
     /// `file_info` - info about current file.
     /// `bytes` - bytes to append to the file.
-    async fn add_bytes(&self, file_info: &FileInfo, bytes: &[u8]) -> RustusResult<()>;
+    async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()>;
 
     /// Create file in storage.
     ///
diff --git a/src/utils/headers.rs b/src/utils/headers.rs
index 493b09a..1bec0b8 100644
--- a/src/utils/headers.rs
+++ b/src/utils/headers.rs
@@ -17,7 +17,7 @@ pub fn parse_header<T: FromStr>(request: &HttpRequest, header_name: &str) -> Opt
         .and_then(|value|
             // Parsing it to string.
             match value.to_str() {
-                Ok(header_str) => Some(String::from(header_str)),
+                Ok(header_str) => Some(header_str),
                 Err(_) => None,
             })
         .and_then(|val|
-- 
GitLab