From cac83fb29e8f7878e6c221b0afd027c5511357ee Mon Sep 17 00:00:00 2001
From: Pavel Kirilin <win10@list.ru>
Date: Mon, 27 Dec 2021 01:11:45 +0400
Subject: [PATCH] Added AMQP notifier.

Signed-off-by: Pavel Kirilin <win10@list.ru>
---
 Cargo.lock                                   | 122 +++++++++++++++++++
 Cargo.toml                                   |   9 +-
 README.md                                    |   6 +-
 src/config.rs                                |  12 +-
 src/errors.rs                                |   6 +
 src/main.rs                                  |   2 +-
 src/notifiers/amqp_notifier.rs               |  81 ++++++++++++
 src/notifiers/mod.rs                         |   2 +
 src/notifiers/models/notification_manager.rs |  18 ++-
 src/storages/models/available_stores.rs      |   1 +
 10 files changed, 249 insertions(+), 10 deletions(-)
 create mode 100644 src/notifiers/amqp_notifier.rs

diff --git a/Cargo.lock b/Cargo.lock
index 3fe75eb..6d72a85 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -232,6 +232,52 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "amq-protocol"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3b51c409a2b30e48826a593d56a26a43f37bf3df682821e55cd108e24de9ce7"
+dependencies = [
+ "amq-protocol-tcp",
+ "amq-protocol-types",
+ "amq-protocol-uri",
+ "cookie-factory",
+ "nom",
+]
+
+[[package]]
+name = "amq-protocol-tcp"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9797cd3a8f1491e4828818341aea5e1378029ec89dc2422cd85d62715f913080"
+dependencies = [
+ "amq-protocol-uri",
+ "tcp-stream",
+ "tracing",
+]
+
+[[package]]
+name = "amq-protocol-types"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "028cb766932137535fe8a320245e385bac379506d7e9e3d0375be069bb6fb0de"
+dependencies = [
+ "cookie-factory",
+ "nom",
+ "serde",
+ "serde_json",
+]
+
+[[package]]
+name = "amq-protocol-uri"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b08059afa9495f674408891c5eaee50c5eca3532598c90532c1856990b91da96"
+dependencies = [
+ "percent-encoding",
+ "url",
+]
+
 [[package]]
 name = "ansi_term"
 version = "0.12.1"
@@ -657,6 +703,12 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "cookie-factory"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b"
+
 [[package]]
 name = "core-foundation"
 version = "0.9.2"
@@ -823,6 +875,12 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
 
+[[package]]
+name = "doc-comment"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
+
 [[package]]
 name = "dtoa"
 version = "0.4.8"
@@ -1353,6 +1411,23 @@ version = "0.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388"
 
+[[package]]
+name = "lapin"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef0a8c145a248b1536cfa06890d480cda7982add59f77973ac7b03db47f349b5"
+dependencies = [
+ "amq-protocol",
+ "async-task",
+ "crossbeam-channel",
+ "futures-core",
+ "log",
+ "mio 0.7.14",
+ "parking_lot",
+ "pinky-swear",
+ "serde",
+]
+
 [[package]]
 name = "lazy_static"
 version = "1.4.0"
@@ -1533,6 +1608,16 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "mobc-lapin"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c0820bedacbc30154f91261b87d6f92ee40e74592df4e5e815f9b817f290127"
+dependencies = [
+ "lapin",
+ "mobc",
+]
+
 [[package]]
 name = "mobc-redis"
 version = "0.7.0"
@@ -1822,6 +1907,17 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "pinky-swear"
+version = "4.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9bf8cda6f8e1500338634e4e3ce90ac59eb7929a1e088b6946c742be1cc44dc1"
+dependencies = [
+ "doc-comment",
+ "parking_lot",
+ "tracing",
+]
+
 [[package]]
 name = "pkg-config"
 version = "0.3.24"
