diff --git a/Cargo.lock b/Cargo.lock index b1df24ac72a7a9ccf695e1aa6514aa5a7215bfc8..87fef6ff310ee084a3db447431ce9e287a7b4a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -376,6 +376,23 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-process" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83137067e3a2a6a06d67168e49e68a0957d215410473a740cea95a2425c0b7c6" +dependencies = [ + "async-io", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "libc", + "once_cell", + "signal-hook", + "winapi", +] + [[package]] name = "async-std" version = "1.10.0" @@ -2465,10 +2482,11 @@ dependencies = [ [[package]] name = "rustus" -version = "0.2.0" +version = "0.3.0" dependencies = [ "actix-files", "actix-web", + "async-process", "async-std", "async-trait", "base64", @@ -2687,6 +2705,16 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "signal-hook" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index a2e37c493461353e47cd4b66d499ec31f16628fa..b2a78e2ed13cab41014c09c3ae7a932db5642e27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.2.0" +version = "0.3.0" edition = "2021" description = "TUS protocol implementation written in Rust." @@ -71,6 +71,10 @@ version = "^3.0" optional = true version = "2.0" +[dependencies.async-process] +version = "1.3.0" +optional = true + [dependencies.reqwest] features = ["json"] optional = true @@ -96,15 +100,16 @@ features = ["v4"] version = "^1.0.0-alpha.1" [features] -all = ["redis_info_storage", "db_info_storage", "http_notifier", "amqp_notifier"] +all = ["redis_info_storage", "db_info_storage", "http_notifier", "amqp_notifier", "file_notifiers"] amqp_notifier = ["lapin", "tokio-amqp", "mobc-lapin"] db_info_storage = ["rbatis", "rbson"] default = [] http_notifier = ["reqwest"] redis_info_storage = ["mobc-redis"] +file_notifiers = ["async-process"] [profile] [profile.release] lto = true opt-level = "z" -codegen-units = 1 \ No newline at end of file +codegen-units = 1 diff --git a/README.md b/README.md index 3e913382778046cbfb9ffc89874f0d51418a04b7..ce46e03c3d430c6ceb2d846898cf4e34d052def5 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,7 @@ We can't validate anything to stop uploading. * [x] Notification interface; * [x] Notifications via http hooks; * [x] Notifications via RabbitMQ; -* [ ] Executable files notifications; +* [X] Executable files notifications; * [ ] S3 as data storage store support; * [ ] Rustus helm chart; * [ ] Cloud native rustus operator. diff --git a/src/config.rs b/src/config.rs index fdafc53516f2e9d41c19ce0108f9fe9cadd49ba4..d27bc42a93bb68b020fae394de91cca83d790f84 100644 --- a/src/config.rs +++ b/src/config.rs @@ -122,6 +122,14 @@ pub struct NotificationsOptions { #[cfg(feature = "amqp_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")] pub hooks_amqp_exchange: String, + + #[cfg(feature = "file_notifiers")] + #[structopt(long, env = "RUSTUS_HOOKS_DIR")] + pub hooks_dir: Option<PathBuf>, + + #[cfg(feature = "file_notifiers")] + #[structopt(long, env = "RUSTUS_HOOKS_FILE")] + pub hooks_file: Option<String>, } #[derive(Debug, StructOpt, Clone)] diff --git a/src/errors.rs b/src/errors.rs index 18bd0244af8030fc08a05990c12148bde48e84fa..af844fff668818d7c6ef573f3890888d85c73260 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -55,6 +55,8 @@ pub enum RustusError { #[cfg(feature = "amqp_notifier")] #[error("AMQP error: {0}")] AMQPPoolError(#[from] mobc_lapin::mobc::Error<lapin::Error>), + #[error("Std error: {0}")] + StdError(#[from] std::io::Error), } /// This conversion allows us to use `RustusError` in the `main` function. diff --git a/src/info_storages/file_info_storage.rs b/src/info_storages/file_info_storage.rs index fe8ccf14ea9368f6187e1cadf9ab1a2fa0f60c44..338ac6800406dfe8166fab78f8028eb3fe8105eb 100644 --- a/src/info_storages/file_info_storage.rs +++ b/src/info_storages/file_info_storage.rs @@ -59,6 +59,7 @@ impl InfoStorage for FileInfoStorage { .to_string(), ) })?; + file.sync_data().await?; Ok(()) } diff --git a/src/notifiers/dir_notifier.rs b/src/notifiers/dir_notifier.rs new file mode 100644 index 0000000000000000000000000000000000000000..670d1e1bd04aeb9839ce3691932d89b7f0474d50 --- /dev/null +++ b/src/notifiers/dir_notifier.rs @@ -0,0 +1,48 @@ +use crate::errors::RustusError; +use crate::notifiers::{Hook, Notifier}; +use crate::RustusResult; +use actix_web::http::header::HeaderMap; +use async_process::{Command, Stdio}; +use async_trait::async_trait; +use futures::AsyncWriteExt; +use log::debug; +use std::path::PathBuf; + +pub struct DirNotifier { + pub dir: PathBuf, +} + +impl DirNotifier { + pub fn new(dir: PathBuf) -> Self { + Self { dir } + } +} + +#[async_trait] +impl Notifier for DirNotifier { + async fn prepare(&mut self) -> RustusResult<()> { + Ok(()) + } + + async fn send_message( + &self, + message: String, + hook: Hook, + _headers_map: &HeaderMap, + ) -> RustusResult<()> { + let hook_path = self.dir.join(hook.to_string()); + debug!("Running hook: {}", hook_path.as_path().display()); + let mut command = Command::new(hook_path).stdin(Stdio::piped()).spawn()?; + command + .stdin + .as_mut() + .unwrap() + .write_all(message.as_bytes()) + .await?; + let stat = command.status().await?; + if !stat.success() { + return Err(RustusError::HookError("Returned wrong status code".into())); + } + Ok(()) + } +} diff --git a/src/notifiers/file_notifier.rs b/src/notifiers/file_notifier.rs new file mode 100644 index 0000000000000000000000000000000000000000..a502592c24fdc68fc3ef2db125598e3f1b1f2f75 --- /dev/null +++ b/src/notifiers/file_notifier.rs @@ -0,0 +1,49 @@ +use crate::errors::RustusError; +use crate::notifiers::{Hook, Notifier}; +use crate::RustusResult; +use actix_web::http::header::HeaderMap; +use async_process::{Command, Stdio}; +use async_trait::async_trait; +use futures::AsyncWriteExt; +use log::debug; + +pub struct FileNotifier { + pub command: String, +} + +impl FileNotifier { + pub fn new(command: String) -> Self { + Self { command } + } +} + +#[async_trait] +impl Notifier for FileNotifier { + async fn prepare(&mut self) -> RustusResult<()> { + Ok(()) + } + + async fn send_message( + &self, + message: String, + hook: Hook, + _headers_map: &HeaderMap, + ) -> RustusResult<()> { + debug!("Running command: {}", self.command.as_str()); + let mut command = Command::new(self.command.as_str()) + .arg(hook.to_string()) + .stdin(Stdio::piped()) + .spawn()?; + command + .stdin + .as_mut() + .unwrap() + .write_all(message.as_bytes()) + .await?; + let stat = command.status().await?; + if !stat.success() { + return Err(RustusError::HookError("Returned wrong status code".into())); + } + Ok(()) + } +} diff --git a/src/notifiers/mod.rs b/src/notifiers/mod.rs index fb273b487f06f6d7a6728deec52c4afc36984281..86ac9c692379c510f4fb15ee8cc7012f21f84131 100644 --- a/src/notifiers/mod.rs +++ b/src/notifiers/mod.rs @@ -1,5 +1,9 @@ #[cfg(feature = "amqp_notifier")] pub mod amqp_notifier; +#[cfg(feature = "file_notifiers")] +pub mod dir_notifier; +#[cfg(feature = "file_notifiers")] +mod file_notifier; #[cfg(feature = "http_notifier")] pub mod http_notifier; pub mod models; diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 2e5da50288fdfca9adbb8c8a46a73f799adb020f..79b538bbad316c533db78f90fd5b6854d9e37fda 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -1,6 +1,10 @@ use crate::errors::RustusResult; #[cfg(feature = "amqp_notifier")] use crate::notifiers::amqp_notifier; +#[cfg(feature = "file_notifiers")] +use crate::notifiers::dir_notifier::DirNotifier; +#[cfg(feature = "file_notifiers")] +use crate::notifiers::file_notifier::FileNotifier; #[cfg(feature = "http_notifier")] use crate::notifiers::http_notifier; use crate::notifiers::{Hook, Notifier}; @@ -18,6 +22,19 @@ impl NotificationManager { notifiers: Vec::new(), }; debug!("Initializing notification manager."); + if tus_config.notification_opts.hooks_file.is_some() { + debug!("Found hooks file"); + manager.notifiers.push(Box::new(FileNotifier::new( + tus_config.notification_opts.hooks_file.clone().unwrap(), + ))); + } + #[cfg(feature = "file_notifiers")] + if tus_config.notification_opts.hooks_dir.is_some() { + debug!("Found hooks directory"); + manager.notifiers.push(Box::new(DirNotifier::new( + tus_config.notification_opts.hooks_dir.clone().unwrap(), + ))); + } #[cfg(feature = "http_notifier")] if !tus_config.notification_opts.hooks_http_urls.is_empty() { debug!("Found http hook urls."); diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index a8dbd6e37c379591c5c51033c0075f154517ee66..b6b7419cdb6223fd0d07e1304ae85a2fa20b0958 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -144,6 +144,7 @@ impl Storage for FileStorage { error!("{:?}", err); RustusError::UnableToWrite(info.path.clone().unwrap()) })?; + file.sync_data().await?; // Updating information about file. info.offset += bytes.len(); self.info_storage.set_info(&info, false).await?; @@ -180,7 +181,7 @@ impl Storage for FileStorage { error!("{:?}", err); RustusError::UnableToWrite(file_path.display().to_string()) })?; - + file.sync_all().await?; // Creating new FileInfo object and saving it. let file_info = FileInfo::new( file_id.as_str(),