diff --git a/src/config.rs b/src/config.rs index 21285ad7d08111369ffeea64d7acefec94d3ac32..fdafc53516f2e9d41c19ce0108f9fe9cadd49ba4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -108,6 +108,11 @@ pub struct NotificationsOptions { #[structopt(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_delimiter = true)] pub hooks_http_urls: Vec<String>, + // List of headers to forward from client. + #[cfg(feature = "http_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_HTTP_PROXY_HEADERS", use_delimiter = true)] + pub hooks_http_proxy_headers: Vec<String>, + /// Url for AMQP server. #[cfg(feature = "amqp_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_AMQP_URL")] diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index eb9d7defbb07740432272e7158eb2828ffdb8280..4177c61be55400c20d2c925f9fe798e76be7095e 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -1,5 +1,6 @@ use crate::notifiers::{Hook, Notifier}; use crate::{RustusConf, RustusResult}; +use actix_web::http::header::HeaderMap; use async_trait::async_trait; use lapin::options::{ BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, @@ -65,7 +66,12 @@ impl Notifier for AMQPNotifier { Ok(()) } - async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()> { + async fn send_message( + &self, + message: String, + hook: Hook, + _header_map: &HeaderMap, + ) -> RustusResult<()> { let chan = self.pool.get().await?.create_channel().await?; let queue = Self::get_queue_name(hook); chan.basic_publish( diff --git a/src/notifiers/http_notifier.rs b/src/notifiers/http_notifier.rs index dcd50b3003184de3e6b00f4bac68d866df69e8cf..d8ea9ac300881b0924622c02100434331e254612 100644 --- a/src/notifiers/http_notifier.rs +++ b/src/notifiers/http_notifier.rs @@ -2,6 +2,7 @@ use crate::errors::RustusResult; use crate::notifiers::{Hook, Notifier}; +use actix_web::http::header::HeaderMap; use async_trait::async_trait; use futures::future::try_join_all; use log::debug; @@ -11,12 +12,17 @@ use std::time::Duration; pub struct HttpNotifier { urls: Vec<String>, client: Client, + forward_headers: Vec<String>, } impl HttpNotifier { - pub fn new(urls: Vec<String>) -> Self { + pub fn new(urls: Vec<String>, forward_headers: Vec<String>) -> Self { let client = Client::new(); - Self { urls, client } + Self { + urls, + client, + forward_headers, + } } } @@ -26,18 +32,28 @@ impl Notifier for HttpNotifier { Ok(()) } - async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()> { + async fn send_message( + &self, + message: String, + hook: Hook, + header_map: &HeaderMap, + ) -> RustusResult<()> { debug!("Starting HTTP Hook."); let idempotency_key = uuid::Uuid::new_v4().to_string(); let requests_vec = self.urls.iter().map(|url| { debug!("Preparing request for {}", url); - self.client + let mut request = self + .client .post(url.as_str()) .header("Idempotency-Key", idempotency_key.as_str()) .header("Hook-Name", hook.clone().to_string()) - .timeout(Duration::from_secs(2)) - .body(message.clone()) - .send() + .timeout(Duration::from_secs(2)); + for item in &self.forward_headers { + if let Some(value) = header_map.get(item.clone()) { + request = request.header(item.clone(), value.as_bytes()); + } + } + request.body(message.clone()).send() }); let responses = try_join_all(requests_vec).await?; for resp in responses { diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 668ef8c57121ecf7fdfab9903747bde8f75d1612..2e5da50288fdfca9adbb8c8a46a73f799adb020f 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -1,11 +1,11 @@ -use crate::errors::{RustusError, RustusResult}; +use crate::errors::RustusResult; #[cfg(feature = "amqp_notifier")] use crate::notifiers::amqp_notifier; #[cfg(feature = "http_notifier")] use crate::notifiers::http_notifier; use crate::notifiers::{Hook, Notifier}; use crate::RustusConf; -use futures::future::try_join_all; +use actix_web::http::header::HeaderMap; use log::debug; pub struct NotificationManager { @@ -25,6 +25,10 @@ impl NotificationManager { .notifiers .push(Box::new(http_notifier::HttpNotifier::new( tus_config.notification_opts.hooks_http_urls.clone(), + tus_config + .notification_opts + .hooks_http_proxy_headers + .clone(), ))); } #[cfg(feature = "amqp_notifier")] @@ -43,15 +47,16 @@ impl NotificationManager { Ok(manager) } - pub async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()> { - let mut futures = Vec::new(); + pub async fn send_message( + &self, + message: String, + hook: Hook, + header_map: &HeaderMap, + ) -> RustusResult<()> { for notifier in &self.notifiers { - futures.push(notifier.send_message(message.clone(), hook)); - } - if !futures.is_empty() { - try_join_all(futures) - .await - .map_err(|err| RustusError::HookError(err.to_string()))?; + notifier + .send_message(message.clone(), hook, header_map) + .await?; } Ok(()) } diff --git a/src/notifiers/models/notifier.rs b/src/notifiers/models/notifier.rs index 1b5b9c84ac731154528a465e7c662de2d411eff9..272fa8c2d1e5a16d077b866653a976073bdd5507 100644 --- a/src/notifiers/models/notifier.rs +++ b/src/notifiers/models/notifier.rs @@ -1,4 +1,5 @@ use crate::errors::RustusResult; +use actix_web::http::header::HeaderMap; use crate::notifiers::Hook; use async_trait::async_trait; @@ -6,5 +7,10 @@ use async_trait::async_trait; #[async_trait] pub trait Notifier { async fn prepare(&mut self) -> RustusResult<()>; - async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()>; + async fn send_message( + &self, + message: String, + hook: Hook, + headers_map: &HeaderMap, + ) -> RustusResult<()>; } diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index cdb0ecbbde02fc7768a44ead7fd5d39d5d8f8e6a..c81e32a304006001f4c50cd9887bd545cea4e64a 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -83,7 +83,12 @@ pub async fn write_bytes( .notification_opts .notification_format .format(&request, &file_info)?; - tokio::spawn(async move { notification_manager.send_message(message, hook).await }); + let headers = request.headers().clone(); + tokio::spawn(async move { + notification_manager + .send_message(message, hook, &headers) + .await + }); } Ok(HttpResponse::NoContent() .insert_header(("Upload-Offset", file_info.offset.to_string())) diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index 59375a08e911ef8ca7a0981ed14bc6b608e8c78c..f9ee9cca7410bc419890bdbc9a418f1307d9d5ec 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -81,8 +81,9 @@ pub async fn create_file( .notification_opts .notification_format .format(&request, &initial_file_info)?; + let headers = request.headers(); notification_manager - .send_message(message, Hook::PreCreate) + .send_message(message, Hook::PreCreate, headers) .await?; } @@ -111,11 +112,12 @@ pub async fn create_file( .notification_opts .notification_format .format(&request, &file_info)?; + let headers = request.headers().clone(); // Adding send_message task to tokio reactor. // Thin function would be executed in background. tokio::spawn(async move { notification_manager - .send_message(message, Hook::PostCreate) + .send_message(message, Hook::PostCreate, &headers) .await }); } diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 7222b4f5d69e08e35169c8804e7262e3ed1a21a5..272fc2154a43c7fc4b383df1512246925cb3545d 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -22,9 +22,10 @@ pub async fn terminate( .notification_opts .notification_format .format(&request, &file_info)?; + let headers = request.headers().clone(); tokio::spawn(async move { notification_manager - .send_message(message, Hook::PostTerminate) + .send_message(message, Hook::PostTerminate, &headers) .await }); }