@@ -2256,8 +2352,10 @@ dependencies = [
  "derive_more",
  "fern",
  "futures",
+ "lapin",
  "lazy_static",
  "log",
+ "mobc-lapin",
  "mobc-redis",
  "rbatis",
  "rbson",
@@ -2269,6 +2367,7 @@ dependencies = [
  "strum",
  "thiserror",
  "tokio",
+ "tokio-amqp",
  "url",
  "uuid 1.0.0-alpha.1",
 ]
@@ -2744,6 +2843,18 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "tcp-stream"
+version = "0.20.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a77f06a7c6a1ecff1bbab0743a8beca305ca1ebcbe5d1bc32390e94117859af"
+dependencies = [
+ "cfg-if",
+ "mio 0.7.14",
+ "native-tls",
+ "pem",
+]
+
 [[package]]
 name = "tempfile"
 version = "3.2.0"
@@ -2880,6 +2991,17 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "tokio-amqp"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4a236324e1e84931e62c22e2ee47688e077aa33a2a95f14d29ab8ec4dbaf9845"
+dependencies = [
+ "lapin",
+ "parking_lot",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-native-tls"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index 1ea4fde..9fd4336 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,21 @@ strfmt = "^0.1.6"
 lazy_static = "1.4.0"
 strum = { version = "0.23", features = ["derive"] }
 futures = { version = "^0.3.19", features = ["executor"] }
-reqwest = { version = "^0.11.8", features = ["json"], optional = true }
 # Deps for info storages.
 rbson = { version = "2.0", optional = true }
 rbatis = { version = "^3.0", default-features = false, features = ["runtime-actix-rustls", "all-database"], optional = true }
 mobc-redis = { version = "^0.7.0", optional = true }
+# Deps for notifiers
+lapin = { version = "^1.9.0", optional = true }
+tokio-amqp = { version = "1.0.0", optional = true }
+mobc-lapin = { version = "0.7.0", optional = true }
+reqwest = { version = "^0.11.8", features = ["json"], optional = true }
 
 [features]
 default = []
-all = ["redis_info_storage", "db_info_storage", "http_notifier"]
+all = ["redis_info_storage", "db_info_storage", "http_notifier", "amqp_notifier"]
 redis_info_storage = ["mobc-redis"]
 db_info_storage = ["rbatis", "rbson"]
+amqp_notifier = ["lapin", "tokio-amqp", "mobc-lapin"]
 
 http_notifier = ["reqwest"]
\ No newline at end of file
diff --git a/README.md b/README.md
index c4c0003..45012b8 100644
--- a/README.md
+++ b/README.md
@@ -68,10 +68,10 @@ All options are listed in `rustus --help`.
 * [x] Route to get uploaded files;
 * [x] Database support for info storage;
 * [x] Redis support for info storage;
-* [ ] S3 as data storage store support;
 * [x] Notification interface;
-* [ ] Executable files notifications;
 * [x] Notifications via http hooks;
-* [ ] Notifications via RabbitMQ;
+* [x] Notifications via RabbitMQ;
+* [ ] S3 as data storage store support;
+* [ ] Executable files notifications;
 * [ ] Rustus helm chart;
 * [ ] Cloud native rustus operator.
diff --git a/src/config.rs b/src/config.rs
index 8a1a6f8..21285ad 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -104,11 +104,19 @@ pub struct NotificationsOptions {
     pub hooks: Vec<Hook>,
 
     /// List of URLS to send webhooks to.
-    ///
-    /// This list will be notified
     #[cfg(feature = "http_notifier")]
     #[structopt(long, env = "RUSTUS_HOOKS_HTTP_URLS", use_delimiter = true)]
     pub hooks_http_urls: Vec<String>,
+
+    /// Url for AMQP server.
+    #[cfg(feature = "amqp_notifier")]
+    #[structopt(long, env = "RUSTUS_HOOKS_AMQP_URL")]
+    pub hooks_amqp_url: Option<String>,
+
+    /// Name of amqp exchange.
+    #[cfg(feature = "amqp_notifier")]
+    #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")]
+    pub hooks_amqp_exchange: String,
 }
 
 #[derive(Debug, StructOpt, Clone)]
diff --git a/src/errors.rs b/src/errors.rs
index 7dd6475..18bd024 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -49,6 +49,12 @@ pub enum RustusError {
     HookError(String),
     #[error("Unable to configure logging: {0}")]
     LogConfigError(#[from] log::SetLoggerError),
+    #[cfg(feature = "amqp_notifier")]
+    #[error("AMQP error: {0}")]
+    AMQPError(#[from] lapin::Error),
+    #[cfg(feature = "amqp_notifier")]
+    #[error("AMQP error: {0}")]
+    AMQPPoolError(#[from] mobc_lapin::mobc::Error<lapin::Error>),
 }
 
 /// This conversion allows us to use `RustusError` in the `main` function.
diff --git a/src/main.rs b/src/main.rs
index 9c737dc..fdddec4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -168,7 +168,7 @@ async fn main() -> std::io::Result<()> {
     storage.prepare().await?;
 
     // Creating notification manager.
-    let notification_manager = NotificationManager::new(&app_conf);
+    let notification_manager = NotificationManager::new(&app_conf).await?;
 
     // Creating actual server and running it.
     let server = create_server(storage, app_conf, notification_manager)?;
diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs
new file mode 100644
index 0000000..eb9d7de
--- /dev/null
+++ b/src/notifiers/amqp_notifier.rs
@@ -0,0 +1,81 @@
+use crate::notifiers::{Hook, Notifier};
+use crate::{RustusConf, RustusResult};
+use async_trait::async_trait;
+use lapin::options::{
+    BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
+};
+use lapin::types::FieldTable;
+use lapin::{BasicProperties, ConnectionProperties, ExchangeKind};
+use mobc_lapin::mobc::Pool;
+use mobc_lapin::RMQConnectionManager;
+use strum::IntoEnumIterator;
+use tokio_amqp::LapinTokioExt;
+
+pub struct AMQPNotifier {
+    exchange_name: String,
+    pool: Pool<RMQConnectionManager>,
+}
+
+impl AMQPNotifier {
+    pub fn new(app_conf: RustusConf) -> Self {
+        let manager = RMQConnectionManager::new(
+            app_conf.notification_opts.hooks_amqp_url.unwrap(),
+            ConnectionProperties::default().with_tokio(),
+        );
+        let pool = Pool::<RMQConnectionManager>::builder().build(manager);
+        Self {
+            pool,
+            exchange_name: app_conf.notification_opts.hooks_amqp_exchange,
+        }
+    }
+
+    pub fn get_queue_name(hook: Hook) -> String {
+        format!("rustus.{}", hook)
+    }
+}
+
+#[async_trait]
+impl Notifier for AMQPNotifier {
+    async fn prepare(&mut self) -> RustusResult<()> {
+        let chan = self.pool.get().await?.create_channel().await?;
+        chan.exchange_declare(
+            self.exchange_name.as_str(),
+            ExchangeKind::Topic,
+            ExchangeDeclareOptions::default(),
+            FieldTable::default(),
+        )
+        .await?;
+        for hook in Hook::iter() {
+            let queue_name = Self::get_queue_name(hook);
+            chan.queue_declare(
+                queue_name.as_str(),
+                QueueDeclareOptions::default(),
+                FieldTable::default(),
+            )
+            .await?;
+            chan.queue_bind(
+                queue_name.as_str(),
+                self.exchange_name.as_str(),
+                queue_name.as_str(),
+                QueueBindOptions::default(),
+                FieldTable::default(),
+            )
+            .await?;
+        }
+        Ok(())
+    }
+
+    async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()> {
+        let chan = self.pool.get().await?.create_channel().await?;
+        let queue = Self::get_queue_name(hook);
+        chan.basic_publish(
+            self.exchange_name.as_str(),
+            queue.as_str(),
+            BasicPublishOptions::default(),
+            message.as_bytes().to_vec(),
+            BasicProperties::default().with_content_type("application/json".into()),
+        )
+        .await?;
+        Ok(())
+    }
+}
diff --git a/src/notifiers/mod.rs b/src/notifiers/mod.rs
index 096a525..fb273b4 100644
--- a/src/notifiers/mod.rs
+++ b/src/notifiers/mod.rs
@@ -1,3 +1,5 @@
+#[cfg(feature = "amqp_notifier")]
+pub mod amqp_notifier;
 #[cfg(feature = "http_notifier")]
 pub mod http_notifier;
 pub mod models;
diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs
index 10ccb4f..668ef8c 100644
--- a/src/notifiers/models/notification_manager.rs
+++ b/src/notifiers/models/notification_manager.rs
@@ -1,4 +1,6 @@
 use crate::errors::{RustusError, RustusResult};
+#[cfg(feature = "amqp_notifier")]
+use crate::notifiers::amqp_notifier;
 #[cfg(feature = "http_notifier")]
 use crate::notifiers::http_notifier;
 use crate::notifiers::{Hook, Notifier};
@@ -11,7 +13,7 @@ pub struct NotificationManager {
 }
 
 impl NotificationManager {
-    pub fn new(tus_config: &RustusConf) -> Self {
+    pub async fn new(tus_config: &RustusConf) -> RustusResult<Self> {
         let mut manager = Self {
             notifiers: Vec::new(),
         };
@@ -25,8 +27,20 @@ impl NotificationManager {
                     tus_config.notification_opts.hooks_http_urls.clone(),
                 )));
         }
+        #[cfg(feature = "amqp_notifier")]
+        if tus_config.notification_opts.hooks_amqp_url.is_some() {
+            debug!("Found AMQP notifier.");
+            manager
+                .notifiers
+                .push(Box::new(amqp_notifier::AMQPNotifier::new(
+                    tus_config.clone(),
+                )));
+        }
+        for notifier in &mut manager.notifiers.iter_mut() {
+            notifier.prepare().await?;
+        }
         debug!("Notification manager initialized.");
-        manager
+        Ok(manager)
     }
 
     pub async fn send_message(&self, message: String, hook: Hook) -> RustusResult<()> {
diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs
index 931922a..34880c1 100644
--- a/src/storages/models/available_stores.rs
+++ b/src/storages/models/available_stores.rs
@@ -18,6 +18,7 @@ impl AvailableStores {
     ///
     /// # Params
     /// `config` - Rustus configuration.
+    /// `info_storage` - Storage for information about files.
     ///
     pub fn get(
         &self,
-- 
GitLab