From 4cc4ce3b89d344825416d2d2559322ae6f048b36 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin <win10@list.ru> Date: Tue, 21 Dec 2021 01:50:18 +0400 Subject: [PATCH] Added redis info storage. Signed-off-by: Pavel Kirilin <win10@list.ru> --- Cargo.lock | 158 ++++++++++++++++++++++-- Cargo.toml | 3 +- README.md | 14 +++ src/config.rs | 3 +- src/errors.rs | 4 + src/info_storages/db_info_storage.rs | 31 ++--- src/info_storages/mod.rs | 23 ++-- src/info_storages/redis_info_storage.rs | 64 ++++++++++ src/storages/file_storage.rs | 4 +- 9 files changed, 257 insertions(+), 47 deletions(-) create mode 100644 src/info_storages/redis_info_storage.rs diff --git a/Cargo.lock b/Cargo.lock index 77fde66..9b594b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,8 +14,8 @@ dependencies = [ "futures-sink", "log", "pin-project 0.4.28", - "tokio", - "tokio-util", + "tokio 0.2.25", + "tokio-util 0.3.1", ] [[package]] @@ -139,7 +139,7 @@ dependencies = [ "futures-channel", "futures-util", "smallvec", - "tokio", + "tokio 0.2.25", ] [[package]] @@ -155,7 +155,7 @@ dependencies = [ "futures-channel", "futures-util", "log", - "mio", + "mio 0.6.23", "mio-uds", "num_cpus", "slab", @@ -741,6 +741,20 @@ dependencies = [ "vec_map", ] +[[package]] +name = "combine" +version = "4.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b2f5d0ee456f3928812dfc8c6d9a1d592b98678f6d56db9b0cd2b7bc6c8db5" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "memchr", + "pin-project-lite 0.2.7", + "tokio 1.15.0", + "tokio-util 0.6.9", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -929,6 +943,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dtoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" + [[package]] name = "either" version = "1.6.1" @@ -1123,6 +1143,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.17" @@ -1221,8 +1247,8 @@ dependencies = [ "http", "indexmap", "slab", - "tokio", - "tokio-util", + "tokio 0.2.25", + "tokio-util 0.3.1", "tracing", "tracing-futures", ] @@ -1583,12 +1609,25 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "winapi 0.3.9", +] + [[package]] name = "mio-uds" version = "0.6.8" @@ -1597,7 +1636,7 @@ checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" dependencies = [ "iovec", "libc", - "mio", + "mio 0.6.23", ] [[package]] @@ -1612,6 +1651,41 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "mobc" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f76d2f2e2dcbb00a8d3b2b09f026a74a82693ea52cd071647aa6cfa7f1ff37e" +dependencies = [ + "async-std", + "async-trait", + "futures-channel", + "futures-core", + "futures-timer", + "futures-util", + "log", + "tokio 1.15.0", +] + +[[package]] +name = "mobc-redis" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7b5db77b37c9224d5b9949b214041ea3e1c15b6b1e5dd24a5acb8e73975d6d6" +dependencies = [ + "mobc", + "redis", +] + [[package]] name = "net2" version = "0.2.37" @@ -1644,6 +1718,15 @@ dependencies = [ "version_check 0.9.3", ] +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-bigint" version = "0.3.3" @@ -2178,6 +2261,27 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "redis" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a6ddfecac9391fed21cce10e83c65fa4abafd77df05c98b1c647c65374ce9b3" +dependencies = [ + "async-std", + "async-trait", + "bytes 1.1.0", + "combine", + "dtoa", + "futures-util", + "itoa 0.4.8", + "percent-encoding", + "pin-project-lite 0.2.7", + "sha1", + "tokio 1.15.0", + "tokio-util 0.6.9", + "url", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -2330,6 +2434,7 @@ dependencies = [ "chrono", "derive_more", "log", + "mobc-redis", "rbatis", "rbson", "serde", @@ -2907,7 +3012,7 @@ dependencies = [ "lazy_static", "libc", "memchr", - "mio", + "mio 0.6.23", "mio-uds", "pin-project-lite 0.1.12", "signal-hook-registry", @@ -2915,6 +3020,21 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tokio" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbbf1c778ec206785635ce8ad57fe52b3009ae9e0c9f574a728f3049d3e55838" +dependencies = [ + "bytes 1.1.0", + "libc", + "memchr", + "mio 0.7.14", + "num_cpus", + "pin-project-lite 0.2.7", + "winapi 0.3.9", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -2926,7 +3046,21 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.12", - "tokio", + "tokio 0.2.25", +] + +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "futures-sink", + "log", + "pin-project-lite 0.2.7", + "tokio 1.15.0", ] [[package]] @@ -2976,7 +3110,7 @@ dependencies = [ "rand 0.7.3", "smallvec", "thiserror", - "tokio", + "tokio 0.2.25", "url", ] @@ -2995,7 +3129,7 @@ dependencies = [ "resolv-conf", "smallvec", "thiserror", - "tokio", + "tokio 0.2.25", "trust-dns-proto", ] diff --git a/Cargo.toml b/Cargo.toml index 2a4bec1..c516f8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,4 +26,5 @@ base64 = "^0.13.0" simple-logging = { version = "^2.0.2" } strfmt = "^0.1.6" rbson = "2.0" -rbatis = { version = "^3.0", default-features = false, features = ["runtime-async-std-rustls", "all-database"] } \ No newline at end of file +rbatis = { version = "^3.0", default-features = false, features = ["runtime-async-std-rustls", "all-database"] } +mobc-redis = { version = "0.7.0", features = ["async-std-comp"] } \ No newline at end of file diff --git a/README.md b/README.md index fa5f8f3..9539832 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,18 @@ This implementation has several features to make usage as simple as possible. * You can specify directory structure to organize your uploads; * Highly configurable; +### Supported info storages + +* FileSystem +* PostgresSQL +* Mysql +* SQLite +* Redis + +### Supported data storages + +* FileSystem + ## Installation Since I haven't configured build automation yet, you can build it @@ -55,8 +67,10 @@ All options are listed in `rustus --help`. * [x] Termination extension; * [x] Route to get uploaded files; * [x] Database support for info storage; +* [x] Redis support for info storage; * [ ] S3 as data storage store support; * [ ] Notification interface; +* [ ] Executable files notifications; * [ ] Notifications via http hooks; * [ ] Notifications via RabbitMQ; * [ ] Rustus helm chart; diff --git a/src/config.rs b/src/config.rs index 8040419..bbb6232 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,7 @@ pub struct InfoStoreOptions { #[structopt( long, required_if("info-storage", "db_info_storage"), + required_if("info-storage", "redis_info_storage"), env = "RUSTUS_INFO_DB_DSN" )] pub info_db_dsn: Option<String>, @@ -107,7 +108,7 @@ pub struct RustusConf { /// Enabled extensions for TUS protocol. #[structopt( long, - default_value = "getting,creation,creation-with-upload,creation-defer-length", + default_value = "getting,creation,termination,creation-with-upload,creation-defer-length", env = "RUSTUS_EXTENSIONS" )] pub extensions: String, diff --git a/src/errors.rs b/src/errors.rs index 398176f..76c022e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -20,6 +20,10 @@ pub enum RustusError { UnableToSerialize(#[from] serde_json::Error), #[error("Database error: {0}")] DatabaseError(#[from] rbatis::error::Error), + #[error("Redis error: {0}")] + RedisError(#[from] mobc_redis::redis::RedisError), + #[error("Redis error: {0}")] + MobcError(#[from] mobc_redis::mobc::Error<mobc_redis::redis::RedisError>), #[error("Unable to get file information")] UnableToReadInfo, #[error("Unable to write file {0}")] diff --git a/src/info_storages/db_info_storage.rs b/src/info_storages/db_info_storage.rs index 58a8637..e96f053 100644 --- a/src/info_storages/db_info_storage.rs +++ b/src/info_storages/db_info_storage.rs @@ -1,8 +1,8 @@ use std::time::Duration; use async_trait::async_trait; -use rbatis::{crud_table, impl_field_name_method}; use rbatis::crud::CRUD; +use rbatis::crud_table; use rbatis::db::DBPoolOptions; use rbatis::executor::Executor; use rbatis::rbatis::Rbatis; @@ -28,8 +28,6 @@ impl TryFrom<&FileInfo> for DbModel { } } -impl_field_name_method!(DbModel { id, info }); - pub struct DBInfoStorage { db: Rbatis, } @@ -43,7 +41,7 @@ impl DBInfoStorage { app_conf.info_storage_opts.info_db_dsn.unwrap().as_str(), opts, ) - .await?; + .await?; Ok(Self { db }) } } @@ -51,7 +49,6 @@ impl DBInfoStorage { #[async_trait] impl InfoStorage for DBInfoStorage { async fn prepare(&mut self) -> RustusResult<()> { - // let builder = self.db(); self.db .exec( "CREATE TABLE IF NOT EXISTS db_model (id VARCHAR(40) PRIMARY KEY, info TEXT);", @@ -66,13 +63,13 @@ impl InfoStorage for DBInfoStorage { if create { self.db.save(&model, &[]).await?; } else { - self.db.update_by_column(DbModel::id(), &model).await?; + self.db.update_by_column("id", &model).await?; } Ok(()) } async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> { - let model: Option<DbModel> = self.db.fetch_by_column(DbModel::id(), file_id).await?; + let model: Option<DbModel> = self.db.fetch_by_column("id", file_id).await?; if let Some(info) = model { serde_json::from_str(info.info.as_str()).map_err(RustusError::from) } else { @@ -81,21 +78,9 @@ impl InfoStorage for DBInfoStorage { } async fn remove_info(&self, file_id: &str) -> RustusResult<()> { - // let model_opt: Option<db_model::Model> = - // db_model::Entity::find_by_id(String::from(file_id)) - // .one(&self.db) - // .await - // .map_err(RustusError::from)?; - // if let Some(model) = model_opt { - // let active_model: db_model::ActiveModel = model.into(); - // active_model - // .delete(&self.db) - // .await - // .map_err(RustusError::from)?; - // Ok(()) - // } else { - // Err(RustusError::FileNotFound) - // } - todo!() + self.db + .remove_by_column::<DbModel, &str>("id", file_id) + .await?; + Ok(()) } } diff --git a/src/info_storages/mod.rs b/src/info_storages/mod.rs index 6e70664..1c9dd14 100644 --- a/src/info_storages/mod.rs +++ b/src/info_storages/mod.rs @@ -12,13 +12,16 @@ mod file_info; pub mod db_info_storage; pub mod file_info_storage; +pub mod redis_info_storage; #[derive(PartialEq, From, Display, Clone, Debug)] pub enum AvailableInfoStores { - #[display(fmt = "FileInfoStorage")] - FileInfoStorage, - #[display(fmt = "DBInfoStorage")] - DBInfoStorage, + #[display(fmt = "File info storage")] + Files, + #[display(fmt = "DB info storage")] + DB, + #[display(fmt = "Redis info storage")] + Redis, } impl FromStr for AvailableInfoStores { @@ -26,8 +29,9 @@ impl FromStr for AvailableInfoStores { fn from_str(input: &str) -> Result<Self, Self::Err> { match input { - "file_info_storage" => Ok(AvailableInfoStores::FileInfoStorage), - "db_info_storage" => Ok(AvailableInfoStores::DBInfoStorage), + "file_info_storage" => Ok(AvailableInfoStores::Files), + "db_info_storage" => Ok(AvailableInfoStores::DB), + "redis_info_storage" => Ok(AvailableInfoStores::Redis), _ => Err(String::from("Unknown storage type")), } } @@ -44,12 +48,15 @@ impl AvailableInfoStores { config: &RustusConf, ) -> RustusResult<Box<dyn InfoStorage + Sync + Send>> { match self { - Self::FileInfoStorage => Ok(Box::new(file_info_storage::FileInfoStorage::new( + Self::Files => Ok(Box::new(file_info_storage::FileInfoStorage::new( config.clone(), ))), - Self::DBInfoStorage => Ok(Box::new( + Self::DB => Ok(Box::new( db_info_storage::DBInfoStorage::new(config.clone()).await?, )), + AvailableInfoStores::Redis => Ok(Box::new( + redis_info_storage::RedisStorage::new(config.clone()).await?, + )), } } } diff --git a/src/info_storages/redis_info_storage.rs b/src/info_storages/redis_info_storage.rs new file mode 100644 index 0000000..584407d --- /dev/null +++ b/src/info_storages/redis_info_storage.rs @@ -0,0 +1,64 @@ +use async_trait::async_trait; +use mobc_redis::mobc::Pool; +use mobc_redis::redis; +use mobc_redis::RedisConnectionManager; +use redis::aio::Connection; + +use crate::errors::{RustusError, RustusResult}; +use crate::info_storages::{FileInfo, InfoStorage}; +use crate::RustusConf; + +pub struct RedisStorage { + pool: Pool<RedisConnectionManager>, +} + +impl RedisStorage { + pub async fn new(app_conf: RustusConf) -> RustusResult<Self> { + let client = redis::Client::open(app_conf.info_storage_opts.info_db_dsn.unwrap().as_str())?; + let manager = RedisConnectionManager::new(client); + let pool = Pool::builder().max_open(100).build(manager); + Ok(Self { pool }) + } +} + +#[async_trait] +impl InfoStorage for RedisStorage { + async fn prepare(&mut self) -> RustusResult<()> { + Ok(()) + } + + async fn set_info(&self, file_info: &FileInfo, _create: bool) -> RustusResult<()> { + let mut conn = self.pool.get().await?; + redis::cmd("SET") + .arg(file_info.id.as_str()) + .arg(serde_json::to_string(file_info)?.as_str()) + .query_async::<Connection, String>(&mut conn) + .await + .map_err(RustusError::from)?; + Ok(()) + } + + async fn get_info(&self, file_id: &str) -> RustusResult<FileInfo> { + let mut conn = self.pool.get().await?; + let res = redis::cmd("GET") + .arg(file_id) + .query_async::<Connection, Option<String>>(&mut conn) + .await?; + if res.is_none() { + return Err(RustusError::FileNotFound); + } + serde_json::from_str(res.unwrap().as_str()).map_err(RustusError::from) + } + + async fn remove_info(&self, file_id: &str) -> RustusResult<()> { + let mut conn = self.pool.get().await?; + let resp = redis::cmd("DEL") + .arg(file_id) + .query_async::<Connection, Option<usize>>(&mut conn) + .await?; + match resp { + None | Some(0) => Err(RustusError::FileNotFound), + _ => Ok(()), + } + } +} diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index b7fb659..ccd93bf 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::path::PathBuf; use actix_files::NamedFile; -use async_std::fs::{DirBuilder, OpenOptions, remove_file}; use async_std::fs::create_dir_all; +use async_std::fs::{remove_file, DirBuilder, OpenOptions}; use async_std::prelude::*; use async_trait::async_trait; use log::error; @@ -11,8 +11,8 @@ use uuid::Uuid; use crate::errors::{RustusError, RustusResult}; use crate::info_storages::{FileInfo, InfoStorage}; -use crate::RustusConf; use crate::storages::Storage; +use crate::RustusConf; pub struct FileStorage { app_conf: RustusConf, -- GitLab