diff --git a/Cargo.lock b/Cargo.lock index cc866e4bcf922c571c6a2d40fb4caa86ac9ba9b7..decd5b46166d001dbe5212500deca539f0ca3bcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2398,6 +2398,7 @@ dependencies = [ "actix-web", "async-trait", "base64", + "bytes", "chrono", "derive_more", "fern", diff --git a/Cargo.toml b/Cargo.toml index b4c25074b336ca7af4bb05cf48468ada2b395890..4af3e82fa119f8c2137e4395399b92dca8b11ab5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ serde_json = "1" strfmt = "^0.1.6" thiserror = "^1.0" url = "2.2.2" +bytes = "1.1.0" [dependencies.futures] version = "0.3.21" @@ -82,7 +83,7 @@ features = ["derive"] version = "0.23" [dependencies.tokio] -features = ["time", "process", "fs"] +features = ["time", "process", "fs", "io-std", "io-util", "rt-multi-thread", "bytes"] version = "1.4.0" [dependencies.tokio-amqp] @@ -114,7 +115,7 @@ httptest = "0.15.4" [profile] [profile.release] -lto = true +lto = "fat" panic = "abort" opt-level = 3 codegen-units = 1 diff --git a/src/config.rs b/src/config.rs index 07cd41e10ae73ad6f499cc1e9c0acf66c06f5007..f1d862c54c1bc4d43f29a351bcf4b4c0b3fba492 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,9 @@ pub struct StorageOptions { #[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")] pub dir_structure: String, + + #[structopt(long, parse(from_flag))] + pub force_fsync: bool, } #[derive(StructOpt, Debug, Clone)] diff --git a/src/errors.rs b/src/errors.rs index 942fe39fdef95c98a0623950f8ed02a20dc13258..99eaec55d95d85aa6e020cca891801d9ed9f4bd5 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -58,6 +58,8 @@ pub enum RustusError { AMQPPoolError(#[from] mobc_lapin::mobc::Error<lapin::Error>), #[error("Std error: {0}")] StdError(#[from] std::io::Error), + #[error("Can't spawn task: {0}")] + TokioSpawnError(#[from] tokio::task::JoinError), } /// This conversion allows us to use `RustusError` in the `main` function. diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs index 164f2bcf82a9bea0a9d2ed645cdd8429c7a8316e..baa00921259278469143b02435bdfb385d685313 100644 --- a/src/info_storages/db_info_storage.rs +++ b/src/info_storages/db_info_storage.rs @@ -66,7 +66,7 @@ impl InfoStorage for DBInfoStorage { async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> { let model: Option<DbModel> = self.db.fetch_by_column("id", file_id).await?; if let Some(info) = model { - serde_json::from_str(info.info.as_str()).map_err(RustusError::from) + FileInfo::from_json(info.info.to_string()).await } else { Err(RustusError::FileNotFound) } diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index 51f55674cd42aae557f4fefeaaee56ebfbd36171..309b47a02b66b12227d401f42aadb978c664e785 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -1,9 +1,11 @@ +use std::io::{Read, Write}; use std::path::PathBuf; use async_trait::async_trait; use log::error; -use tokio::fs::{read_to_string, remove_file, DirBuilder, OpenOptions}; -use tokio::io::copy; +use std::fs::{remove_file, File, OpenOptions}; +use std::io::{BufReader, BufWriter}; +use tokio::fs::DirBuilder; use crate::errors::{RustusError, RustusResult}; use crate::info_storages::{FileInfo, InfoStorage}; @@ -35,42 +37,57 @@ impl InfoStorage for FileInfoStorage { } async fn set_info(&self, file_info: &FileInfo, create: bool) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .create(create) - .truncate(true) - .open(self.info_file_path(file_info.id.as_str()).as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - copy(&mut file_info.json().await?.as_bytes(), &mut file).await?; - tokio::task::spawn(async move { file.sync_data().await }); - Ok(()) + let info = file_info.clone(); + let path = self.info_file_path(info.id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + let file = OpenOptions::new() + .write(true) + .create(create) + .truncate(true) + .open(path) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + let data = serde_json::to_string(&info).map_err(RustusError::from)?; + { + let mut writer = BufWriter::new(file); + writer.write_all(data.as_bytes())?; + writer.flush()?; + } + Ok(()) + }) + .await? } async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> { let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - let contents = read_to_string(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToReadInfo - })?; - serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from) + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + let info = File::open(info_path)?; + let mut contents = String::new(); + let mut reader = BufReader::new(info); + reader.read_to_string(&mut contents)?; + serde_json::from_str::<FileInfo>(contents.as_str()).map_err(RustusError::from) + }) + .await? } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { - let info_path = self.info_file_path(file_id); - if !info_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(info_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(String::from(file_id)) + let id = String::from(file_id); + let info_path = self.info_file_path(id.as_str()); + actix_web::rt::task::spawn_blocking(move || { + if !info_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(info_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(id) + }) }) + .await? } } diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index 52027062fdbd23d5167614793cae8c5b99b21dc5..806ef3ef254b3b0f018c0a854ec2942e20311be6 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -93,13 +93,25 @@ impl FileInfo { pub async fn json(&self) -> RustusResult<String> { let info_clone = self.clone(); - let data = tokio::task::spawn_blocking(move || serde_json::to_string(&info_clone)) - .await - .map_err(|err| { - error!("{}", err); - RustusError::UnableToWrite("Can't serialize info".into()) - })??; - Ok(data) + actix_web::rt::task::spawn_blocking(move || { + serde_json::to_string(&info_clone).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? + } + + pub async fn from_json(data: String) -> RustusResult<Self> { + actix_web::rt::task::spawn_blocking(move || { + serde_json::from_str::<Self>(data.as_str()).map_err(RustusError::from) + }) + .await + .map_err(|err| { + error!("{}", err); + RustusError::UnableToWrite("Can't serialize info".into()) + })? } #[cfg(test)] diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs index 5e412a0177e07d8da08a346137ae4a20aa0b4f4b..01bde229674858f30f3cf02858b559f8363a2115 100644 --- a/src/info_storages/redis_info_storage.rs +++ b/src/info_storages/redis_info_storage.rs @@ -46,7 +46,7 @@ impl InfoStorage for RedisStorage { if res.is_none() { return Err(RustusError::FileNotFound); } - serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from) + FileInfo::from_json(res.unwrap()).await } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { diff --git a/src/main.rs b/src/main.rs index 5b5330c6d3dc628e60910afd449a07c49cb6b6cc..99c82d3ad4bdcaddff24d56ac63fcd9633a80240 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,9 @@ use std::str::FromStr; use std::sync::Arc; -use actix_web::http::Method; use actix_web::{ dev::{Server, Service}, + http::Method, middleware, web, App, HttpServer, }; use fern::colors::{Color, ColoredLevelConfig}; diff --git a/src/notifiers/dir_notifier.rs b/src/notifiers/dir_notifier.rs index 008d79ca07ca42244619e1aef9d4eddc65a5e32b..73b0b9ace6a1bdb7635d22b60d173cd7f75a30c2 100644 --- a/src/notifiers/dir_notifier.rs +++ b/src/notifiers/dir_notifier.rs @@ -19,6 +19,7 @@ impl DirNotifier { #[async_trait] impl Notifier for DirNotifier { + #[cfg_attr(coverage, no_coverage)] async fn prepare(&mut self) -> RustusResult<()> { Ok(()) } diff --git a/src/protocol/core/write_bytes.rs b/src/protocol/core/write_bytes.rs index 322b2191a6864e7e0ccf27fc2b6628c5fad2540a..9adb0b4a2f017e556cf22340aece8b3196a35db9 100644 --- a/src/protocol/core/write_bytes.rs +++ b/src/protocol/core/write_bytes.rs @@ -31,7 +31,7 @@ pub async fn write_bytes( // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. let updated_len = if state .config - .extensions_vec() + .tus_extensions .contains(&Extensions::CreationDeferLength) { parse_header(&request, "Upload-Length") @@ -82,20 +82,19 @@ pub async fn write_bytes( if Some(file_info.offset) == file_info.length { return Err(RustusError::FrozenFile); } - + let chunk_len = bytes.len(); // Appending bytes to file. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; + state.data_storage.add_bytes(&file_info, bytes).await?; // Updating offset. - file_info.offset += bytes.len(); + file_info.offset += chunk_len; // Saving info to info storage. state.info_storage.set_info(&file_info, false).await?; let mut hook = Hook::PostReceive; + let mut keep_alive = true; if file_info.length == Some(file_info.offset) { hook = Hook::PostFinish; + keep_alive = false; } if state.config.hook_is_active(hook) { let message = state @@ -104,16 +103,23 @@ pub async fn write_bytes( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, hook, &headers) .await }); } - Ok(HttpResponse::NoContent() - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .finish()) + if keep_alive { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .keep_alive() + .finish()) + } else { + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .finish()) + } } #[cfg(test)] diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 20df718620cc9b1a40aa2ddba47c3896cd462dff..7a6828b0777df72597ee7280a7075d76a4aa0a8f 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -180,11 +180,11 @@ pub async fn create_file( let octet_stream = |val: &str| val == "application/offset+octet-stream"; if check_header(&request, "Content-Type", octet_stream) { // Writing first bytes. - state - .data_storage - .add_bytes(&file_info, bytes.as_ref()) - .await?; - file_info.offset += bytes.len(); + let chunk_len = bytes.len(); + // Appending bytes to file. + state.data_storage.add_bytes(&file_info, bytes).await?; + // Updating offset. + file_info.offset += chunk_len; } } @@ -199,7 +199,7 @@ pub async fn create_file( let headers = request.headers().clone(); // Adding send_message task to tokio reactor. // Thin function would be executed in background. - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostCreate, &headers) diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index e3e7c0a59973d727c828186fe153830593f59491..f16b8b5c95dcf207d33560ad5c031c74ed43c440 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -21,12 +21,12 @@ pub async fn get_file(request: HttpRequest, state: web::Data<State>) -> RustusRe } #[cfg(test)] -#[cfg_attr(coverage, no_coverage)] mod test { use crate::{rustus_service, State}; use actix_web::http::StatusCode; use actix_web::test::{call_service, init_service, TestRequest}; use actix_web::{web, App}; + use bytes::Bytes; #[actix_rt::test] async fn success() { @@ -38,7 +38,7 @@ mod test { let file_info = state.create_test_file().await; state .data_storage - .add_bytes(&file_info, "data".as_bytes()) + .add_bytes(&file_info, Bytes::from("testing")) .await .unwrap(); let request = TestRequest::get() diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 82ae88e6c3ca51aede0518ed8e31c91f7f71b2fc..33f11a92887fabed9c9d38aa480f70698d1357e9 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -27,7 +27,7 @@ pub async fn terminate( .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); - tokio::spawn(async move { + actix_web::rt::spawn(async move { state .notification_manager .send_message(message, Hook::PostTerminate, &headers) diff --git a/src/state.rs b/src/state.rs index 07de07387dbf81c457e8f9ecb202ae7a45981c3d..45fa622adf357d1195cfcb6e8fc844c9621ba96a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -31,6 +31,7 @@ impl State { data_storage: Box::new(crate::storages::file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), info_storage: Box::new( crate::info_storages::file_info_storage::FileInfoStorage::new( diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index b2a850b5e86e5b272f89ffda69d596dcb40cc843..4f348d71f725cf59d8cc65dc2b69fcbb2e2591f8 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,10 +1,12 @@ +use std::io::Write; use std::path::PathBuf; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use log::error; -use tokio::fs::{remove_file, DirBuilder, OpenOptions}; -use tokio::io::copy; +use std::fs::{remove_file, DirBuilder, OpenOptions}; +use std::io::{copy, BufReader, BufWriter}; use crate::errors::{RustusError, RustusResult}; use crate::info_storages::FileInfo; @@ -17,17 +19,19 @@ use derive_more::Display; pub struct FileStorage { data_dir: PathBuf, dir_struct: String, + force_fsync: bool, } impl FileStorage { - pub fn new(data_dir: PathBuf, dir_struct: String) -> FileStorage { + pub fn new(data_dir: PathBuf, dir_struct: String, force_fsync: bool) -> FileStorage { FileStorage { data_dir, dir_struct, + force_fsync, } } - pub async fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> { + pub fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> { let dir = self .data_dir // We're working wit absolute paths, because tus.io says so. @@ -40,7 +44,6 @@ impl FileStorage { DirBuilder::new() .recursive(true) .create(dir.as_path()) - .await .map_err(|err| { error!("{}", err); RustusError::UnableToWrite(err.to_string()) @@ -58,7 +61,6 @@ impl Storage for FileStorage { DirBuilder::new() .recursive(true) .create(self.data_dir.as_path()) - .await .map_err(|err| RustusError::UnableToPrepareStorage(err.to_string()))?; } Ok(()) @@ -76,48 +78,61 @@ impl Storage for FileStorage { }) } - async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> { + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()> { // In normal situation this `if` statement is not // gonna be called, but what if it is ... - if info.path.is_none() { + if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - // Opening file in w+a mode. - // It means that we're going to append some - // bytes to the end of a file. - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(false) - .open(info.path.as_ref().unwrap()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - #[allow(clippy::clone_double_ref)] - let mut buffer = bytes.clone(); - copy(&mut buffer, &mut file).await?; - tokio::task::spawn(async move { file.sync_data().await }); - Ok(()) + let path = String::from(file_info.path.as_ref().unwrap()); + let force_sync = self.force_fsync; + actix_web::rt::task::spawn_blocking(move || { + // Opening file in w+a mode. + // It means that we're going to append some + // bytes to the end of a file. + let file = OpenOptions::new() + .write(true) + .append(true) + .create(false) + .read(false) + .truncate(false) + .open(path.as_str()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + { + let mut writer = BufWriter::new(file); + writer.write_all(bytes.as_ref())?; + writer.flush()?; + if force_sync { + writer.get_ref().sync_data()?; + } + } + Ok(()) + }) + .await? } async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> { + let info = file_info.clone(); // New path to file. - let file_path = self.data_file_path(file_info.id.as_str()).await?; - // Creating new file. - OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .create_new(true) - .open(file_path.as_path()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::FileAlreadyExists(file_info.id.clone()) - })?; - Ok(file_path.display().to_string()) + let file_path = self.data_file_path(info.id.as_str())?; + actix_web::rt::task::spawn_blocking(move || { + // Creating new file. + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .create_new(true) + .open(file_path.as_path()) + .map_err(|err| { + error!("{:?}", err); + RustusError::FileAlreadyExists(info.id.clone()) + })?; + Ok(file_path.display().to_string()) + }) + .await? } async fn concat_files( @@ -125,41 +140,48 @@ impl Storage for FileStorage { file_info: &FileInfo, parts_info: Vec<FileInfo>, ) -> RustusResult<()> { - let mut file = OpenOptions::new() - .write(true) - .append(true) - .create(true) - .open(file_info.path.as_ref().unwrap().clone()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::UnableToWrite(err.to_string()) - })?; - for part in parts_info { - if part.path.is_none() { - return Err(RustusError::FileNotFound); + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(info.path.as_ref().unwrap().clone()) + .map_err(|err| { + error!("{:?}", err); + RustusError::UnableToWrite(err.to_string()) + })?; + for part in parts_info { + if part.path.is_none() { + return Err(RustusError::FileNotFound); + } + let part_file = OpenOptions::new() + .read(true) + .open(part.path.as_ref().unwrap())?; + let mut reader = BufReader::new(part_file); + copy(&mut reader, &mut file)?; } - let mut part_file = OpenOptions::new() - .read(true) - .open(part.path.as_ref().unwrap()) - .await?; - copy(&mut part_file, &mut file).await?; - } - file.sync_data().await?; - Ok(()) + file.sync_data()?; + Ok(()) + }) + .await? } async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { - // Let's remove the file itself. - let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); - if !data_path.exists() { - return Err(RustusError::FileNotFound); - } - remove_file(data_path).await.map_err(|err| { - error!("{:?}", err); - RustusError::UnableToRemove(file_info.id.clone()) - })?; - Ok(()) + let info = file_info.clone(); + actix_web::rt::task::spawn_blocking(move || { + // Let's remove the file itself. + let data_path = PathBuf::from(info.path.as_ref().unwrap().clone()); + if !data_path.exists() { + return Err(RustusError::FileNotFound); + } + remove_file(data_path).map_err(|err| { + error!("{:?}", err); + RustusError::UnableToRemove(info.id.clone()) + })?; + Ok(()) + }) + .await? } } @@ -168,6 +190,7 @@ mod tests { use super::FileStorage; use crate::info_storages::FileInfo; use crate::Storage; + use bytes::Bytes; use std::fs::File; use std::io::{Read, Write}; use std::path::PathBuf; @@ -176,7 +199,7 @@ mod tests { async fn preparation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let target_path = dir.into_path().join("not_exist"); - let mut storage = FileStorage::new(target_path.clone(), "".into()); + let mut storage = FileStorage::new(target_path.clone(), "".into(), false); assert_eq!(target_path.exists(), false); storage.prepare().await.unwrap(); assert_eq!(target_path.exists(), true); @@ -185,7 +208,7 @@ mod tests { #[actix_rt::test] async fn create_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); assert!(PathBuf::from(new_path).exists()); @@ -195,7 +218,7 @@ mod tests { async fn create_file_but_it_exists() { let dir = tempdir::TempDir::new("file_storage").unwrap(); let base_path = dir.into_path().clone(); - let storage = FileStorage::new(base_path.clone(), "".into()); + let storage = FileStorage::new(base_path.clone(), "".into(), false); let file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); File::create(base_path.join("test_id")).unwrap(); let result = storage.create_file(&file_info).await; @@ -205,13 +228,13 @@ mod tests { #[actix_rt::test] async fn adding_bytes() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut file_info = FileInfo::new("test_id", Some(5), None, storage.to_string(), None); let new_path = storage.create_file(&file_info).await.unwrap(); let test_data = "MyTestData"; file_info.path = Some(new_path.clone()); storage - .add_bytes(&file_info, test_data.as_bytes()) + .add_bytes(&file_info, Bytes::from(test_data)) .await .unwrap(); let mut file = File::open(new_path).unwrap(); @@ -223,7 +246,7 @@ mod tests { #[actix_rt::test] async fn adding_bytes_to_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -232,14 +255,14 @@ mod tests { None, ); let test_data = "MyTestData"; - let result = storage.add_bytes(&file_info, test_data.as_bytes()).await; + let result = storage.add_bytes(&file_info, Bytes::from(test_data)).await; assert!(result.is_err()) } #[actix_rt::test] async fn get_contents_of_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -254,7 +277,7 @@ mod tests { #[actix_rt::test] async fn remove_unknown_file() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let file_info = FileInfo::new( "test_id", Some(5), @@ -269,7 +292,7 @@ mod tests { #[actix_rt::test] async fn success_concatenation() { let dir = tempdir::TempDir::new("file_storage").unwrap(); - let storage = FileStorage::new(dir.into_path().clone(), "".into()); + let storage = FileStorage::new(dir.into_path().clone(), "".into(), false); let mut parts = Vec::new(); let part1_path = storage.data_dir.as_path().join("part1"); diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs index 7bde0987c00c335302068b56c9194631606b82ca..25ff19e1065e22c6e4d597cdd34478cf47934db0 100644 --- a/src/storages/models/available_stores.rs +++ b/src/storages/models/available_stores.rs @@ -26,6 +26,7 @@ impl AvailableStores { Self::FileStorage => Box::new(file_storage::FileStorage::new( config.storage_opts.data_dir.clone(), config.storage_opts.dir_structure.clone(), + config.storage_opts.force_fsync, )), } } diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index 0d790b96266809c70f39dc50da75c91317be82fe..c14820ba6a0577f681d475fa0a11bd07bda32e59 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -2,6 +2,7 @@ use crate::errors::RustusResult; use crate::info_storages::FileInfo; use actix_files::NamedFile; use async_trait::async_trait; +use bytes::Bytes; use std::fmt::Display; #[async_trait] @@ -41,7 +42,7 @@ pub trait Storage: Display { /// # Params /// `file_info` - info about current file. /// `bytes` - bytes to append to the file. - async fn add_bytes(&self, file_info: &FileInfo, bytes: &[u8]) -> RustusResult<()>; + async fn add_bytes(&self, file_info: &FileInfo, bytes: Bytes) -> RustusResult<()>; /// Create file in storage. /// diff --git a/src/utils/headers.rs b/src/utils/headers.rs index 493b09ab79571000e5661ef8ad9db44346c3c4ba..1bec0b8b633fa6d86070bbcd2acdeb6d9e9c1482 100644 --- a/src/utils/headers.rs +++ b/src/utils/headers.rs @@ -17,7 +17,7 @@ pub fn parse_header<T: FromStr>(request: &HttpRequest, header_name: &str) -> Opt .and_then(|value| // Parsing it to string. match value.to_str() { - Ok(header_str) => Some(String::from(header_str)), + Ok(header_str) => Some(header_str), Err(_) => None, }) .and_then(|val|