From e9c5cb93a884cfce9ad6a4bf4003d16101b5af4e Mon Sep 17 00:00:00 2001
From: Pavel Kirilin <win10@list.ru>
Date: Thu, 17 Feb 2022 03:00:50 +0400
Subject: [PATCH] Added integration tests (#34)

* Added info_redis_storage tests.
* Added db_info_storage integration tests.
* Added AMQP integration tests.

Closes #15.

Signed-off-by: Pavel Kirilin <win10@list.ru>
---
 .github/workflows/test.yml                    | 27 +++++-
 .pre-commit-config.yaml                       | 24 +++---
 Cargo.toml                                    |  6 ++
 src/config.rs                                 |  8 ++
 src/info_storages/db_info_storage.rs          | 74 +++++++++++++++--
 .../models/available_info_storages.rs         | 20 ++++-
 src/info_storages/models/file_info.rs         | 11 +++
 src/info_storages/redis_info_storage.rs       | 83 ++++++++++++++++++-
 src/notifiers/amqp_notifier.rs                | 83 +++++++++++++++++--
 src/notifiers/models/notification_manager.rs  | 29 ++++---
 10 files changed, 322 insertions(+), 43 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 0c4388b..300c468 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -55,6 +55,27 @@ jobs:
     needs: pre_job
     if: ${{ needs.pre_job.outputs.should_skip != 'true' }}
     runs-on: ubuntu-latest
+    services:
+      redis:
+        image: redis:6.2-alpine3.15
+        ports:
+          - 6379/tcp
+      pg:
+        image: postgres:13.1
+        ports:
+          - 5432/tcp
+        env:
+          POSTGRES_PASSWORD: "rustus"
+          POSTGRES_USER: "rustus"
+          POSTGRES_DB: "rustus"
+      rabbit:
+        image: rabbitmq:3.8.27-alpine
+        ports:
+          - 5672/tcp
+        env:
+          RABBITMQ_DEFAULT_USER: "guest"
+          RABBITMQ_DEFAULT_PASS: "guest"
+          RABBITMQ_DEFAULT_VHOST: "/"
     steps:
       - uses: actions/checkout@v2
         with:
@@ -64,7 +85,11 @@ jobs:
       - name: Install cargo-llvm-cov
         uses: taiki-e/install-action@cargo-llvm-cov
       - name: Generate code coverage
-        run: cargo llvm-cov --features=all --lcov --output-path lcov.info
+        run: cargo llvm-cov --features=all,integration_tests --lcov --output-path lcov.info -- --test-threads 1
+        env:
+          TEST_REDIS_URL: redis://localhost:${{ job.services.redis.ports['6379'] }}/0
+          TEST_DB_URL: postgresql://rustus:rustus@localhost:${{ job.services.pg.ports['5432'] }}/rustus
+          TEST_AMQP_URL: amqp://guest:guest@localhost:${{ job.services.rabbit.ports['5672'] }}
       - name: Coveralls
         uses: coverallsapp/github-action@master
         with:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 14f6fe7..ffc5f67 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -6,17 +6,6 @@ repos:
   - repo: local
     hooks:
 
-      - id: build
-        types:
-          - rust
-        name: cargo build
-        language: system
-        entry: cargo
-        pass_filenames: false
-        args:
-          - build
-          - --features=all
-
       - id: fmt
         types:
           - rust
@@ -46,4 +35,15 @@ repos:
           - -W
           - clippy::pedantic
           - -D
-          - warnings
\ No newline at end of file
+          - warnings
+
+      - id: build
+        types:
+          - rust
+        name: cargo build
+        language: system
+        entry: cargo
+        pass_filenames: false
+        args:
+          - build
+          - --features=all
diff --git a/Cargo.toml b/Cargo.toml
index 579f2fe..a083d0d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -106,6 +106,12 @@ default = []
 http_notifier = ["reqwest"]
 redis_info_storage = ["mobc-redis"]
 
+### For testing
+test_redis = []
+test_db = []
+test_rmq = []
+integration_tests = ["test_redis", "test_db", "test_rmq"]
+
 [dev-dependencies]
 tempdir = "0.3.7"
 actix-rt = "2.6.0"
diff --git a/src/config.rs b/src/config.rs
index 32e44d9..f772c6f 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -109,6 +109,14 @@ pub struct NotificationsOptions {
     #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")]
     pub hooks_amqp_exchange: String,
 
+    #[cfg(feature = "amqp_notifier")]
+    #[structopt(
+        long,
+        env = "RUSTUS_HOOKS_AMQP_QUEUES_PREFIX",
+        default_value = "rustus"
+    )]
+    pub hooks_amqp_queues_prefix: String,
+
     #[structopt(long, env = "RUSTUS_HOOKS_DIR")]
     pub hooks_dir: Option<PathBuf>,
 
diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs
index e96f053..164f2bc 100644
--- a/src/info_storages/db_info_storage.rs
+++ b/src/info_storages/db_info_storage.rs
@@ -9,7 +9,6 @@ use rbatis::rbatis::Rbatis;
 
 use crate::errors::{RustusError, RustusResult};
 use crate::info_storages::{FileInfo, InfoStorage};
-use crate::RustusConf;
 
 #[crud_table]
 struct DbModel {
@@ -33,15 +32,11 @@ pub struct DBInfoStorage {
 }
 
 impl DBInfoStorage {
-    pub async fn new(app_conf: RustusConf) -> RustusResult<Self> {
+    pub async fn new(dsn: &str) -> RustusResult<Self> {
         let db = Rbatis::new();
         let mut opts = DBPoolOptions::new();
         opts.connect_timeout = Duration::new(2, 0);
-        db.link_opt(
-            app_conf.info_storage_opts.info_db_dsn.unwrap().as_str(),
-            opts,
-        )
-        .await?;
+        db.link_opt(dsn, opts).await?;
         Ok(Self { db })
     }
 }
@@ -84,3 +79,68 @@ impl InfoStorage for DBInfoStorage {
         Ok(())
     }
 }
+
+#[cfg(feature = "test_db")]
+#[cfg(test)]
+mod tests {
+    use super::{DBInfoStorage, DbModel};
+    use crate::info_storages::FileInfo;
+    use crate::InfoStorage;
+    use rbatis::crud::CRUD;
+
+    async fn get_info_storage() -> DBInfoStorage {
+        let db_url = std::env::var("TEST_DB_URL").unwrap();
+        let mut storage = DBInfoStorage::new(db_url.as_str()).await.unwrap();
+        storage.prepare().await.unwrap();
+        storage
+    }
+
+    #[actix_rt::test]
+    async fn success() {
+        let info_storage = get_info_storage().await;
+        let file_info = FileInfo::new_test();
+        info_storage.set_info(&file_info, true).await.unwrap();
+        let info = info_storage
+            .db
+            .fetch_by_column::<Option<DbModel>, &str>("id", file_info.id.as_str())
+            .await
+            .unwrap();
+        assert!(info.is_some());
+        let info = info_storage.get_info(file_info.id.as_str()).await.unwrap();
+        assert_eq!(file_info.id, info.id);
+        assert_eq!(file_info.storage, info.storage);
+        assert_eq!(file_info.length, info.length);
+    }
+
+    #[actix_rt::test]
+    async fn success_deletion() {
+        let info_storage = get_info_storage().await;
+        let file_info = FileInfo::new_test();
+        info_storage.set_info(&file_info, true).await.unwrap();
+        info_storage
+            .remove_info(file_info.id.as_str())
+            .await
+            .unwrap();
+        let info = info_storage
+            .db
+            .fetch_by_column::<Option<DbModel>, &str>("id", file_info.id.as_str())
+            .await
+            .unwrap();
+        assert!(info.is_none());
+    }
+
+    #[actix_rt::test]
+    async fn deletion_not_found() {
+        let info_storage = get_info_storage().await;
+        let res = info_storage.remove_info("unknown").await;
+        // We don't care if it doesn't exist.
+        assert!(res.is_ok());
+    }
+
+    #[actix_rt::test]
+    async fn getting_not_found() {
+        let info_storage = get_info_storage().await;
+        let res = info_storage.get_info("unknown").await;
+        assert!(res.is_err());
+    }
+}
diff --git a/src/info_storages/models/available_info_storages.rs b/src/info_storages/models/available_info_storages.rs
index befef8d..e5e481f 100644
--- a/src/info_storages/models/available_info_storages.rs
+++ b/src/info_storages/models/available_info_storages.rs
@@ -43,11 +43,27 @@ impl AvailableInfoStores {
             ))),
             #[cfg(feature = "db_info_storage")]
             Self::DB => Ok(Box::new(
-                db_info_storage::DBInfoStorage::new(config.clone()).await?,
+                db_info_storage::DBInfoStorage::new(
+                    config
+                        .info_storage_opts
+                        .info_db_dsn
+                        .clone()
+                        .unwrap()
+                        .as_str(),
+                )
+                .await?,
             )),
             #[cfg(feature = "redis_info_storage")]
             AvailableInfoStores::Redis => Ok(Box::new(
-                redis_info_storage::RedisStorage::new(config.clone()).await?,
+                redis_info_storage::RedisStorage::new(
+                    config
+                        .info_storage_opts
+                        .info_db_dsn
+                        .clone()
+                        .unwrap()
+                        .as_str(),
+                )
+                .await?,
             )),
         }
     }
diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs
index 44c55a1..bace48b 100644
--- a/src/info_storages/models/file_info.rs
+++ b/src/info_storages/models/file_info.rs
@@ -87,4 +87,15 @@ impl FileInfo {
             Some(result.join(","))
         }
     }
+
+    #[cfg(test)]
+    pub fn new_test() -> Self {
+        FileInfo::new(
+            uuid::Uuid::new_v4().to_string().as_str(),
+            Some(10),
+            Some("random_path".into()),
+            "random_storage".into(),
+            None,
+        )
+    }
 }
diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs
index 584407d..b9e0b3b 100644
--- a/src/info_storages/redis_info_storage.rs
+++ b/src/info_storages/redis_info_storage.rs
@@ -6,15 +6,14 @@ use redis::aio::Connection;
 
 use crate::errors::{RustusError, RustusResult};
 use crate::info_storages::{FileInfo, InfoStorage};
-use crate::RustusConf;
 
 pub struct RedisStorage {
     pool: Pool<RedisConnectionManager>,
 }
 
 impl RedisStorage {
-    pub async fn new(app_conf: RustusConf) -> RustusResult<Self> {
-        let client = redis::Client::open(app_conf.info_storage_opts.info_db_dsn.unwrap().as_str())?;
+    pub async fn new(db_dsn: &str) -> RustusResult<Self> {
+        let client = redis::Client::open(db_dsn)?;
         let manager = RedisConnectionManager::new(client);
         let pool = Pool::builder().max_open(100).build(manager);
         Ok(Self { pool })
@@ -62,3 +61,81 @@ impl InfoStorage for RedisStorage {
         }
     }
 }
+
+#[cfg(test)]
+#[cfg(feature = "test_redis")]
+mod tests {
+    use super::RedisStorage;
+    use crate::info_storages::FileInfo;
+    use crate::InfoStorage;
+    use mobc_redis::redis;
+    use mobc_redis::redis::AsyncCommands;
+
+    async fn get_storage() -> RedisStorage {
+        let redis_url = std::env::var("TEST_REDIS_URL").unwrap();
+        RedisStorage::new(redis_url.as_str()).await.unwrap()
+    }
+
+    async fn get_redis() -> redis::aio::Connection {
+        let redis_url = std::env::var("TEST_REDIS_URL").unwrap();
+        let redis = redis::Client::open(redis_url).unwrap();
+        redis.get_async_connection().await.unwrap()
+    }
+
+    #[actix_rt::test]
+    async fn success() {
+        let info_storage = get_storage().await;
+        let file_info = FileInfo::new_test();
+        info_storage.set_info(&file_info, true).await.unwrap();
+        let mut redis = get_redis().await;
+        let value: Option<String> = redis.get(file_info.id.as_str()).await.unwrap();
+        assert!(value.is_some());
+
+        let file_info_from_storage = info_storage.get_info(file_info.id.as_str()).await.unwrap();
+
+        assert_eq!(file_info.id, file_info_from_storage.id);
+        assert_eq!(file_info.path, file_info_from_storage.path);
+        assert_eq!(file_info.storage, file_info_from_storage.storage);
+    }
+
+    #[actix_rt::test]
+    async fn no_connection() {
+        let info_storage = RedisStorage::new("redis://unknonwn_url/0").await.unwrap();
+        let file_info = FileInfo::new_test();
+        let res = info_storage.set_info(&file_info, true).await;
+        assert!(res.is_err());
+    }
+
+    #[actix_rt::test]
+    async fn unknown_id() {
+        let info_storage = get_storage().await;
+        let res = info_storage
+            .get_info(uuid::Uuid::new_v4().to_string().as_str())
+            .await;
+        assert!(res.is_err());
+    }
+
+    #[actix_rt::test]
+    async fn deletion_success() {
+        let info_storage = get_storage().await;
+        let mut redis = get_redis().await;
+        let res = info_storage.remove_info("unknown").await;
+        assert!(res.is_err());
+        let file_info = FileInfo::new_test();
+        info_storage.set_info(&file_info, true).await.unwrap();
+        assert!(redis
+            .get::<&str, Option<String>>(file_info.id.as_str())
+            .await
+            .unwrap()
+            .is_some());
+        info_storage
+            .remove_info(file_info.id.as_str())
+            .await
+            .unwrap();
+        assert!(redis
+            .get::<&str, Option<String>>(file_info.id.as_str())
+            .await
+            .unwrap()
+            .is_none());
+    }
+}
diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs
index 4177c61..a7088f9 100644
--- a/src/notifiers/amqp_notifier.rs
+++ b/src/notifiers/amqp_notifier.rs
@@ -1,5 +1,5 @@
 use crate::notifiers::{Hook, Notifier};
-use crate::{RustusConf, RustusResult};
+use crate::RustusResult;
 use actix_web::http::header::HeaderMap;
 use async_trait::async_trait;
 use lapin::options::{
@@ -15,23 +15,25 @@ use tokio_amqp::LapinTokioExt;
 pub struct AMQPNotifier {
     exchange_name: String,
     pool: Pool<RMQConnectionManager>,
+    queues_prefix: String,
 }
 
 impl AMQPNotifier {
-    pub fn new(app_conf: RustusConf) -> Self {
+    pub fn new(amqp_url: &str, exchange: &str, queues_prefix: &str) -> Self {
         let manager = RMQConnectionManager::new(
-            app_conf.notification_opts.hooks_amqp_url.unwrap(),
+            amqp_url.into(),
             ConnectionProperties::default().with_tokio(),
         );
         let pool = Pool::<RMQConnectionManager>::builder().build(manager);
         Self {
             pool,
-            exchange_name: app_conf.notification_opts.hooks_amqp_exchange,
+            exchange_name: exchange.into(),
+            queues_prefix: queues_prefix.into(),
         }
     }
 
-    pub fn get_queue_name(hook: Hook) -> String {
-        format!("rustus.{}", hook)
+    pub fn get_queue_name(&self, hook: Hook) -> String {
+        format!("{}.{}", self.queues_prefix.as_str(), hook)
     }
 }
 
@@ -47,7 +49,7 @@ impl Notifier for AMQPNotifier {
         )
         .await?;
         for hook in Hook::iter() {
-            let queue_name = Self::get_queue_name(hook);
+            let queue_name = self.get_queue_name(hook);
             chan.queue_declare(
                 queue_name.as_str(),
                 QueueDeclareOptions::default(),
@@ -73,7 +75,7 @@ impl Notifier for AMQPNotifier {
         _header_map: &HeaderMap,
     ) -> RustusResult<()> {
         let chan = self.pool.get().await?.create_channel().await?;
-        let queue = Self::get_queue_name(hook);
+        let queue = self.get_queue_name(hook);
         chan.basic_publish(
             self.exchange_name.as_str(),
             queue.as_str(),
@@ -85,3 +87,68 @@ impl Notifier for AMQPNotifier {
         Ok(())
     }
 }
+
+#[cfg(feature = "test_rmq")]
+#[cfg(test)]
+mod tests {
+    use super::AMQPNotifier;
+    use crate::notifiers::{Hook, Notifier};
+    use actix_web::http::header::HeaderMap;
+    use lapin::options::{BasicAckOptions, BasicGetOptions};
+
+    async fn get_notifier() -> AMQPNotifier {
+        let amqp_url = std::env::var("TEST_AMQP_URL").unwrap();
+        let mut notifier = AMQPNotifier::new(
+            amqp_url.as_str(),
+            uuid::Uuid::new_v4().to_string().as_str(),
+            uuid::Uuid::new_v4().to_string().as_str(),
+        );
+        notifier.prepare().await.unwrap();
+        notifier
+    }
+
+    #[actix_rt::test]
+    async fn success() {
+        let notifier = get_notifier().await;
+        let hook = Hook::PostCreate;
+        let test_msg = String::from("Test Message");
+        notifier
+            .send_message(test_msg.clone(), hook.clone(), &HeaderMap::new())
+            .await
+            .unwrap();
+        let chan = notifier
+            .pool
+            .get()
+            .await
+            .unwrap()
+            .create_channel()
+            .await
+            .unwrap();
+        let message = chan
+            .basic_get(
+                format!("{}.{}", notifier.queues_prefix.as_str(), hook).as_str(),
+                BasicGetOptions::default(),
+            )
+            .await
+            .unwrap();
+        assert!(message.is_some());
+        assert_eq!(
+            String::from_utf8(message.clone().unwrap().data.clone()).unwrap(),
+            test_msg
+        );
+        message
+            .unwrap()
+            .ack(BasicAckOptions::default())
+            .await
+            .unwrap();
+    }
+
+    #[actix_rt::test]
+    async fn unknown_url() {
+        let notifier = AMQPNotifier::new("http://unknown", "test", "test");
+        let res = notifier
+            .send_message("Test Message".into(), Hook::PostCreate, &HeaderMap::new())
+            .await;
+        assert!(res.is_err());
+    }
+}
diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs
index b01cc72..68c2ae9 100644
--- a/src/notifiers/models/notification_manager.rs
+++ b/src/notifiers/models/notification_manager.rs
@@ -15,43 +15,52 @@ pub struct NotificationManager {
 }
 
 impl NotificationManager {
-    pub async fn new(tus_config: &RustusConf) -> RustusResult<Self> {
+    pub async fn new(rustus_config: &RustusConf) -> RustusResult<Self> {
         let mut manager = Self {
             notifiers: Vec::new(),
         };
         debug!("Initializing notification manager.");
-        if tus_config.notification_opts.hooks_file.is_some() {
+        if rustus_config.notification_opts.hooks_file.is_some() {
             debug!("Found hooks file");
             manager.notifiers.push(Box::new(FileNotifier::new(
-                tus_config.notification_opts.hooks_file.clone().unwrap(),
+                rustus_config.notification_opts.hooks_file.clone().unwrap(),
             )));
         }
-        if tus_config.notification_opts.hooks_dir.is_some() {
+        if rustus_config.notification_opts.hooks_dir.is_some() {
             debug!("Found hooks directory");
             manager.notifiers.push(Box::new(DirNotifier::new(
-                tus_config.notification_opts.hooks_dir.clone().unwrap(),
+                rustus_config.notification_opts.hooks_dir.clone().unwrap(),
             )));
         }
         #[cfg(feature = "http_notifier")]
-        if !tus_config.notification_opts.hooks_http_urls.is_empty() {
+        if !rustus_config.notification_opts.hooks_http_urls.is_empty() {
             debug!("Found http hook urls.");
             manager
                 .notifiers
                 .push(Box::new(http_notifier::HttpNotifier::new(
-                    tus_config.notification_opts.hooks_http_urls.clone(),
-                    tus_config
+                    rustus_config.notification_opts.hooks_http_urls.clone(),
+                    rustus_config
                         .notification_opts
                         .hooks_http_proxy_headers
                         .clone(),
                 )));
         }
         #[cfg(feature = "amqp_notifier")]
-        if tus_config.notification_opts.hooks_amqp_url.is_some() {
+        if rustus_config.notification_opts.hooks_amqp_url.is_some() {
             debug!("Found AMQP notifier.");
             manager
                 .notifiers
                 .push(Box::new(amqp_notifier::AMQPNotifier::new(
-                    tus_config.clone(),
+                    rustus_config
+                        .notification_opts
+                        .hooks_amqp_url
+                        .as_ref()
+                        .unwrap(),
+                    rustus_config.notification_opts.hooks_amqp_exchange.as_str(),
+                    rustus_config
+                        .notification_opts
+                        .hooks_amqp_queues_prefix
+                        .as_str(),
                 )));
         }
         for notifier in &mut manager.notifiers.iter_mut() {
-- 
GitLab