diff --git a/Cargo.lock b/Cargo.lock index 87fef6ff310ee084a3db447431ce9e287a7b4a54..425ae9466742016d961f9e06eb05a88eaa43402d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2482,7 +2482,7 @@ dependencies = [ [[package]] name = "rustus" -version = "0.3.0" +version = "0.3.1" dependencies = [ "actix-files", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index b2a78e2ed13cab41014c09c3ae7a932db5642e27..b6abc7d5ab542ba01d60ba6e6604c0928ece5ae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustus" -version = "0.3.0" +version = "0.3.1" edition = "2021" description = "TUS protocol implementation written in Rust." diff --git a/README.md b/README.md index ce46e03c3d430c6ceb2d846898cf4e34d052def5..7cb97ee7f5db3f4e896bf0e0e0535b30325e103c 100644 --- a/README.md +++ b/README.md @@ -9,45 +9,67 @@ This implementation has several features to make usage as simple as possible. * Rustus is robust, since it uses asynchronous Rust; * It can store information about files in databases; * You can specify directory structure to organize your uploads; +* It has a lot of hooks options, and hooks can be combined. * Highly configurable; -### Supported info storages +## Installation -* FileSystem -* PostgresSQL -* Mysql -* SQLite -* Redis +You can download binaries from a [releases page](https://github.com/s3rius/rustus/releases). -### Supported data storages +If you want to use docker, you can use official images from [s3rius/rustus](https://hub.docker.com/r/s3rius/rustus/): +```bash +docker run --rm -it -p 1081:1081 s3rius/rustus:latest +``` -* FileSystem +If we don't have a binary file for your operating system you can build it with [cargo](https://doc.rust-lang.org/cargo/getting-started/installation.html). -## Installation +```bash +git clone https://github.com/s3rius/rustus.git +cd rustus +cargo install --path . --features=all +``` + +### Supported data storages -Since I haven't configured build automation yet, you can build it -from source using `cargo`. +Right now you can only use `file-storage` to store uploads data. +The only two options you can adjust are: +* uploads directory +* directory structure + +To upload files in a custom directory other than `./data` +you can provide a `--data-dir` parameter. ```bash -cargo install --path . +rustus --data-dir "./files" ``` -Or you can use a docker image. +If you have a lot of uploads, you don't want to store all your files in +a flat structure. So you can set a directory structure for your uploads. ```bash -docker run --rm -it -p 1081:1081 s3rius/rustus:latest +rustus --dir-structure="{env[HOSTNAME]}/{year}/{month}/{day}" ``` -Docker image and binaries will be available soon. +```bash +tree data +data +├── 0bd911d4054d41c6a3ad54be67ee3e66.info +├── 5bc9c62384494c439e2a064b82a39cc6.info +└── rtus-68cb5b8746-5mgw9 + └── 2022 + └── 1 + └── 8 + ├── 0bd911d4054d41c6a3ad54be67ee3e66 + └── 5bc9c62384494c439e2a064b82a39cc6 -## Architecture +``` -Files and info about them are separated from each other. -In order to modify original file rustus searches for information about -the file in information storage. +**Important note:** if you use variable that doesn't exist or incorrect like invalid env variable, it +results in an error and the directory structure will become flat again. -However, automatic migration between different information -storages is not supported yet. +As you can see all info files are stored in a flat structure. It cannot be changed if +you use file info storage. In order to get rid of those `.info` files use different +info storages. ## Info storages @@ -55,7 +77,7 @@ The info storage is a database or directory. The main goal is to keep track of uploads. Rustus stores information about download in json format inside database. -File storage is a default one. You can customize the directory of an .info files +File storage is used by default. You can customize the directory of an .info files by providing `--info-dir` parameter. ```bash @@ -90,27 +112,225 @@ you have to use webhooks or File hooks. Hooks have priorities: file hooks are the most important, then goes webhooks and AMQP hooks have the least priority. If pre-create hook failed, the upload would not start. -Of course, since AMQP is a protocol that doesn't allow you to track responses. -We can't validate anything to stop uploading. - - -### Roadmap - -* [x] Data storage interface; -* [x] Info storage interface; -* [x] Core TUS protocol; -* [x] Extensions interface; -* [x] Creation extension; -* [x] Creation-defer-length extension; -* [x] Creation-with-upload extension; -* [x] Termination extension; -* [x] Route to get uploaded files; -* [x] Database support for info storage; -* [x] Redis support for info storage; -* [x] Notification interface; -* [x] Notifications via http hooks; -* [x] Notifications via RabbitMQ; -* [X] Executable files notifications; -* [ ] S3 as data storage store support; -* [ ] Rustus helm chart; -* [ ] Cloud native rustus operator. +Of course, since AMQP is a protocol that doesn't allow you to track responses +we can't validate anything to stop uploading. + +Hooks can have 2 formats + +default: +```json +{ + "upload": { + "id": "", + "offset": 0, + "length": 39729945, + "path": null, + "created_at": 1641620821, + "deferred_size": false, + "metadata": { + "filename": "38MB_video.mp4", + "meme": "hehe2" + } + }, + "request": { + "URI": "/files", + "method": "POST", + "remote_addr": "127.0.0.1", + "headers": { + "accept-encoding": "gzip, deflate", + "connection": "keep-alive", + "host": "localhost:1081", + "upload-metadata": "meme aGVoZTI=,filename MzhNQl92aWRlby5tcDQ=", + "tus-resumable": "1.0.0", + "content-length": "0", + "upload-length": "39729945", + "user-agent": "python-requests/2.26.0", + "accept": "*/*" + } + } +} +``` + +tusd: +```json +{ + "Upload": { + "ID": "", + "Offset": 0, + "Size": 39729945, + "IsFinal": true, + "IsPartial": false, + "PartialUploads": null, + "SizeIsDeferred": false, + "Metadata": { + "filename": "38MB_video.mp4", + "meme": "hehe2" + }, + "Storage": { + "Type": "filestore", + "Path": null + } + }, + "HTTPRequest": { + "URI": "/files", + "Method": "POST", + "RemoteAddr": "127.0.0.1", + "Header": { + "host": [ + "localhost:1081" + ], + "user-agent": [ + "python-requests/2.26.0" + ], + "accept": [ + "*/*" + ], + "content-length": [ + "0" + ], + "upload-metadata": [ + "meme aGVoZTI=,filename MzhNQl92aWRlby5tcDQ=" + ], + "connection": [ + "keep-alive" + ], + "tus-resumable": [ + "1.0.0" + ], + "upload-length": [ + "39729945" + ], + "accept-encoding": [ + "gzip, deflate" + ] + } + } +} +``` + +### File hooks + +Rustus can work with two types of file hooks. + +1. Single file hook; +2. Hooks directory. + +The main difference is that hook name is passed as a command line parameter to a +single file hook, but if you use hooks directory then hook name is used to determine a +file to call. Let's take a look at the examples + +Example of a single file hook: + +```bash +#!/bin/bash + +# Hook name would be "pre-create", "post-create" and so on. +HOOK_NAME="$1" +MEME="$(cat /dev/stdin | jq ".upload .metadata .meme" | xargs)" + +# Here we check if name in metadata is equal to pepe. +if [[ $MEME = "pepe" ]]; then + echo "This meme isn't allowed" 1>&2; + exit 1 +fi +``` + +As you can see it uses first CLI parameter as a hook name and all hook data is received from stdin. + +Let's make it executable +```bash +chmod +x "hooks/unified_hook" +``` + +To use it you can add parameter +```bash +rustus --hooks-file "hooks/unified_hook" +``` + +This hook is going to ignore any file that has "pepe" in metadata. + +Let's create a hook directory. + +```bash +⯠tree hooks +hooks +├── post-create +├── post-finish +├── post-receive +├── post-terminate +└── pre-create +``` + +Every file in this directory has an executable flag. +So you can specify a parameter to use hooks directory. + +```bash +rustus --hooks-dir "hooks" +``` + +In this case rustus will append a hook name to the directory you pointed at and call it as +an executable. + +Information about hook can be found in stdin. + +### Http Hooks + +Http hooks use http protocol to notify you about an upload. +You can use HTTP hooks to verify Authorization. + + +Let's create a FastAPI application that listens to hooks and checks the +authorization header. + +```bash +# Installing dependencies +pip install fastapi uvicorn +``` + +```python +# server.py +from fastapi import FastAPI, Header, HTTPException +from typing import Optional + +app = FastAPI() + +@app.post("/hooks") +def hook( + authorization: Optional[str] = Header(None), + hook_name: Optional[str] = Header(None), +): + print(f"Received: {hook_name}") + if authorization != "Bearer jwt": + raise HTTPException(401) + return None +``` + +Now we can start a server. +```bash +uvicorn server:app --port 8080 +``` + +Now you can start rustus, and it will check if Authorization header has a correct value. +```bash +rustus --hooks-http-urls "http://localhost:8000/hooks" --hooks-http-proxy-headers "Authorization" +``` + + +### AMQP hooks + +All hooks can be sent with an AMQP protocol. + +For example if you have a rabbitMQ you can use it. + +```bash +rustus --hooks-amqp-url "amqp://guest:guest@localhost" --hooks-amqp-exchange "my_exchange" +``` + +This command will create an exchange called "rustus" and queues for every hook. + +Every hook is published with routing key "rustus.{hook_name}" like +"rustus.post-create" or "rustus.pre-create" and so on. + +The problem with AMQP hooks is that you can't block the upload. +To do this you have to use HTTP or File hooks. But with AMQP your +uploads become non-blocking which is definitely a good thing. \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index d27bc42a93bb68b020fae394de91cca83d790f84..15c1ab7b567e88378086832fb1594725e39e4e24 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use chrono::{Datelike, Timelike}; use lazy_static::lazy_static; +use log::error; use structopt::StructOpt; use crate::info_storages::AvailableInfoStores; @@ -36,11 +37,11 @@ pub struct StorageOptions { /// /// This directory is used to store files /// for all *file_storage storages. - #[structopt(long, default_value = "./data")] + #[structopt(long, env = "RUSTUS_DATA_DIR", default_value = "./data")] pub data_dir: PathBuf, - #[structopt(long, default_value = "")] - pub dis_structure: String, + #[structopt(long, env = "RUSTUS_DIR_STRUCTURE", default_value = "")] + pub dir_structure: String, } #[derive(StructOpt, Debug, Clone)] @@ -91,8 +92,8 @@ pub struct NotificationsOptions { /// /// This format will be used in all /// messages about hooks. - #[structopt(long, default_value = "default", env = "RUSTUS_NOTIFICATION_FORMAT")] - pub notification_format: Format, + #[structopt(long, default_value = "default", env = "RUSTUS_HOOKS_FORMAT")] + pub hooks_format: Format, /// Enabled hooks for notifications. #[structopt( @@ -236,8 +237,10 @@ impl RustusConf { vars.insert("year".into(), now.year().to_string()); vars.insert("hour".into(), now.hour().to_string()); vars.insert("minute".into(), now.minute().to_string()); - strfmt::strfmt(self.storage_opts.dis_structure.as_str(), &vars) - .unwrap_or_else(|_| "".into()) + strfmt::strfmt(self.storage_opts.dir_structure.as_str(), &vars).unwrap_or_else(|err| { + error!("{}", err); + "".into() + }) } /// List of extensions. diff --git a/src/errors.rs b/src/errors.rs index af844fff668818d7c6ef573f3890888d85c73260..19be4062cb8e175be17b2094c0b2c237626bed2c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,6 +2,7 @@ use std::io::{Error, ErrorKind}; use actix_web::http::StatusCode; use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError}; +use log::error; pub type RustusResult<T> = Result<T, RustusError>; @@ -69,6 +70,7 @@ impl From<RustusError> for Error { /// Trait to convert errors to http-responses. impl ResponseError for RustusError { fn error_response(&self) -> HttpResponse { + error!("{}", self); HttpResponseBuilder::new(self.status_code()) .insert_header(("Content-Type", "text/html; charset=utf-8")) .body(format!("{}", self)) diff --git a/src/info_storages/models/file_info.rs b/src/info_storages/models/file_info.rs index bcab553c9a387a80313d6a9b07949587848a1f99..c593df4ecf595d3e0be5b73a8a8e3ecd512ad8f7 100644 --- a/src/info_storages/models/file_info.rs +++ b/src/info_storages/models/file_info.rs @@ -15,6 +15,7 @@ pub struct FileInfo { #[serde(with = "ts_seconds")] pub created_at: DateTime<Utc>, pub deferred_size: bool, + pub storage: String, pub metadata: HashMap<String, String>, } @@ -32,6 +33,7 @@ impl FileInfo { file_id: &str, length: Option<usize>, path: Option<String>, + storage: String, initial_metadata: Option<HashMap<String, String>>, ) -> FileInfo { let id = String::from(file_id); @@ -49,6 +51,7 @@ impl FileInfo { id, path, length, + storage, metadata, deferred_size, offset: 0, diff --git a/src/main.rs b/src/main.rs index fdddec43820a7e1a556674ba7b32f226711a1f74..80a3b6123f817a7e91ce3aff296596f445c6208a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use fern::Dispatch; use log::LevelFilter; use crate::errors::RustusResult; +use crate::info_storages::InfoStorage; use crate::notifiers::models::notification_manager::NotificationManager; use config::RustusConf; @@ -65,6 +66,7 @@ fn greeting(app_conf: &RustusConf) { /// given address. pub fn create_server( storage: Box<dyn Storage + Send + Sync>, + info_storage: Box<dyn InfoStorage + Send + Sync>, app_conf: RustusConf, notification_manager: NotificationManager, ) -> Result<Server, std::io::Error> { @@ -72,6 +74,8 @@ pub fn create_server( let port = app_conf.port; let workers = app_conf.workers; let app_conf_data = web::Data::new(app_conf.clone()); + let info_storage_data: web::Data<Box<dyn InfoStorage + Send + Sync>> = + web::Data::from(Arc::new(info_storage)); let storage_data: web::Data<Box<dyn Storage + Send + Sync>> = web::Data::from(Arc::new(storage)); let manager_data: web::Data<Box<NotificationManager>> = @@ -81,6 +85,7 @@ pub fn create_server( .app_data(app_conf_data.clone()) .app_data(storage_data.clone()) .app_data(manager_data.clone()) + .app_data(info_storage_data.clone()) // Adds all routes. .configure(protocol::setup(app_conf.clone())) // Main middleware that appends TUS headers. @@ -163,7 +168,7 @@ async fn main() -> std::io::Result<()> { info_storage.prepare().await?; // Creating file storage. - let mut storage = app_conf.storage_opts.storage.get(&app_conf, info_storage); + let mut storage = app_conf.storage_opts.storage.get(&app_conf); // Preparing it. storage.prepare().await?; @@ -171,6 +176,6 @@ async fn main() -> std::io::Result<()> { let notification_manager = NotificationManager::new(&app_conf).await?; // Creating actual server and running it. - let server = create_server(storage, app_conf, notification_manager)?; + let server = create_server(storage, info_storage, app_conf, notification_manager)?; server.await } diff --git a/src/notifiers/models/message_format.rs b/src/notifiers/models/message_format.rs index 578c74cd5fec5194f28fbef2b2790f1c785eab1b..6c9fb9c2ffa0508b4ae0889ef5649ec76a6dcafa 100644 --- a/src/notifiers/models/message_format.rs +++ b/src/notifiers/models/message_format.rs @@ -68,7 +68,7 @@ impl From<FileInfo> for TusdFileInfo { partial_uploads: None, metadata: file_info.metadata, storage: TusdStorageInfo { - storage_type: "filestore".into(), + storage_type: file_info.storage, path: file_info.path, }, } diff --git a/src/protocol/core/routes.rs b/src/protocol/core/routes.rs index c81e32a304006001f4c50cd9887bd545cea4e64a..f9f282db5896bcede50652a7e037a8b785bffc73 100644 --- a/src/protocol/core/routes.rs +++ b/src/protocol/core/routes.rs @@ -1,9 +1,10 @@ -use actix_web::{http::StatusCode, web, web::Bytes, HttpRequest, HttpResponse}; +use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; +use crate::errors::RustusError; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{NotificationManager, RustusConf, Storage}; +use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; #[allow(clippy::needless_pass_by_value)] pub fn server_info(app_conf: web::Data<RustusConf>) -> HttpResponse { @@ -19,27 +20,34 @@ pub fn server_info(app_conf: web::Data<RustusConf>) -> HttpResponse { } pub async fn get_file_info( + info_storage: web::Data<Box<dyn InfoStorage + Send + Sync>>, storage: web::Data<Box<dyn Storage + Send + Sync>>, request: HttpRequest, ) -> actix_web::Result<HttpResponse> { + // Getting file id from URL. + if request.match_info().get("file_id").is_none() { + return Ok(HttpResponse::NotFound().body("")); + } + let file_id = request.match_info().get("file_id").unwrap(); + + // Getting file info from info_storage. + let file_info = info_storage.get_info(file_id).await?; + if file_info.storage != storage.to_string() { + return Ok(HttpResponse::NotFound().body("")); + } let mut builder = HttpResponse::Ok(); - if let Some(file_id) = request.match_info().get("file_id") { - let file_info = storage.get_file_info(file_id).await?; - builder - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .insert_header(("Content-Length", file_info.offset.to_string())); - // Upload length is known. - if let Some(upload_len) = file_info.length { - builder.insert_header(("Upload-Length", upload_len.to_string())); - } else { - builder.insert_header(("Upload-Defer-Length", "1")); - } - if let Some(meta) = file_info.get_metadata_string() { - builder.insert_header(("Upload-Metadata", meta)); - } + builder + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .insert_header(("Content-Length", file_info.offset.to_string())); + // Upload length is known. + if let Some(upload_len) = file_info.length { + builder.insert_header(("Upload-Length", upload_len.to_string())); } else { - builder.status(StatusCode::NOT_FOUND); - }; + builder.insert_header(("Upload-Defer-Length", "1")); + } + if let Some(meta) = file_info.get_metadata_string() { + builder.insert_header(("Upload-Metadata", meta)); + } Ok(builder.body("")) } @@ -47,18 +55,25 @@ pub async fn write_bytes( request: HttpRequest, bytes: Bytes, storage: web::Data<Box<dyn Storage + Send + Sync>>, + info_storage: web::Data<Box<dyn InfoStorage + Send + Sync>>, notification_manager: web::Data<Box<NotificationManager>>, app_conf: web::Data<RustusConf>, ) -> actix_web::Result<HttpResponse> { + // Checking if request has required headers. if !check_header(&request, "Content-Type", "application/offset+octet-stream") { return Ok(HttpResponse::UnsupportedMediaType().body("")); } - let offset = parse_header(&request, "Upload-Offset"); + // Getting current offset. + let offset: Option<usize> = parse_header(&request, "Upload-Offset"); if offset.is_none() { return Ok(HttpResponse::UnsupportedMediaType().body("")); } + if request.match_info().get("file_id").is_none() { + return Ok(HttpResponse::NotFound().body("")); + } + // New upload length. // Parses header `Upload-Length` only if the creation-defer-length extension is enabled. let updated_len = if app_conf @@ -70,30 +85,69 @@ pub async fn write_bytes( None }; - if let Some(file_id) = request.match_info().get("file_id") { - let file_info = storage - .add_bytes(file_id, offset.unwrap(), updated_len, bytes.as_ref()) - .await?; - let mut hook = Hook::PostReceive; - if file_info.length == Some(file_info.offset) { - hook = Hook::PostFinish; + let file_id = request.match_info().get("file_id").unwrap(); + // Getting file info. + let mut file_info = info_storage.get_info(file_id).await?; + + // Checking if file was stored in the same storage. + if file_info.storage != storage.to_string() { + return Ok(HttpResponse::NotFound().body("")); + } + // Checking if offset from request is the same as the real offset. + if offset.unwrap() != file_info.offset { + return Ok(HttpResponse::Conflict().body("")); + } + + // If someone want to update file length. + // This required by Upload-Defer-Length extension. + if let Some(new_len) = updated_len { + // Whoop, someone gave us total file length + // less that he had already uploaded. + if new_len < file_info.offset { + return Err(RustusError::WrongOffset.into()); } - if app_conf.hook_is_active(hook) { - let message = app_conf - .notification_opts - .notification_format - .format(&request, &file_info)?; - let headers = request.headers().clone(); - tokio::spawn(async move { - notification_manager - .send_message(message, hook, &headers) - .await - }); + // We already know the exact size of a file. + // Someone want to update it. + // Anyway, it's not allowed, heh. + if file_info.length.is_some() { + return Err(RustusError::SizeAlreadyKnown.into()); } - Ok(HttpResponse::NoContent() - .insert_header(("Upload-Offset", file_info.offset.to_string())) - .body("")) - } else { - Ok(HttpResponse::NotFound().body("")) + + // All checks are ok. Now our file will have exact size. + file_info.deferred_size = false; + file_info.length = Some(new_len); + } + + // Checking if the size of the upload is already equals + // to calculated offset. It means that all bytes were already written. + if Some(file_info.offset) == file_info.length { + return Err(RustusError::FrozenFile.into()); + } + + // Appending bytes to file. + storage.add_bytes(&file_info, bytes.as_ref()).await?; + // Updating offset. + file_info.offset += bytes.len(); + // Saving info to info storage. + info_storage.set_info(&file_info, false).await?; + + let mut hook = Hook::PostReceive; + if file_info.length == Some(file_info.offset) { + hook = Hook::PostFinish; + } + if app_conf.hook_is_active(hook) { + let message = app_conf + .notification_opts + .hooks_format + .format(&request, &file_info)?; + let headers = request.headers().clone(); + tokio::spawn(async move { + notification_manager + .send_message(message, hook, &headers) + .await + }); } + Ok(HttpResponse::NoContent() + .insert_header(("Upload-Offset", file_info.offset.to_string())) + .body("")) } diff --git a/src/protocol/creation/routes.rs b/src/protocol/creation/routes.rs index f9ee9cca7410bc419890bdbc9a418f1307d9d5ec..61bbdca8f54e698b888d1683da499d7c82075176 100644 --- a/src/protocol/creation/routes.rs +++ b/src/protocol/creation/routes.rs @@ -7,7 +7,7 @@ use crate::info_storages::FileInfo; use crate::notifiers::Hook; use crate::protocol::extensions::Extensions; use crate::utils::headers::{check_header, parse_header}; -use crate::{NotificationManager, RustusConf, Storage}; +use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; /// Get metadata info from request. /// @@ -49,8 +49,17 @@ fn get_metadata(request: &HttpRequest) -> Option<HashMap<String, String>> { }) } +/// Create file. +/// +/// This method allows you to create file to start uploading. +/// +/// This method supports defer-length if +/// you don't know actual file length and +/// you can upload first bytes if creation-with-upload +/// extension is enabled. pub async fn create_file( storage: web::Data<Box<dyn Storage + Send + Sync>>, + info_storage: web::Data<Box<dyn InfoStorage + Send + Sync>>, notification_manager: web::Data<Box<NotificationManager>>, app_conf: web::Data<RustusConf>, request: HttpRequest, @@ -75,20 +84,28 @@ pub async fn create_file( let meta = get_metadata(&request); + let file_id = uuid::Uuid::new_v4().to_string(); + let mut file_info = FileInfo::new( + file_id.as_str(), + length, + None, + storage.to_string(), + meta.clone(), + ); + if app_conf.hook_is_active(Hook::PreCreate) { - let initial_file_info = FileInfo::new("", length, None, meta.clone()); let message = app_conf .notification_opts - .notification_format - .format(&request, &initial_file_info)?; + .hooks_format + .format(&request, &file_info)?; let headers = request.headers(); notification_manager .send_message(message, Hook::PreCreate, headers) .await?; } - // Create file and get the id. - let mut file_info = storage.create_file(length, meta).await?; + // Create file and get the it's path. + file_info.path = Some(storage.create_file(&file_info).await?); // Create upload URL for this file. let upload_url = request.url_for("core:write_bytes", &[file_info.id.clone()])?; @@ -102,15 +119,16 @@ pub async fn create_file( return Ok(HttpResponse::BadRequest().body("")); } // Writing first bytes. - file_info = storage - .add_bytes(file_info.id.as_str(), 0, None, bytes.as_ref()) - .await?; + storage.add_bytes(&file_info, bytes.as_ref()).await?; + file_info.offset += bytes.len(); } + info_storage.set_info(&file_info, true).await?; + if app_conf.hook_is_active(Hook::PostCreate) { let message = app_conf .notification_opts - .notification_format + .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); // Adding send_message task to tokio reactor. diff --git a/src/protocol/getting/routes.rs b/src/protocol/getting/routes.rs index fdd918d7dc9ddad17c42c2e155209299cacbba1e..44186c81c6bbb64727aaad6a82444edeb550c741 100644 --- a/src/protocol/getting/routes.rs +++ b/src/protocol/getting/routes.rs @@ -1,15 +1,23 @@ use actix_web::{web, HttpRequest, Responder}; use crate::errors::RustusError; -use crate::Storage; +use crate::{InfoStorage, Storage}; +/// Retrieve actual file. +/// +/// This method allows you to download files directly from storage. pub async fn get_file( request: HttpRequest, storage: web::Data<Box<dyn Storage + Send + Sync>>, + info_storage: web::Data<Box<dyn InfoStorage + Send + Sync>>, ) -> impl Responder { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - storage.get_contents(file_id.as_str()).await + let file_info = info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != storage.to_string() { + return Err(RustusError::FileNotFound); + } + storage.get_contents(&file_info).await } else { Err(RustusError::FileNotFound) } diff --git a/src/protocol/termination/routes.rs b/src/protocol/termination/routes.rs index 272fc2154a43c7fc4b383df1512246925cb3545d..16d809406a955c404a8ae0d19383b5b1fb171114 100644 --- a/src/protocol/termination/routes.rs +++ b/src/protocol/termination/routes.rs @@ -2,25 +2,31 @@ use actix_web::{web, HttpRequest, HttpResponse}; use crate::errors::RustusResult; use crate::notifiers::Hook; -use crate::{NotificationManager, RustusConf, Storage}; +use crate::{InfoStorage, NotificationManager, RustusConf, Storage}; /// Terminate uploading. /// -/// This method will remove all -/// files by id. +/// This method will remove all data by id. +/// It removes info and actual data. pub async fn terminate( storage: web::Data<Box<dyn Storage + Send + Sync>>, + info_storage: web::Data<Box<dyn InfoStorage + Send + Sync>>, request: HttpRequest, notification_manager: web::Data<Box<NotificationManager>>, app_conf: web::Data<RustusConf>, ) -> RustusResult<HttpResponse> { let file_id_opt = request.match_info().get("file_id").map(String::from); if let Some(file_id) = file_id_opt { - let file_info = storage.remove_file(file_id.as_str()).await?; + let file_info = info_storage.get_info(file_id.as_str()).await?; + if file_info.storage != storage.to_string() { + return Ok(HttpResponse::NotFound().body("")); + } + info_storage.remove_info(file_id.as_str()).await?; + storage.remove_file(&file_info).await?; if app_conf.hook_is_active(Hook::PostTerminate) { let message = app_conf .notification_opts - .notification_format + .hooks_format .format(&request, &file_info)?; let headers = request.headers().clone(); tokio::spawn(async move { diff --git a/src/storages/file_storage.rs b/src/storages/file_storage.rs index b6b7419cdb6223fd0d07e1304ae85a2fa20b0958..8d8c989255b689b8d32f8ada2f2599511e82d599 100644 --- a/src/storages/file_storage.rs +++ b/src/storages/file_storage.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::path::PathBuf; use actix_files::NamedFile; @@ -6,27 +5,22 @@ use async_std::fs::{remove_file, DirBuilder, OpenOptions}; use async_std::prelude::*; use async_trait::async_trait; use log::error; -use uuid::Uuid; use crate::errors::{RustusError, RustusResult}; -use crate::info_storages::{FileInfo, InfoStorage}; +use crate::info_storages::FileInfo; use crate::storages::Storage; use crate::RustusConf; +use derive_more::Display; +#[derive(Display)] +#[display(fmt = "file_storage")] pub struct FileStorage { app_conf: RustusConf, - info_storage: Box<dyn InfoStorage + Send + Sync>, } impl FileStorage { - pub fn new( - app_conf: RustusConf, - info_storage: Box<dyn InfoStorage + Send + Sync>, - ) -> FileStorage { - FileStorage { - app_conf, - info_storage, - } + pub fn new(app_conf: RustusConf) -> FileStorage { + FileStorage { app_conf } } pub async fn data_file_path(&self, file_id: &str) -> RustusResult<PathBuf> { @@ -68,17 +62,11 @@ impl Storage for FileStorage { Ok(()) } - async fn get_file_info(&self, file_id: &str) -> RustusResult<FileInfo> { - // I don't think comments are convenient here. - self.info_storage.get_info(file_id).await - } - - async fn get_contents(&self, file_id: &str) -> RustusResult<NamedFile> { - let info = self.info_storage.get_info(file_id).await?; - if info.path.is_none() { + async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<NamedFile> { + if file_info.path.is_none() { return Err(RustusError::FileNotFound); } - NamedFile::open_async(info.path.unwrap().as_str()) + NamedFile::open_async(file_info.path.clone().unwrap().as_str()) .await .map_err(|err| { error!("{:?}", err); @@ -86,47 +74,12 @@ impl Storage for FileStorage { }) } - async fn add_bytes( - &self, - file_id: &str, - request_offset: usize, - updated_length: Option<usize>, - bytes: &[u8], - ) -> RustusResult<FileInfo> { - let mut info = self.info_storage.get_info(file_id).await?; - // Checking that provided offset is equal to offset provided by request. - if info.offset != request_offset { - return Err(RustusError::WrongOffset); - } + async fn add_bytes(&self, info: &FileInfo, bytes: &[u8]) -> RustusResult<()> { // In normal situation this `if` statement is not // gonna be called, but what if it is ... if info.path.is_none() { return Err(RustusError::FileNotFound); } - // This thing is only applicable in case - // if tus-extension `creation-defer-length` is enabled. - if let Some(new_len) = updated_length { - // Whoop, someone gave us total file length - // less that he had already uploaded. - if new_len < info.offset { - return Err(RustusError::WrongOffset); - } - // We already know the exact size of a file. - // Someone want to update it. - // Anyway, it's not allowed, heh. - if info.length.is_some() { - return Err(RustusError::SizeAlreadyKnown); - } - - // All checks are ok. Now our file will have exact size. - info.deferred_size = false; - info.length = Some(new_len); - } - // Checking if the size of the upload is already equals - // to calculated offset. It means that all bytes were already written. - if Some(info.offset) == info.length { - return Err(RustusError::FrozenFile); - } // Opening file in w+a mode. // It means that we're going to append some // bytes to the end of a file. @@ -146,23 +99,12 @@ impl Storage for FileStorage { })?; file.sync_data().await?; // Updating information about file. - info.offset += bytes.len(); - self.info_storage.set_info(&info, false).await?; - Ok(info) + Ok(()) } - async fn create_file( - &self, - file_size: Option<usize>, - metadata: Option<HashMap<String, String>>, - ) -> RustusResult<FileInfo> { - // Let's create a new file ID. - // I guess the algo for generating new upload-id's can be - // configurable. But for now I don't really care, since UUIv4 works fine. - // Maybe update it later. - let file_id = Uuid::new_v4().simple().to_string(); + async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String> { // New path to file. - let file_path = self.data_file_path(file_id.as_str()).await?; + let file_path = self.data_file_path(file_info.id.as_str()).await?; // Creating new file. let mut file = OpenOptions::new() .write(true) @@ -172,7 +114,7 @@ impl Storage for FileStorage { .await .map_err(|err| { error!("{:?}", err); - RustusError::FileAlreadyExists(file_id.clone()) + RustusError::FileAlreadyExists(file_info.id.clone()) })?; // Let's write an empty string to the beginning of the file. @@ -182,41 +124,20 @@ impl Storage for FileStorage { RustusError::UnableToWrite(file_path.display().to_string()) })?; file.sync_all().await?; - // Creating new FileInfo object and saving it. - let file_info = FileInfo::new( - file_id.as_str(), - file_size, - Some(file_path.display().to_string()), - metadata, - ); - - self.info_storage.set_info(&file_info, true).await?; - Ok(file_info) + Ok(file_path.display().to_string()) } - async fn remove_file(&self, file_id: &str) -> RustusResult<FileInfo> { - let info = self.info_storage.get_info(file_id).await?; - // Whoops, someone forgot to update the path field. - if info.path.is_none() { - return Err(RustusError::FileNotFound); - } - // Let's remove info first, so file won't show up - // In get_contents function. - self.info_storage.remove_info(file_id).await?; - + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()> { // Let's remove the file itself. - let data_path = PathBuf::from(info.path.as_ref().unwrap().clone()); + let data_path = PathBuf::from(file_info.path.as_ref().unwrap().clone()); if !data_path.exists() { - // Maybe we don't need error here, - // since if file doesn't exist, we're done. - // FIXME: Find it out. return Err(RustusError::FileNotFound); } remove_file(data_path).await.map_err(|err| { error!("{:?}", err); - RustusError::UnableToRemove(String::from(file_id)) + RustusError::UnableToRemove(file_info.id.clone()) })?; - Ok(info) + Ok(()) } } diff --git a/src/storages/models/available_stores.rs b/src/storages/models/available_stores.rs index 34880c18dd35383de0982d226c036f5d3431a572..c4f8d1b788a84fecb0bf29cd00bd89944a2c5761 100644 --- a/src/storages/models/available_stores.rs +++ b/src/storages/models/available_stores.rs @@ -1,4 +1,3 @@ -use crate::info_storages::InfoStorage; use crate::storages::file_storage; use crate::{from_str, RustusConf, Storage}; use derive_more::{Display, From}; @@ -20,16 +19,10 @@ impl AvailableStores { /// `config` - Rustus configuration. /// `info_storage` - Storage for information about files. /// - pub fn get( - &self, - config: &RustusConf, - info_storage: Box<dyn InfoStorage + Sync + Send>, - ) -> Box<dyn Storage + Send + Sync> { + pub fn get(&self, config: &RustusConf) -> Box<dyn Storage + Send + Sync> { #[allow(clippy::single_match)] match self { - Self::FileStorage => { - Box::new(file_storage::FileStorage::new(config.clone(), info_storage)) - } + Self::FileStorage => Box::new(file_storage::FileStorage::new(config.clone())), } } } diff --git a/src/storages/models/storage.rs b/src/storages/models/storage.rs index 31df02e7e22c7765d09baba5c07f93d6c9508495..e1294433f1dfff8fbbb8fdd30a7ba7a323a12e3b 100644 --- a/src/storages/models/storage.rs +++ b/src/storages/models/storage.rs @@ -2,10 +2,10 @@ use crate::errors::RustusResult; use crate::info_storages::FileInfo; use actix_files::NamedFile; use async_trait::async_trait; -use std::collections::HashMap; +use std::fmt::Display; #[async_trait] -pub trait Storage { +pub trait Storage: Display { /// Prepare storage before starting up server. /// /// Function to check if configuration is correct @@ -17,24 +17,15 @@ pub trait Storage { /// be a problem later on. async fn prepare(&mut self) -> RustusResult<()>; - /// Get file information. - /// - /// This method returns all information about file. - /// - /// # Params - /// `file_id` - unique file identifier. - async fn get_file_info(&self, file_id: &str) -> RustusResult<FileInfo>; - /// Get contents of a file. /// /// This method must return NamedFile since it /// is compatible with ActixWeb files interface. - /// - /// This method basically must call info storage method. + /// FIXME: change return type to stream. /// /// # Params - /// `file_id` - unique file identifier. - async fn get_contents(&self, file_id: &str) -> RustusResult<NamedFile>; + /// `file_info` - info about current file. + async fn get_contents(&self, file_info: &FileInfo) -> RustusResult<NamedFile>; /// Add bytes to the file. /// @@ -44,24 +35,13 @@ pub trait Storage { /// # Errors /// /// Implementations MUST throw errors at following cases: - /// * Offset for request doesn't match offset we have in info storage. - /// * Updated length is provided, but we already know total size in bytes. /// * If the info about the file can't be found. /// * If the storage is offline. /// /// # Params - /// `file_id` - unique file identifier; - /// `request_offset` - offset from the client. - /// `updated_length` - total file size in bytes. - /// This value is used by creation-defer-length extension. + /// `file_info` - info about current file. /// `bytes` - bytes to append to the file. - async fn add_bytes( - &self, - file_id: &str, - request_offset: usize, - updated_length: Option<usize>, - bytes: &[u8], - ) -> RustusResult<FileInfo>; + async fn add_bytes(&self, file_info: &FileInfo, bytes: &[u8]) -> RustusResult<()>; /// Create file in storage. /// @@ -70,13 +50,8 @@ pub trait Storage { /// This function must use info storage to store information about the upload. /// /// # Params - /// `file_size` - Size of a file. It may be None if size is deferred; - /// `metadata` - Optional file meta-information; - async fn create_file( - &self, - file_size: Option<usize>, - metadata: Option<HashMap<String, String>>, - ) -> RustusResult<FileInfo>; + /// `file_info` - info about current file. + async fn create_file(&self, file_info: &FileInfo) -> RustusResult<String>; /// Remove file from storage /// @@ -84,6 +59,6 @@ pub trait Storage { /// object if any. /// /// # Params - /// `file_id` - unique file identifier; - async fn remove_file(&self, file_id: &str) -> RustusResult<FileInfo>; + /// `file_info` - info about current file. + async fn remove_file(&self, file_info: &FileInfo) -> RustusResult<()>; } diff --git a/src/utils/enums.rs b/src/utils/enums.rs index 0ae1ae356c8ce37dff91115f368d390b861e88da..5109aeef353f609c345e4b70dde0a42ea0477ad3 100644 --- a/src/utils/enums.rs +++ b/src/utils/enums.rs @@ -1,3 +1,4 @@ +/// Implement `FromStr` for enums with `EnumIterator` trait from strum. #[macro_export] macro_rules! from_str { ($enum_name:ty, $name:literal) => {