From 2242aecc17394a8f35584a2adf1027d431c6cbba Mon Sep 17 00:00:00 2001 From: Pavel Kirilin <win10@list.ru> Date: Sun, 10 Apr 2022 14:09:01 +0400 Subject: [PATCH] Added Celery integration. (#74) Added Celery integration. This commit adds ability to connect Rustus to celery. Also it adds more options to adjust amqp hooks. Signed-off-by: Pavel Kirilin <win10@list.ru> --- README.md | 2 +- docs/configuration.md | 26 ++-- docs/hooks.md | 133 +++++++++++++++-- docs/index.md | 2 +- src/config.rs | 41 +++++- src/notifiers/amqp_notifier.rs | 141 +++++++++++++++---- src/notifiers/models/message_format.rs | 7 - src/notifiers/models/notification_manager.rs | 24 +++- 8 files changed, 308 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index d82157f..4065206 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ This implementation has several features to make usage as simple as possible. * It has a lot of hooks options, and hooks can be combined. * Highly configurable; -Please check out [docs](https://s3rius.github.io/rustus/) for more information about configuration and deploy. +Please check out [Documentation](https://s3rius.github.io/rustus/) for more information about configuration and deploy. ## Installation diff --git a/docs/configuration.md b/docs/configuration.md index 9f7bf21..e954b9e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -5,9 +5,6 @@ description: "How to configure Rusts" Rustus is highly configurable. You can adjust it with CLI or you can use environment variables. -!!! warning - Some options can be passed only through as CLI parameters - !!! info Information about hooks you can find on [Hooks page](../hooks). @@ -57,14 +54,13 @@ Also you can configure number of actix `workers` that handle connections. Currently only file storage is available, so if you pass to `--storage` parameter other than `file-storage` you will get an error. - Also you **can not** pass `--force-fsync` through environment variables. - - -`--storage` is a type of data storage to be used. +Available parameters: -`--data-dir` is a path to the directory where all files are stored. - -`--dir-structure` is a pattern of a directory structure inside data dir. +* `--storage` - type of data storage to be used; +* `--data-dir` - path to the directory where all files are stored; +* `--dir-structure` - pattern of a directory structure inside data dir; +* `--force-fsync` - calls fsync system call after every write to disk. +`` You can use variables within the pattern. Available variables: @@ -98,7 +94,7 @@ data === "CLI" ``` bash - rustus --force-fsync \ + rustus --force-fsync "yes" \ --storage "file-storage" \ --data-dir "./data/" \ --dir-structure "{year}/{month}/{day}" @@ -110,8 +106,9 @@ data export RUSTUS_STORAGE="file-storage" export RUSTUS_DATA_DIR="./data/" export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}" + export RUSTUS_FORCE_FSYNC="yes" - rustus --force-fsync + rustus ``` ## Configuring info storage @@ -237,7 +234,7 @@ By default all extensions are enabled. === "CLI" ``` bash - rustus --remove-parts \ + rustus --remove-parts "yes" \ --tus-extensions "getting,creation,termination,creation-with-upload,creation-defer-length,concatenation,checksum" ``` @@ -245,6 +242,7 @@ By default all extensions are enabled. ``` bash export RUSTUS_TUS_EXTENSIONS="getting,creation,termination,creation-with-upload,creation-defer-length,concatenation,checksum" + export RUSTUS_REMOVE_PARTS="yes" - rustus --remove-parts + rustus ``` diff --git a/docs/hooks.md b/docs/hooks.md index 56e3ff2..8eba60b 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -39,10 +39,10 @@ You can disable some hooks by using `--hooks` parameter. ``` -## Fomat +## Format -Information about every hook using `JSON` format. -Format can be configured using `--hooks-format` parameter or `RUSTUS_HOOKS_FORMAT` environment variable. +Information about every event is sent using `JSON` format. +Format can be configured with `--hooks-format` parameter or `RUSTUS_HOOKS_FORMAT` environment variable. Available formats: @@ -303,32 +303,137 @@ Configuration parameters: * `--hooks-amqp-url` - connection string to RabbitMQ; * `--hooks-amqp-queues-prefix` - prefix for queues for every event queue; -* `--hooks-amqp-exchange` - name of exchange to use. - -This hook will send every message in an exchange with routing keys -like queues names. - -Queues are named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. +* `--hooks-amqp-exchange` - name of exchange to use; +* `--hooks-amqp-declare-exchange` - creates exchange on startup; +* `--hooks-amqp-exchange-kind` - kind of exchange to connect to; +* `--hooks-amqp-declare-queues` - creates all queues and binds them to exchange; +* `--hooks-amqp-durable-exchange` - adds durability to created exchange; +* `--hooks-amqp-durable-queues` - adds durability to created; +* `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html); +* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange. + +If no hooks_amqp_routing_key specified, rustus will send all messages with +different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. +Otherwise, it will use only one routing key and only one queue! !!! warning Since we can't really track message delivery and responses - Rustus doesn't stop in any case. + Rustus won't stop a current upload in any case. === "CLI" ``` bash rustus --hooks-amqp-url "amqp://guest:guest@localhost:5672" \ - --hooks-amqp-queues-prefix "rustus_queue" \ - --hooks-amqp-exchange "rustus" + --hooks-amqp-queues-prefix "rustus_prefix" \ + --hooks-amqp-exchange "rustus" \ + --hooks-amqp-exchange-kind "topic" \ + --hooks-amqp-routing-key "route66" \ + --hooks-amqp-declare-exchange "yes" \ + --hooks-amqp-declare-queues "yes" \ + --hooks-amqp-durable-exchange "yes" \ + --hooks-amqp-durable-queues "yes" \ + --hooks-amqp-celery "yes" ``` === "ENV" ``` bash export RUSTUS_HOOKS_AMQP_URL="amqp://guest:guest@localhost:5672" - export RUSTUS_HOOKS_AMQP_QUEUES_PREFIX="rustus_queue" + export RUSTUS_HOOKS_AMQP_QUEUES_PREFIX="rustus_prefix" export RUSTUS_HOOKS_AMQP_EXCHANGE="rustus" + export RUSTUS_HOOKS_AMQP_EXCHANGE_KIND="topic" + export RUSTUS_HOOKS_AMQP_ROUTING_KEY="route66" + export RUSTUS_HOOKS_AMQP_DECLARE_EXCHANGE="yes" + export RUSTUS_HOOKS_AMQP_DECLARE_QUEUES="yes" + export RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE="yes" + export RUSTUS_HOOKS_AMQP_DURABLE_QUEUES="yes" + export RUSTUS_HOOKS_AMQP_CELERY="yes" rustus - ``` \ No newline at end of file + ``` + +#### Using Rustus with Celery + +Rustus has a cool integration with [Celery](https://docs.celeryq.dev/en/stable/index.html). +Let's build a Celery application that handles rustus hooks. + +At first, we need to install Celery itself. +```bash +pip install celery +``` + +Now we can create a file called "celery.py" in a directory "rustus_celery". +This file contains code that handles celery tasks. + +```python title="rustus_celery/celery.py" +import celery + +app = celery.Celery("rustus_celery") +app.conf.update( + broker_url="amqp://guest:guest@localhost:5672", +) + + +@app.task(name="rustus.pre-create") +def post_create(data): + print(f"PRE CREATE: {data}") + + +@app.task(name="rustus.post-create") +def post_create(data): + print(f"POST CREATE: {data}") + + +@app.task(name="rustus.post-finish") +def post_finish(data): + print(f"POST FINISH: {data}") + + +@app.task(name="rustus.post-terminate") +def post_terminate(data): + print(f"POST TERMINATE: {data}") + + +@app.task(name="rustus.post-receive") +def post_recieve(data): + print(f"POST RECIEVE: {data}") +``` + +!!! info + Every task has its name. You must use these names + in order to handle tasks. + +Now we can run our celery worker to start executing tasks. + +``` +celery -A rustus_celery +``` + +After starting celery worker you can run Rustus with these +parameters. + +The most important parameter is `--hooks-amqp-celery`, because it +adds required by Celery headers to every message. + +=== "CLI" + + ``` bash + rustus --hooks-amqp-url "amqp://guest:guest@localhost:5672" \ + --hooks-amqp-exchange "celery" \ + --hooks-amqp-exchange-kind "direct" \ + --hooks-amqp-routing-key "celery" \ + --hooks-amqp-celery "yes" + ``` + +=== "ENV" + + ``` bash + export RUSTUS_HOOKS_AMQP_URL="amqp://guest:guest@localhost:5672" + export RUSTUS_HOOKS_AMQP_EXCHANGE="celery" + export RUSTUS_HOOKS_AMQP_EXCHANGE_KIND="direct" + export RUSTUS_HOOKS_AMQP_ROUTING_KEY="celery" + export RUSTUS_HOOKS_AMQP_CELERY="yes" + + rustus + ``` diff --git a/docs/index.md b/docs/index.md index f80c599..8a66816 100644 --- a/docs/index.md +++ b/docs/index.md @@ -15,7 +15,7 @@ description: Rustus docs Rustus is a [TUS](https://tus.io) protocol implementation that helps you handle file uploads. -This project has many features that makes it easy to integrate with your application. +This project has many features that make it easy to integrate with your application. ## Installation diff --git a/src/config.rs b/src/config.rs index ae38eaa..3daf87f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,7 +41,7 @@ pub struct StorageOptions { /// everything is written on disk correctly. /// /// In most cases this parameter is redundant. - #[structopt(long, parse(from_flag))] + #[structopt(long, env = "RUSTUS_FORCE_FSYNC")] pub force_fsync: bool, } @@ -88,6 +88,7 @@ pub struct InfoStoreOptions { } #[derive(StructOpt, Debug, Clone)] +#[allow(clippy::struct_excessive_bools)] pub struct NotificationsOptions { /// Notifications format. /// @@ -120,11 +121,47 @@ pub struct NotificationsOptions { #[structopt(long, env = "RUSTUS_HOOKS_AMQP_URL")] pub hooks_amqp_url: Option<String>, + /// Rustus will create exchange if enabled. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_EXCHANGE")] + pub hooks_amqp_declare_exchange: bool, + + /// Rustus will create all queues for communication and bind them + /// to exchange if enabled. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_QUEUES")] + pub hooks_amqp_declare_queues: bool, + + /// Durability type of exchange. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE")] + pub hooks_amqp_durable_exchange: bool, + + /// Durability type of queues. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_QUEUES")] + pub hooks_amqp_durable_queues: bool, + + /// Adds celery specific headers. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_CELERY")] + pub hooks_amqp_celery: bool, + /// Name of amqp exchange. #[cfg(feature = "amqp_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")] pub hooks_amqp_exchange: String, + /// Exchange kind. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE_KIND", default_value = "topic")] + pub hooks_amqp_exchange_kind: String, + + /// Routing key to use when sending message to an exchange. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_ROUTING_KEY")] + pub hooks_amqp_routing_key: Option<String>, + /// Prefix for all AMQP queues. #[cfg(feature = "amqp_notifier")] #[structopt( @@ -199,7 +236,7 @@ pub struct RustusConf { /// By default rustus does nothing with part files after concatenation. /// /// This parameter is only needed if concatenation extension is enabled. - #[structopt(long, parse(from_flag))] + #[structopt(long, env = "RUSTUS_REMOVE_PARTS")] pub remove_parts: bool, #[structopt(flatten)] diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index 8810a3f..ea94fcc 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -6,21 +6,42 @@ use actix_web::http::header::HeaderMap; use async_trait::async_trait; use lapin::{ options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions}, - types::FieldTable, + types::{AMQPValue, FieldTable, LongString}, BasicProperties, ConnectionProperties, ExchangeKind, }; use mobc_lapin::{mobc::Pool, RMQConnectionManager}; use strum::IntoEnumIterator; use tokio_amqp::LapinTokioExt; +#[allow(clippy::struct_excessive_bools)] +pub struct DeclareOptions { + pub declare_exchange: bool, + pub durable_exchange: bool, + pub declare_queues: bool, + pub durable_queues: bool, +} + pub struct AMQPNotifier { exchange_name: String, pool: Pool<RMQConnectionManager>, queues_prefix: String, + exchange_kind: String, + routing_key: Option<String>, + declare_options: DeclareOptions, + celery: bool, } impl AMQPNotifier { - pub fn new(amqp_url: &str, exchange: &str, queues_prefix: &str) -> Self { + #[allow(clippy::fn_params_excessive_bools)] + pub fn new( + amqp_url: &str, + exchange: &str, + queues_prefix: &str, + exchange_kind: &str, + routing_key: Option<String>, + declare_options: DeclareOptions, + celery: bool, + ) -> Self { let manager = RMQConnectionManager::new( amqp_url.into(), ConnectionProperties::default().with_tokio(), @@ -28,13 +49,25 @@ impl AMQPNotifier { let pool = Pool::<RMQConnectionManager>::builder().build(manager); Self { pool, + celery, + routing_key, + declare_options, + exchange_kind: exchange_kind.into(), exchange_name: exchange.into(), queues_prefix: queues_prefix.into(), } } + /// Generate queue name based on hook type. + /// + /// If specific routing key is not empty, it returns it. + /// Otherwise it will generate queue name based on hook name. pub fn get_queue_name(&self, hook: Hook) -> String { - format!("{}.{}", self.queues_prefix.as_str(), hook) + if let Some(routing_key) = self.routing_key.as_ref() { + routing_key.into() + } else { + format!("{}.{}", self.queues_prefix.as_str(), hook) + } } } @@ -42,30 +75,40 @@ impl AMQPNotifier { impl Notifier for AMQPNotifier { async fn prepare(&mut self) -> RustusResult<()> { let chan = self.pool.get().await?.create_channel().await?; - chan.exchange_declare( - self.exchange_name.as_str(), - ExchangeKind::Topic, - ExchangeDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - for hook in Hook::iter() { - let queue_name = self.get_queue_name(hook); - chan.queue_declare( - queue_name.as_str(), - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - chan.queue_bind( - queue_name.as_str(), + if self.declare_options.declare_exchange { + chan.exchange_declare( self.exchange_name.as_str(), - queue_name.as_str(), - QueueBindOptions::default(), + ExchangeKind::Custom(self.exchange_kind.clone()), + ExchangeDeclareOptions { + durable: self.declare_options.durable_exchange, + ..ExchangeDeclareOptions::default() + }, FieldTable::default(), ) .await?; } + if self.declare_options.declare_queues { + for hook in Hook::iter() { + let queue_name = self.get_queue_name(hook); + chan.queue_declare( + queue_name.as_str(), + QueueDeclareOptions { + durable: self.declare_options.durable_queues, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await?; + chan.queue_bind( + queue_name.as_str(), + self.exchange_name.as_str(), + queue_name.as_str(), + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + } + } Ok(()) } @@ -77,12 +120,32 @@ impl Notifier for AMQPNotifier { ) -> RustusResult<()> { let chan = self.pool.get().await?.create_channel().await?; let queue = self.get_queue_name(hook); + let routing_key = self.routing_key.as_ref().unwrap_or(&queue); + let payload = if self.celery { + format!("[[{}], {{}}, {{}}]", message).as_bytes().to_vec() + } else { + message.as_bytes().to_vec() + }; + let mut headers = FieldTable::default(); + if self.celery { + headers.insert( + "id".into(), + AMQPValue::LongString(LongString::from(uuid::Uuid::new_v4().to_string())), + ); + headers.insert( + "task".into(), + AMQPValue::LongString(LongString::from(format!("rustus.{}", hook))), + ); + } chan.basic_publish( self.exchange_name.as_str(), - queue.as_str(), + routing_key.as_str(), BasicPublishOptions::default(), - message.as_bytes().to_vec(), - BasicProperties::default().with_content_type("application/json".into()), + payload, + BasicProperties::default() + .with_headers(headers) + .with_content_type("application/json".into()) + .with_content_encoding("utf-8".into()), ) .await?; Ok(()) @@ -93,7 +156,7 @@ impl Notifier for AMQPNotifier { #[cfg(test)] mod tests { use super::AMQPNotifier; - use crate::notifiers::{Hook, Notifier}; + use crate::notifiers::{amqp_notifier::DeclareOptions, Hook, Notifier}; use actix_web::http::header::HeaderMap; use lapin::options::{BasicAckOptions, BasicGetOptions}; @@ -103,6 +166,15 @@ mod tests { amqp_url.as_str(), uuid::Uuid::new_v4().to_string().as_str(), uuid::Uuid::new_v4().to_string().as_str(), + "topic", + None, + DeclareOptions { + declare_exchange: true, + declare_queues: true, + durable_queues: false, + durable_exchange: false, + }, + true, ); notifier.prepare().await.unwrap(); notifier @@ -135,7 +207,7 @@ mod tests { assert!(message.is_some()); assert_eq!( String::from_utf8(message.clone().unwrap().data.clone()).unwrap(), - test_msg + format!("[[{}], {{}}, {{}}]", test_msg) ); message .unwrap() @@ -146,7 +218,20 @@ mod tests { #[actix_rt::test] async fn unknown_url() { - let notifier = AMQPNotifier::new("http://unknown", "test", "test"); + let notifier = AMQPNotifier::new( + "http://unknown", + "test", + "test", + "topic", + None, + DeclareOptions { + declare_exchange: false, + declare_queues: false, + durable_queues: false, + durable_exchange: false, + }, + false, + ); let res = notifier .send_message("Test Message".into(), Hook::PostCreate, &HeaderMap::new()) .await; diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 9a00077..64f27d7 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -14,8 +14,6 @@ pub enum Format { Default, #[display(fmt = "tusd")] Tusd, - #[display(fmt = "celery")] - Celery, } from_str!(Format, "format"); @@ -25,7 +23,6 @@ impl Format { match self { Self::Default => default_format(request, file_info), Self::Tusd => tusd_format(request, file_info), - Self::Celery => celery_format(request, file_info), } } } @@ -157,7 +154,3 @@ pub fn tusd_format(request: &HttpRequest, file_info: &FileInfo) -> RustusResult< ); Ok(Value::Object(result_map).to_string()) } - -pub fn celery_format(_request: &HttpRequest, _file_info: &FileInfo) -> RustusResult<String> { - todo!() -} diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 441c99a..04a5530 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -4,7 +4,10 @@ use crate::notifiers::amqp_notifier; use crate::notifiers::http_notifier; use crate::{ errors::RustusResult, - notifiers::{dir_notifier::DirNotifier, file_notifier::FileNotifier, Hook, Notifier}, + notifiers::{ + amqp_notifier::DeclareOptions, dir_notifier::DirNotifier, file_notifier::FileNotifier, + Hook, Notifier, + }, RustusConf, }; use actix_web::http::header::HeaderMap; @@ -61,6 +64,25 @@ impl NotificationManager { .notification_opts .hooks_amqp_queues_prefix .as_str(), + rustus_config + .notification_opts + .hooks_amqp_exchange_kind + .as_str(), + rustus_config + .notification_opts + .hooks_amqp_routing_key + .clone(), + DeclareOptions { + declare_exchange: rustus_config + .notification_opts + .hooks_amqp_declare_exchange, + declare_queues: rustus_config.notification_opts.hooks_amqp_declare_queues, + durable_exchange: rustus_config + .notification_opts + .hooks_amqp_durable_exchange, + durable_queues: rustus_config.notification_opts.hooks_amqp_durable_queues, + }, + rustus_config.notification_opts.hooks_amqp_celery, ))); } for notifier in &mut manager.notifiers.iter_mut() { -- GitLab