diff --git a/Cargo.lock b/Cargo.lock index 391358798514c0e25a615f88319b103f818dcc80..d4e836f2ff0e158ea29d5e4a7e8edfce290fcb60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2511,6 +2511,7 @@ dependencies = [ "thiserror", "tokio", "tokio-amqp", + "tokio-stream", "url", "uuid 1.0.0-alpha.1", ] diff --git a/Cargo.toml b/Cargo.toml index e94783a8072379a06de11068ab6560ea125ad805..0afe62c766c556f5c93bc1630e0e85e919ea0fa8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ serde_json = "1" strfmt = "^0.1.6" thiserror = "^1.0" url = "2.2.2" +tokio-stream = "0.1.8" [dependencies.openssl] version = "0.10.38" @@ -88,7 +89,7 @@ features = ["derive"] version = "0.23" [dependencies.tokio] -features = ["time"] +features = ["time", "sync", "io-util"] version = "1.4.0" [dependencies.tokio-amqp] diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index 97af31a183dba4a927d2b064c5abb9098d75aad8..681f5f8f86dcee3d32647d3b0b642a369b49e6de 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,4 +1,7 @@ -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{web, HttpRequest, HttpResponse, Responder}; + +use tokio::sync::mpsc::unbounded_channel; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::errors::RustusError; use crate::State; @@ -13,7 +16,14 @@ pub async fn get_file(request: HttpRequest, state: web::Data<State>) -> impl Res if file_info.storage != state.data_storage.to_string() { return Err(RustusError::FileNotFound); } - state.data_storage.get_contents(&file_info).await + let (tx, rx_body) = unbounded_channel(); + tokio::spawn(async move { + state + .data_storage + .get_contents(file_info.clone(), tx.clone()) + .await + }); + Ok(HttpResponse::Ok().streaming(UnboundedReceiverStream::new(rx_body))) } else { Err(RustusError::FileNotFound) } diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index 8d8c989255b689b8d32f8ada2f2599511e82d599..7012c4e2de30a0a0ddc8efce7f858d59f5ad94f0 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use actix_files::NamedFile; +use actix_web::web::Bytes; use async_std::fs::{remove_file, DirBuilder, OpenOptions}; use async_std::prelude::*; use async_trait::async_trait; @@ -11,6 +11,9 @@ use crate::info_storages::FileInfo; use crate::storages::Storage; use crate::RustusConf; use derive_more::Display; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, BufReader}; +use tokio::sync::mpsc::UnboundedSender; #[derive(Display)] #[display(fmt = "file_storage")] @@ -62,16 +65,30 @@ impl Storage for FileStorage { Ok(()) } - async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<NamedFile> { + async fn get_contents( + &self, + file_info: FileInfo, + channel: UnboundedSender<RustusResult<Bytes>>, + ) -> RustusResult<()> { if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - NamedFile::open_async(file_info.path.clone().unwrap().as_str()) - .await - .map_err(|err| { - error!("{:?}", err); - RustusError::FileNotFound - }) + let file = File::open(file_info.path.unwrap()).await?; + let mut reader = BufReader::new(file); + let mut buffer = [0; 1024 * 4]; + loop { + let read = reader.read(&mut buffer).await?; + if read == 0 { + break; + } + channel + .send(Ok(buffer.into_iter().take(read).collect::<Bytes>())) + .map_err(|err| { + error!("{}", err); + RustusError::UnableToReadInfo + })?; + } + Ok(()) } async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> { diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index e1294433f1dfff8fbbb8fdd30a7ba7a323a12e3b..3b1996190fde422d9ca352bd7f07fd85c7403a48 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -1,9 +1,13 @@ use crate::errors::RustusResult; use crate::info_storages::FileInfo; -use actix_files::NamedFile; + +use actix_web::web::Bytes; + use async_trait::async_trait; use std::fmt::Display; +use tokio::sync::mpsc::UnboundedSender; + #[async_trait] pub trait Storage: Display { /// Prepare storage before starting up server. @@ -25,7 +29,11 @@ pub trait Storage: Display { /// /// # Params /// `file_info` - info about current file. - async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<NamedFile>; + async fn get_contents( + &self, + file_info: FileInfo, + sender: UnboundedSender<RustusResult<Bytes>>, + ) -> RustusResult<()>; /// Add bytes to the file. ///