diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1a65f141496e8852187bd9528e0411480c550378..031c3c1946e1e8bc07785c394522636a15776afa 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -124,7 +124,16 @@ jobs: context: . push: true file: deploy/Dockerfile + target: base tags: s3rius/rustus:latest,s3rius/rustus:${{env.APP_VERSION}} + - name: Build and push rootless + uses: docker/build-push-action@v2 + with: + context: . + push: true + file: deploy/Dockerfile + target: rootless + tags: s3rius/rustus:${{env.APP_VERSION}}-rootless upload_helm: runs-on: ubuntu-latest @@ -208,6 +217,17 @@ jobs: context: . push: true file: deploy/alpine.Dockerfile - tags: s3rius/rustus:${{env.APP_VERSION}}-alpine + target: base + tags: s3rius/rustus:latest-alpine,s3rius/rustus:${{env.APP_VERSION}}-alpine + build-args: | + app_version=${{env.APP_VERSION}} + - name: Build and push + uses: docker/build-push-action@v2 + with: + context: . + push: true + file: deploy/alpine.Dockerfile + target: rootless + tags: s3rius/rustus:${{env.APP_VERSION}}-rootless-alpine build-args: | app_version=${{env.APP_VERSION}} diff --git a/Cargo.lock b/Cargo.lock index b7636517249376d030c1ac3aca38701996addf0b..fab2b00721db58ed049ce0644bbe73e47e8c7db9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2419,7 +2419,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.4.11" +version = "0.4.12" dependencies = [ "actix-files", "actix-rt", diff --git a/Cargo.toml b/Cargo.toml index 56dabcb73cd27a9ac6c7df9305af3b932f562ab6..224d6676dcef3e0f5bf050610bf236d5d46f3bae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.4.11" +version = "0.4.12" edition = "2021" description = "TUS protocol implementation written in Rust." keywords = [ diff --git a/README.md b/README.md index d82157f3e6dca39f8b8ed9db3e824dae654bb740..406520600fc25918705bb74557c16a60b22954ed 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ This implementation has several features to make usage as simple as possible. * It has a lot of hooks options, and hooks can be combined. * Highly configurable; -Please check out [docs](https://s3rius.github.io/rustus/) for more information about configuration and deploy. +Please check out [Documentation](https://s3rius.github.io/rustus/) for more information about configuration and deploy. ## Installation diff --git a/deploy/Dockerfile b/deploy/Dockerfile index acd2de40abef2f59e3b7839c4ac7eacc6e8555e4..18726fdced231591fae4c4470428289fb6ef221d 100644 --- a/deploy/Dockerfile +++ b/deploy/Dockerfile @@ -11,9 +11,14 @@ COPY --from=planner /app/recipe.json recipe.json RUN cargo chef cook --release --features=all,metrics --recipe-path recipe.json # Build application COPY . . -RUN cargo build --release --bin rustus --features=all +RUN cargo build --release --bin rustus --features=all,metrics -FROM debian:bullseye-20211201-slim AS runtime -WORKDIR /app +FROM debian:bullseye-20211201-slim AS base COPY --from=builder /app/target/release/rustus /usr/local/bin/ ENTRYPOINT ["/usr/local/bin/rustus"] + +FROM base as rootless + +RUN useradd --create-home -u 1000 --user-group rustus +WORKDIR /home/rustus +USER rustus \ No newline at end of file diff --git a/deploy/alpine.Dockerfile b/deploy/alpine.Dockerfile index deb1275586ae45ec5589b63f3accbe2ff820d2ee..f2309ca81a1d100dd734eca05ac837586c412779 100644 --- a/deploy/alpine.Dockerfile +++ b/deploy/alpine.Dockerfile @@ -1,4 +1,4 @@ -FROM alpine:3.15.0 +FROM alpine:3.15.0 as base ARG app_version @@ -7,6 +7,11 @@ ADD "https://github.com/s3rius/rustus/releases/download/${app_version}/rustus-${ RUN tar xvf *.tar.gz RUN rm *.tar.gz RUN mv rustus /bin -WORKDIR /app -ENTRYPOINT ["/bin/rustus"] \ No newline at end of file +ENTRYPOINT ["/bin/rustus"] + +FROM base as rootless + +RUN adduser -u 1000 --disabled-password rustus +WORKDIR /home/rustus +USER rustus \ No newline at end of file diff --git a/deploy/helm/Chart.yaml b/deploy/helm/Chart.yaml index a39382bde94081f02b04d368116d9a6817d00a09..4c72aab0193ff6fe1453dc61b5971e490a225d3a 100644 --- a/deploy/helm/Chart.yaml +++ b/deploy/helm/Chart.yaml @@ -16,7 +16,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.0 +version: 0.2.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/deploy/helm/templates/servicemonitor.yaml b/deploy/helm/templates/servicemonitor.yaml new file mode 100644 index 0000000000000000000000000000000000000000..08d920236e3c6fb0b7bd3bff1991e77f0cd7db48 --- /dev/null +++ b/deploy/helm/templates/servicemonitor.yaml @@ -0,0 +1,34 @@ +{{- if .Values.service_monitor.enabled }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "rustus.fullname" . }}-monitor + labels: + {{- include "rustus.labels" . | nindent 4 }} + {{- if .Values.service_monitor.additionalLabels }} + {{- .Values.service_monitor.additionalLabels | toYaml | nindent 4 }} + {{- end }} +spec: + endpoints: + - port: http + {{- if .Values.service_monitor.interval }} + interval: {{ .Values.service_monitor.interval }} + {{- end }} + {{- if .Values.service_monitor.scrapeTimeout }} + scrapeTimeout: {{ .Values.service_monitor.scrapeTimeout }} + {{- end }} + {{- if .Values.service_monitor.honorLabels }} + honorLabels: {{ .Values.service_monitor.honorLabels }} + {{- end }} + {{- if .Values.service_monitor.relabellings }} + relabelings: {{- toYaml .Values.service_monitor.relabellings | nindent 6 }} + {{- end }} + {{- if .Values.service_monitor.metricRelabelings }} + metricRelabelings: {{- toYaml .Values.service_monitor.metricRelabelings | nindent 6 }} + {{- end }} + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: {{- include "rustus.selectorLabels" . | nindent 6 }} +{{- end }} \ No newline at end of file diff --git a/deploy/helm/values.yaml b/deploy/helm/values.yaml index a062c48f8467c8e97f5085561e5bebc78aefffb7..643881640a5374b9fe625350c403b28e3ce26c33 100644 --- a/deploy/helm/values.yaml +++ b/deploy/helm/values.yaml @@ -178,3 +178,17 @@ mysql: database: rustus username: rustus password: rustus + + +# Configuration for prometheus operator's ServiceMonitor. +# You can read more about operator and custom resources +# here: https://operatorhub.io/operator/prometheus +service_monitor: + enabled: false + + additionalLabels: {} + interval: 5s + scrapeTimeout: "" + honorLabels: "" + relabellings: {} + metricRelabelings: {} \ No newline at end of file diff --git a/docs/configuration.md b/docs/configuration.md index 9f7bf213ea93f0524269edabde783dd072b0a286..e954b9e211029f802655960eba19bb35ef43b7d3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -5,9 +5,6 @@ description: "How to configure Rusts" Rustus is highly configurable. You can adjust it with CLI or you can use environment variables. -!!! warning - Some options can be passed only through as CLI parameters - !!! info Information about hooks you can find on [Hooks page](../hooks). @@ -57,14 +54,13 @@ Also you can configure number of actix `workers` that handle connections. Currently only file storage is available, so if you pass to `--storage` parameter other than `file-storage` you will get an error. - Also you **can not** pass `--force-fsync` through environment variables. - - -`--storage` is a type of data storage to be used. +Available parameters: -`--data-dir` is a path to the directory where all files are stored. - -`--dir-structure` is a pattern of a directory structure inside data dir. +* `--storage` - type of data storage to be used; +* `--data-dir` - path to the directory where all files are stored; +* `--dir-structure` - pattern of a directory structure inside data dir; +* `--force-fsync` - calls fsync system call after every write to disk. +`` You can use variables within the pattern. Available variables: @@ -98,7 +94,7 @@ data === "CLI" ``` bash - rustus --force-fsync \ + rustus --force-fsync "yes" \ --storage "file-storage" \ --data-dir "./data/" \ --dir-structure "{year}/{month}/{day}" @@ -110,8 +106,9 @@ data export RUSTUS_STORAGE="file-storage" export RUSTUS_DATA_DIR="./data/" export RUSTUS_DIR_STRUCTURE="{year}/{month}/{day}" + export RUSTUS_FORCE_FSYNC="yes" - rustus --force-fsync + rustus ``` ## Configuring info storage @@ -237,7 +234,7 @@ By default all extensions are enabled. === "CLI" ``` bash - rustus --remove-parts \ + rustus --remove-parts "yes" \ --tus-extensions "getting,creation,termination,creation-with-upload,creation-defer-length,concatenation,checksum" ``` @@ -245,6 +242,7 @@ By default all extensions are enabled. ``` bash export RUSTUS_TUS_EXTENSIONS="getting,creation,termination,creation-with-upload,creation-defer-length,concatenation,checksum" + export RUSTUS_REMOVE_PARTS="yes" - rustus --remove-parts + rustus ``` diff --git a/docs/hooks.md b/docs/hooks.md index 56e3ff2397806c60ca65e7f2228fa06eef09c6ea..8eba60b2ebb1b4c1acae1dbbafe877448a68325e 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -39,10 +39,10 @@ You can disable some hooks by using `--hooks` parameter. ``` -## Fomat +## Format -Information about every hook using `JSON` format. -Format can be configured using `--hooks-format` parameter or `RUSTUS_HOOKS_FORMAT` environment variable. +Information about every event is sent using `JSON` format. +Format can be configured with `--hooks-format` parameter or `RUSTUS_HOOKS_FORMAT` environment variable. Available formats: @@ -303,32 +303,137 @@ Configuration parameters: * `--hooks-amqp-url` - connection string to RabbitMQ; * `--hooks-amqp-queues-prefix` - prefix for queues for every event queue; -* `--hooks-amqp-exchange` - name of exchange to use. - -This hook will send every message in an exchange with routing keys -like queues names. - -Queues are named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. +* `--hooks-amqp-exchange` - name of exchange to use; +* `--hooks-amqp-declare-exchange` - creates exchange on startup; +* `--hooks-amqp-exchange-kind` - kind of exchange to connect to; +* `--hooks-amqp-declare-queues` - creates all queues and binds them to exchange; +* `--hooks-amqp-durable-exchange` - adds durability to created exchange; +* `--hooks-amqp-durable-queues` - adds durability to created; +* `--hooks-amqp-celery` - adds headers required by [Celery](https://docs.celeryq.dev/en/stable/index.html); +* `--hooks-amqp-routing-key` - routing key for all messages passed to exchange. + +If no hooks_amqp_routing_key specified, rustus will send all messages with +different routing keys. Named like `{prefix}.{event type}`. Eg `rustus.pre-create` and so on. +Otherwise, it will use only one routing key and only one queue! !!! warning Since we can't really track message delivery and responses - Rustus doesn't stop in any case. + Rustus won't stop a current upload in any case. === "CLI" ``` bash rustus --hooks-amqp-url "amqp://guest:guest@localhost:5672" \ - --hooks-amqp-queues-prefix "rustus_queue" \ - --hooks-amqp-exchange "rustus" + --hooks-amqp-queues-prefix "rustus_prefix" \ + --hooks-amqp-exchange "rustus" \ + --hooks-amqp-exchange-kind "topic" \ + --hooks-amqp-routing-key "route66" \ + --hooks-amqp-declare-exchange "yes" \ + --hooks-amqp-declare-queues "yes" \ + --hooks-amqp-durable-exchange "yes" \ + --hooks-amqp-durable-queues "yes" \ + --hooks-amqp-celery "yes" ``` === "ENV" ``` bash export RUSTUS_HOOKS_AMQP_URL="amqp://guest:guest@localhost:5672" - export RUSTUS_HOOKS_AMQP_QUEUES_PREFIX="rustus_queue" + export RUSTUS_HOOKS_AMQP_QUEUES_PREFIX="rustus_prefix" export RUSTUS_HOOKS_AMQP_EXCHANGE="rustus" + export RUSTUS_HOOKS_AMQP_EXCHANGE_KIND="topic" + export RUSTUS_HOOKS_AMQP_ROUTING_KEY="route66" + export RUSTUS_HOOKS_AMQP_DECLARE_EXCHANGE="yes" + export RUSTUS_HOOKS_AMQP_DECLARE_QUEUES="yes" + export RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE="yes" + export RUSTUS_HOOKS_AMQP_DURABLE_QUEUES="yes" + export RUSTUS_HOOKS_AMQP_CELERY="yes" rustus - ``` \ No newline at end of file + ``` + +#### Using Rustus with Celery + +Rustus has a cool integration with [Celery](https://docs.celeryq.dev/en/stable/index.html). +Let's build a Celery application that handles rustus hooks. + +At first, we need to install Celery itself. +```bash +pip install celery +``` + +Now we can create a file called "celery.py" in a directory "rustus_celery". +This file contains code that handles celery tasks. + +```python title="rustus_celery/celery.py" +import celery + +app = celery.Celery("rustus_celery") +app.conf.update( + broker_url="amqp://guest:guest@localhost:5672", +) + + +@app.task(name="rustus.pre-create") +def post_create(data): + print(f"PRE CREATE: {data}") + + +@app.task(name="rustus.post-create") +def post_create(data): + print(f"POST CREATE: {data}") + + +@app.task(name="rustus.post-finish") +def post_finish(data): + print(f"POST FINISH: {data}") + + +@app.task(name="rustus.post-terminate") +def post_terminate(data): + print(f"POST TERMINATE: {data}") + + +@app.task(name="rustus.post-receive") +def post_recieve(data): + print(f"POST RECIEVE: {data}") +``` + +!!! info + Every task has its name. You must use these names + in order to handle tasks. + +Now we can run our celery worker to start executing tasks. + +``` +celery -A rustus_celery +``` + +After starting celery worker you can run Rustus with these +parameters. + +The most important parameter is `--hooks-amqp-celery`, because it +adds required by Celery headers to every message. + +=== "CLI" + + ``` bash + rustus --hooks-amqp-url "amqp://guest:guest@localhost:5672" \ + --hooks-amqp-exchange "celery" \ + --hooks-amqp-exchange-kind "direct" \ + --hooks-amqp-routing-key "celery" \ + --hooks-amqp-celery "yes" + ``` + +=== "ENV" + + ``` bash + export RUSTUS_HOOKS_AMQP_URL="amqp://guest:guest@localhost:5672" + export RUSTUS_HOOKS_AMQP_EXCHANGE="celery" + export RUSTUS_HOOKS_AMQP_EXCHANGE_KIND="direct" + export RUSTUS_HOOKS_AMQP_ROUTING_KEY="celery" + export RUSTUS_HOOKS_AMQP_CELERY="yes" + + rustus + ``` diff --git a/docs/index.md b/docs/index.md index f80c599b0ba488de59649b2647b7469624d9edca..8a66816b2b7706f7a4caca862ed05699294407f3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -15,7 +15,7 @@ description: Rustus docs Rustus is a [TUS](https://tus.io) protocol implementation that helps you handle file uploads. -This project has many features that makes it easy to integrate with your application. +This project has many features that make it easy to integrate with your application. ## Installation diff --git a/src/config.rs b/src/config.rs index ae38eaa1b8800262d71fcf95d29069057812e2bd..3daf87faef435ace93d25d6a85d2549df007539b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,7 +41,7 @@ pub struct StorageOptions { /// everything is written on disk correctly. /// /// In most cases this parameter is redundant. - #[structopt(long, parse(from_flag))] + #[structopt(long, env = "RUSTUS_FORCE_FSYNC")] pub force_fsync: bool, } @@ -88,6 +88,7 @@ pub struct InfoStoreOptions { } #[derive(StructOpt, Debug, Clone)] +#[allow(clippy::struct_excessive_bools)] pub struct NotificationsOptions { /// Notifications format. /// @@ -120,11 +121,47 @@ pub struct NotificationsOptions { #[structopt(long, env = "RUSTUS_HOOKS_AMQP_URL")] pub hooks_amqp_url: Option<String>, + /// Rustus will create exchange if enabled. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_EXCHANGE")] + pub hooks_amqp_declare_exchange: bool, + + /// Rustus will create all queues for communication and bind them + /// to exchange if enabled. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DECLARE_QUEUES")] + pub hooks_amqp_declare_queues: bool, + + /// Durability type of exchange. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_EXCHANGE")] + pub hooks_amqp_durable_exchange: bool, + + /// Durability type of queues. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_DURABLE_QUEUES")] + pub hooks_amqp_durable_queues: bool, + + /// Adds celery specific headers. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_CELERY")] + pub hooks_amqp_celery: bool, + /// Name of amqp exchange. #[cfg(feature = "amqp_notifier")] #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE", default_value = "rustus")] pub hooks_amqp_exchange: String, + /// Exchange kind. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_EXCHANGE_KIND", default_value = "topic")] + pub hooks_amqp_exchange_kind: String, + + /// Routing key to use when sending message to an exchange. + #[cfg(feature = "amqp_notifier")] + #[structopt(long, env = "RUSTUS_HOOKS_AMQP_ROUTING_KEY")] + pub hooks_amqp_routing_key: Option<String>, + /// Prefix for all AMQP queues. #[cfg(feature = "amqp_notifier")] #[structopt( @@ -199,7 +236,7 @@ pub struct RustusConf { /// By default rustus does nothing with part files after concatenation. /// /// This parameter is only needed if concatenation extension is enabled. - #[structopt(long, parse(from_flag))] + #[structopt(long, env = "RUSTUS_REMOVE_PARTS")] pub remove_parts: bool, #[structopt(flatten)] diff --git a/src/notifiers/amqp_notifier.rs b/src/notifiers/amqp_notifier.rs index 8810a3f76ad9e00ad5fd755bf3b6f96e13946ae4..ea94fccab2be27c1afce9f72c58018ef3bc9388b 100644 --- a/src/notifiers/amqp_notifier.rs +++ b/src/notifiers/amqp_notifier.rs @@ -6,21 +6,42 @@ use actix_web::http::header::HeaderMap; use async_trait::async_trait; use lapin::{ options::{BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions}, - types::FieldTable, + types::{AMQPValue, FieldTable, LongString}, BasicProperties, ConnectionProperties, ExchangeKind, }; use mobc_lapin::{mobc::Pool, RMQConnectionManager}; use strum::IntoEnumIterator; use tokio_amqp::LapinTokioExt; +#[allow(clippy::struct_excessive_bools)] +pub struct DeclareOptions { + pub declare_exchange: bool, + pub durable_exchange: bool, + pub declare_queues: bool, + pub durable_queues: bool, +} + pub struct AMQPNotifier { exchange_name: String, pool: Pool<RMQConnectionManager>, queues_prefix: String, + exchange_kind: String, + routing_key: Option<String>, + declare_options: DeclareOptions, + celery: bool, } impl AMQPNotifier { - pub fn new(amqp_url: &str, exchange: &str, queues_prefix: &str) -> Self { + #[allow(clippy::fn_params_excessive_bools)] + pub fn new( + amqp_url: &str, + exchange: &str, + queues_prefix: &str, + exchange_kind: &str, + routing_key: Option<String>, + declare_options: DeclareOptions, + celery: bool, + ) -> Self { let manager = RMQConnectionManager::new( amqp_url.into(), ConnectionProperties::default().with_tokio(), @@ -28,13 +49,25 @@ impl AMQPNotifier { let pool = Pool::<RMQConnectionManager>::builder().build(manager); Self { pool, + celery, + routing_key, + declare_options, + exchange_kind: exchange_kind.into(), exchange_name: exchange.into(), queues_prefix: queues_prefix.into(), } } + /// Generate queue name based on hook type. + /// + /// If specific routing key is not empty, it returns it. + /// Otherwise it will generate queue name based on hook name. pub fn get_queue_name(&self, hook: Hook) -> String { - format!("{}.{}", self.queues_prefix.as_str(), hook) + if let Some(routing_key) = self.routing_key.as_ref() { + routing_key.into() + } else { + format!("{}.{}", self.queues_prefix.as_str(), hook) + } } } @@ -42,30 +75,40 @@ impl AMQPNotifier { impl Notifier for AMQPNotifier { async fn prepare(&mut self) -> RustusResult<()> { let chan = self.pool.get().await?.create_channel().await?; - chan.exchange_declare( - self.exchange_name.as_str(), - ExchangeKind::Topic, - ExchangeDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - for hook in Hook::iter() { - let queue_name = self.get_queue_name(hook); - chan.queue_declare( - queue_name.as_str(), - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - chan.queue_bind( - queue_name.as_str(), + if self.declare_options.declare_exchange { + chan.exchange_declare( self.exchange_name.as_str(), - queue_name.as_str(), - QueueBindOptions::default(), + ExchangeKind::Custom(self.exchange_kind.clone()), + ExchangeDeclareOptions { + durable: self.declare_options.durable_exchange, + ..ExchangeDeclareOptions::default() + }, FieldTable::default(), ) .await?; } + if self.declare_options.declare_queues { + for hook in Hook::iter() { + let queue_name = self.get_queue_name(hook); + chan.queue_declare( + queue_name.as_str(), + QueueDeclareOptions { + durable: self.declare_options.durable_queues, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await?; + chan.queue_bind( + queue_name.as_str(), + self.exchange_name.as_str(), + queue_name.as_str(), + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + } + } Ok(()) } @@ -77,12 +120,32 @@ impl Notifier for AMQPNotifier { ) -> RustusResult<()> { let chan = self.pool.get().await?.create_channel().await?; let queue = self.get_queue_name(hook); + let routing_key = self.routing_key.as_ref().unwrap_or(&queue); + let payload = if self.celery { + format!("[[{}], {{}}, {{}}]", message).as_bytes().to_vec() + } else { + message.as_bytes().to_vec() + }; + let mut headers = FieldTable::default(); + if self.celery { + headers.insert( + "id".into(), + AMQPValue::LongString(LongString::from(uuid::Uuid::new_v4().to_string())), + ); + headers.insert( + "task".into(), + AMQPValue::LongString(LongString::from(format!("rustus.{}", hook))), + ); + } chan.basic_publish( self.exchange_name.as_str(), - queue.as_str(), + routing_key.as_str(), BasicPublishOptions::default(), - message.as_bytes().to_vec(), - BasicProperties::default().with_content_type("application/json".into()), + payload, + BasicProperties::default() + .with_headers(headers) + .with_content_type("application/json".into()) + .with_content_encoding("utf-8".into()), ) .await?; Ok(()) @@ -93,7 +156,7 @@ impl Notifier for AMQPNotifier { #[cfg(test)] mod tests { use super::AMQPNotifier; - use crate::notifiers::{Hook, Notifier}; + use crate::notifiers::{amqp_notifier::DeclareOptions, Hook, Notifier}; use actix_web::http::header::HeaderMap; use lapin::options::{BasicAckOptions, BasicGetOptions}; @@ -103,6 +166,15 @@ mod tests { amqp_url.as_str(), uuid::Uuid::new_v4().to_string().as_str(), uuid::Uuid::new_v4().to_string().as_str(), + "topic", + None, + DeclareOptions { + declare_exchange: true, + declare_queues: true, + durable_queues: false, + durable_exchange: false, + }, + true, ); notifier.prepare().await.unwrap(); notifier @@ -135,7 +207,7 @@ mod tests { assert!(message.is_some()); assert_eq!( String::from_utf8(message.clone().unwrap().data.clone()).unwrap(), - test_msg + format!("[[{}], {{}}, {{}}]", test_msg) ); message .unwrap() @@ -146,7 +218,20 @@ mod tests { #[actix_rt::test] async fn unknown_url() { - let notifier = AMQPNotifier::new("http://unknown", "test", "test"); + let notifier = AMQPNotifier::new( + "http://unknown", + "test", + "test", + "topic", + None, + DeclareOptions { + declare_exchange: false, + declare_queues: false, + durable_queues: false, + durable_exchange: false, + }, + false, + ); let res = notifier .send_message("Test Message".into(), Hook::PostCreate, &HeaderMap::new()) .await; diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 9a00077e20f0f4ffaef3594de2efb0e5d89f5b98..64f27d7dfcae29d00dcda21b2f0c460c2adb4e33 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -14,8 +14,6 @@ pub enum Format { Default, #[display(fmt = "tusd")] Tusd, - #[display(fmt = "celery")] - Celery, } from_str!(Format, "format"); @@ -25,7 +23,6 @@ impl Format { match self { Self::Default => default_format(request, file_info), Self::Tusd => tusd_format(request, file_info), - Self::Celery => celery_format(request, file_info), } } } @@ -157,7 +154,3 @@ pub fn tusd_format(request: &HttpRequest, file_info: &FileInfo) -> RustusResult< ); Ok(Value::Object(result_map).to_string()) } - -pub fn celery_format(_request: &HttpRequest, _file_info: &FileInfo) -> RustusResult<String> { - todo!() -} diff --git a/src/notifiers/models/notification_manager.rs b/src/notifiers/models/notification_manager.rs index 441c99a389f1781db1ca739acd05842841709809..04a5530387d0af5e5ae494abfcbf55be34c205c7 100644 --- a/src/notifiers/models/notification_manager.rs +++ b/src/notifiers/models/notification_manager.rs @@ -4,7 +4,10 @@ use crate::notifiers::amqp_notifier; use crate::notifiers::http_notifier; use crate::{ errors::RustusResult, - notifiers::{dir_notifier::DirNotifier, file_notifier::FileNotifier, Hook, Notifier}, + notifiers::{ + amqp_notifier::DeclareOptions, dir_notifier::DirNotifier, file_notifier::FileNotifier, + Hook, Notifier, + }, RustusConf, }; use actix_web::http::header::HeaderMap; @@ -61,6 +64,25 @@ impl NotificationManager { .notification_opts .hooks_amqp_queues_prefix .as_str(), + rustus_config + .notification_opts + .hooks_amqp_exchange_kind + .as_str(), + rustus_config + .notification_opts + .hooks_amqp_routing_key + .clone(), + DeclareOptions { + declare_exchange: rustus_config + .notification_opts + .hooks_amqp_declare_exchange, + declare_queues: rustus_config.notification_opts.hooks_amqp_declare_queues, + durable_exchange: rustus_config + .notification_opts + .hooks_amqp_durable_exchange, + durable_queues: rustus_config.notification_opts.hooks_amqp_durable_queues, + }, + rustus_config.notification_opts.hooks_amqp_celery, ))); } for notifier in &mut manager.notifiers.iter_mut() {