diff --git a/src/client.rs b/src/client.rs deleted file mode 100644 index f2fd4a2..0000000 --- a/src/client.rs +++ /dev/null @@ -1,363 +0,0 @@ -mod download; -mod data; -mod util; -mod error; -mod message; - -use std::{collections::HashMap, path::Path, str::FromStr, time::Duration}; - -use clap::{crate_name, crate_version}; -use download::{file_size, open_outfile, store_body}; -use error::{ClientError, RequestError}; -use http::{ - header::{CONTENT_LENGTH, CONTENT_TYPE, ORIGIN, RANGE, USER_AGENT}, request::Builder as RequestBuilder, HeaderMap, HeaderName, HeaderValue, Request, Response, StatusCode, Uri -}; -use http_body_util::BodyDataStream; -use message::{ClientActorMessage, ClientActorMessageHandle}; -use reqwest::{redirect::Policy, Body}; -use tokio::{ - fs::File, - select, - sync::{mpsc, oneshot}, - task::JoinSet, time::timeout, -}; -use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _}; -use tower_http::decompression::{DecompressionBody, DecompressionLayer}; -use tower_reqwest::HttpClientLayer; - -use log::{debug, error, info}; -use util::head; - -use crate::map_dlerror; - - -#[derive(Clone, Debug)] -pub enum DownloadState { - GotHead, - #[allow(dead_code)] - Partial { content_type: Option }, - Done { content_type: Option }, -} - - -type ActionIndex = u64; -type DownloadResult = Result, ClientError>; -type JoinSetResult = Result, ClientError>; -type HttpClient = BoxCloneService, Response>, anyhow::Error>; - - - -#[derive(Debug)] -struct ClientActor { - body_timeout: Option, - client: HttpClient, - tasks: JoinSet, - actions: HashMap, - actions_idx: ActionIndex, - default_headers: HeaderMap, - - receiver: mpsc::Receiver, - max_tasks: usize, -} - - -pub(super) struct ClientActorHandle { - sender: mpsc::Sender, - abort: oneshot::Sender, -} - -async fn process_next_result(mut actor: ClientActor, result: JoinSetResult) -> ClientActor { - use ClientActorMessageHandle::{Download, GetData}; - - match result { - Err(e) => { - info!("Retry failed download: {:?}", e); - // retry ... instead of responing here we could also respond - // with something that in turn would be used to retry... - let client = actor.client.clone(); - let default_headers = actor.default_headers.clone(); - actor.tasks.spawn(async move { - match e.action { - Download { .. } => { - download::download( client - , default_headers - , e.action - , actor.body_timeout ).await - }, - GetData { .. } => - data::data(client, e.action).await, - } - }); - }, - - // when the task finishes - Ok(Some(action)) => { - match action { - Download { ref uri, ref state, ref message, .. } => { - info!("Done download: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::Download; - - match message { - Download { respond_to, .. } => { - let _ = respond_to.send(Ok(state.clone())); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - - GetData { ref uri, ref buffer, ref message } => { - info!("Done get_data: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::GetData; - match message { - GetData { respond_to, .. } => { - let _ = respond_to.send(buffer.clone()); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - } - }, - - // Got a stop message...here we still continue procession until the - // JoinSet is empty. - Ok(None) => (), - }; - - actor -} - -async fn run_client(mut actor: ClientActor) { - loop { - if actor.tasks.len() >= actor.max_tasks { - if let Some(join) = actor.tasks.join_next().await { - match join { - Err(e) => { - error!("FATAL Join failed: {}", e); - break - }, - Ok(result) => actor = process_next_result(actor, result).await, - } - }; - } else { - select! { - Some(join) = actor.tasks.join_next() => { - match join { - Err(e) => { - error!("FATAL Join failed: {}", e); - break - }, - Ok(result) => actor = process_next_result(actor, result).await, - } - } - - Some(message) = actor.receiver.recv() => { - actor.handle_message(message).await; - } - - else => {} - } - } - } -} - - -impl ClientActor { - fn new( buffer: usize - , rate_limit: u64 - , concurrency_limit: usize - , timeout: Duration - , receiver: mpsc::Receiver - , abort_rx: oneshot::Receiver ) -> anyhow::Result { - let client = ServiceBuilder::new() - // Add some layers. - . buffer(buffer) - . rate_limit(rate_limit, Duration::from_secs(1)) - . concurrency_limit(concurrency_limit) - . timeout(timeout) - . layer(DecompressionLayer::new()) - // Make client compatible with the `tower-http` layers. - . layer(HttpClientLayer) - . service( reqwest::Client::builder() - . redirect(Policy::limited(5)) - . build()? ) - . map_err(anyhow::Error::msg) - . boxed_clone(); - - let mut tasks = JoinSet::new(); - - tasks.spawn(async move { - let _ = abort_rx.await; - Ok(None) - }); - - let actions = HashMap::new(); - let actions_idx = 0; - let tasks_left = buffer + concurrency_limit; - let body_timeout = None; - let mut default_headers = HeaderMap::new(); - default_headers.insert( - USER_AGENT, - HeaderValue::from_str(&( crate_name!().to_string() + "/" - + crate_version!() )).unwrap() ); - - Ok(Self { - body_timeout, - client, - tasks, - receiver, - actions, - default_headers, - actions_idx, - max_tasks: tasks_left }) - } - - fn set_body_timeout(mut self, timeout: Option) -> Self { - self.body_timeout = timeout; - self - } - - fn set_origin(mut self, origin: Option) -> Self { - if let Some(origin) = origin { - self.default_headers.insert( - ORIGIN, - HeaderValue::from_str(origin.as_str()).unwrap() ); - } else { - self.default_headers.remove(ORIGIN); - } - self - } - - fn set_user_agent(mut self, user_agent: Option) -> Self { - if let Some(user_agent) = user_agent { - self.default_headers.insert( - USER_AGENT, - HeaderValue::from_str(user_agent.as_str()).unwrap() ); - } else { - self.default_headers.remove(USER_AGENT); - } - self - } - - async fn handle_message(&mut self, message: ClientActorMessage) { - self.actions.insert(self.actions_idx, message); - - use ClientActorMessage::{Download, GetData}; - - match self.actions.get(&self.actions_idx) { - Some(Download { ref filename, ref uri, .. }) => { - // spawn a task that does the work - let client = self.client.clone(); - let timeout = self.body_timeout; - - let handle = ClientActorMessageHandle::Download { - filename: filename.to_path_buf(), - uri: uri.clone(), - state: None, - message: self.actions_idx, - }; - - let default_headers = self.default_headers.clone(); - self.tasks.spawn(async move { - download::download( client - , default_headers - , handle - , timeout ).await - }); - - self.actions_idx += 1; - }, - - Some(GetData { ref uri, .. }) => { - // spawn a task that does the work - let client = self.client.clone(); - - let handle = ClientActorMessageHandle::GetData { - uri: uri.clone(), - buffer: None, - message: self.actions_idx, - }; - - self.tasks.spawn(async move { - data::data(client, handle).await - }); - - self.actions_idx += 1; - }, - - None => (), - } - } -} - -impl ClientActorHandle { - pub(super) fn new( buffer: usize - , rate_limit: u64 - , concurrency_limit: usize - , timeout: Duration - , use_body_timeout: bool - , origin: Option - , user_agent: Option ) -> Self { - let (sender, receiver) = mpsc::channel(1); - let (abort, abort_rx) = oneshot::channel::(); - let actor = ClientActor::new( buffer - , rate_limit - , concurrency_limit - , timeout - , receiver - , abort_rx ) - . expect("Client create error") - . set_origin(origin); - - let actor = if let Some(user_agent) = user_agent { - actor.set_user_agent(Some(user_agent)) - } else { - actor - }; - - let actor = if use_body_timeout { - actor.set_body_timeout(Some(timeout)) - } else { - actor - }; - - debug!("-> actor: {:?}", actor); - - tokio::spawn(run_client(actor)); - - Self { sender, abort } - } - - pub(super) fn stop(self) { - let _ = self.abort.send(Ok(None)); - drop(self.sender); - } - - pub(super) async fn download( &self - , filename: impl AsRef - , uri: &Uri ) -> DownloadResult { - let filename = filename.as_ref().to_path_buf(); - let uri = uri.to_owned(); - let (send, receive) = oneshot::channel(); - let msg = ClientActorMessage::Download { filename, uri, respond_to: send }; - - let _ = self.sender.send(msg).await; - receive.await.expect("Actor cancelled unexpected") - } - - pub(super) async fn body_bytes(&self, uri: &Uri) -> Option> { - let uri = uri.to_owned(); - let (send, receive) = oneshot::channel(); - let msg = ClientActorMessage::GetData { uri, respond_to: send }; - - let _ = self.sender.send(msg).await; - receive.await.expect("Actor cancelled unexpected") - } -} diff --git a/src/client/data.rs b/src/client/data.rs deleted file mode 100644 index dde19ec..0000000 --- a/src/client/data.rs +++ /dev/null @@ -1,31 +0,0 @@ -use http::HeaderMap; -use tower_http_client::client::BodyReader; - -use crate::map_dlerror; - -use super::{util, ClientActorMessageHandle, HttpClient, JoinSetResult}; - - -pub(super) async fn data( mut client: HttpClient - , mut message: ClientActorMessageHandle ) -> JoinSetResult { - let uri = message.uri(); - - let mut response = util::request( &mut client - , "GET" - , &uri - , &HeaderMap::new() ) - . await - . map_err(map_dlerror!(message))?; - - // read body into Vec - let body: Vec = BodyReader::new(response.body_mut()) - . bytes() - . await - . map_err(map_dlerror!(message))? - . to_vec(); - - let buffer = message.buffer_mut(); - *buffer = Some(body); - - Ok(Some(message)) -} diff --git a/src/client/download.rs b/src/client/download.rs deleted file mode 100644 index 0c7a935..0000000 --- a/src/client/download.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::{io::ErrorKind, path::Path, time::Duration}; - -use anyhow::anyhow; -use futures_util::StreamExt as _; -use http::{header::RANGE, HeaderMap, StatusCode}; -use http_body_util::BodyDataStream; -use reqwest::Body; -use tokio::{ - fs::{symlink_metadata, File}, - io::AsyncWriteExt as _, - time::timeout -}; -use tower_http::decompression::DecompressionBody; - -use crate::map_dlerror; - -use super::{ - util, - ClientActorMessageHandle, - DownloadState, - HttpClient, - JoinSetResult, -}; - - -pub(super) async fn download( mut client: HttpClient - , headers: HeaderMap - , mut message: ClientActorMessageHandle - , io_timeout: Option ) -> JoinSetResult { - let filename = message.filename(); - let uri = message.uri(); - - // - get all informations to eventually existing file - let mut from = file_size(&filename).await; - - // - get infos to uri - let response_headers = util::head(&mut client, &uri, &headers).await - . map_err(map_dlerror!(message))?; - let content_length = util::content_length(&response_headers).ok(); - let content_type = util::content_type(&response_headers).ok(); - - message.set_state(DownloadState::GotHead); - - if let Some(content_length) = content_length { - if from != 0 && content_length - 1 <= from { - return Ok(None); - } - } else { - from = 0; - } - - // - do the neccessry request. - let mut headers = headers; - headers.insert(RANGE, format!("bytes={}-", from).parse().unwrap()); - - let mut response = util::request(&mut client, "GET", &uri, &headers).await - . map_err(map_dlerror!(message))?; - - // - open or create file - // - download Data - store_body( &mut open_outfile(&response.status(), &filename).await - , response.body_mut() - , io_timeout ) - . await - . map_err(map_dlerror!(message))?; - - message.set_state(DownloadState::Done { content_type }); - - Ok(Some(message)) -} - -pub(crate) async fn file_size(filename: impl AsRef) -> u64 { - // - get all informations to eventually existing file - let filename = filename.as_ref(); - let metadata = match symlink_metadata(filename).await { - Ok(metadata) => Some(metadata), - Err(error) => match error.kind() { - // If we can't write to a file we need to ... well theres nothing we can do - ErrorKind::PermissionDenied => panic!("Permission denied on: {:?}", filename), - _ => None, - } - }; - - metadata.map_or(0, |m| m.len()) -} - -pub(crate) async fn open_outfile(status: &StatusCode, filename: impl AsRef) -> File { - let filename = filename.as_ref(); - match status { - &StatusCode::PARTIAL_CONTENT => - // Here we assume that this response only comes if the requested - // range can be fullfilled and thus is the data range in the - // response. Thats why I do not check the content-range header. - // If that assumption does not hold this needs to be fixed. - File::options() . create(true) - . append(true) - . open(filename) - . await - . expect("can not create file for writing"), - - _ => - File::create(filename) . await - . expect("can not create file for writing"), - } -} - -pub(crate) async fn store_body( file: &mut File - , body: &mut DecompressionBody - , io_timeout: Option ) -> anyhow::Result<()> { - let mut body = BodyDataStream::new(body); - - loop { - let data_future = body.next(); - let data = if let Some(io_timeout) = io_timeout { - // give timeout somehow... probably from client. - timeout(io_timeout, body.next()).await? - } else { - data_future.await - }; - - match data { - None => break, - Some(Err(e)) => Err(anyhow!(e))?, - Some(Ok(data)) => { - file . write_all(data.as_ref()).await?; - file . flush().await?; - }, - } - }; - - Ok(()) -} diff --git a/src/client/error.rs b/src/client/error.rs deleted file mode 100644 index 4718a72..0000000 --- a/src/client/error.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::{error, fmt}; - -use http::Response; - -use super::ClientActorMessageHandle; - - -#[macro_export] -macro_rules! mk_dlerror { - ($message:ident, $($err:tt)*) => {{ - use $crate::client::error; - error::ClientError::new( $message.clone() - , Some(anyhow::anyhow!($($err)*) )) - }}; -} - -#[macro_export] -macro_rules! map_dlerror { - ($message:ident) => {{ - use $crate::client::error; - |e| error::ClientError::new( $message.clone() - , Some(anyhow::anyhow!(format!("{:?}", e)) )) - }}; -} - - -#[derive(Debug)] -pub(crate) struct ClientError { - pub(super) action: ClientActorMessageHandle, - pub(super) source: Option, -} - -#[derive(Debug)] -pub(super) struct RequestError { - pub(super) response: Option>, - pub(super) source: Option, -} - - -impl ClientError { - pub(super) fn new( action: ClientActorMessageHandle - , source: Option ) -> Self { - let action = action.to_owned(); - Self { action, source } - } -} - -impl error::Error for ClientError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match &self.source { - None => None, - Some(e) => Some(e.as_ref()), - } - } -} - -impl fmt::Display for ClientError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.source { - None => write!(f, "download error: {:?}", self.action), - Some(err) => write!(f, "download error ({:?}): {}", self.action, err), - } - } -} - -impl RequestError { - pub(super) fn new( response: Option> - , source: Option ) -> Self { - Self { response, source } - } -} - -impl error::Error for RequestError { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match &self.source { - None => None, - Some(e) => Some(e.as_ref()), - } - } -} - -impl fmt::Display for RequestError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.source { - None => write!(f, "request error: {:?}", self.response), - Some(err) => write!(f, "request error ({:?}): {}", self.response, err), - } - } -} diff --git a/src/client/message.rs b/src/client/message.rs deleted file mode 100644 index ded4f93..0000000 --- a/src/client/message.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::path::PathBuf; - -use http::Uri; -use tokio::sync::oneshot; - -use super::{ActionIndex, DownloadResult, DownloadState}; - - -#[derive(Debug)] -pub(super) enum ClientActorMessage { - Download { - filename: PathBuf, - uri: Uri, - respond_to: oneshot::Sender, - }, - GetData { - uri: Uri, - respond_to: oneshot::Sender>>, - }, -} - -#[derive(Clone, Debug)] -pub(super) enum ClientActorMessageHandle { - Download { - filename: PathBuf, - uri: Uri, - state: Option, - message: ActionIndex, - }, - GetData { - uri: Uri, - buffer: Option>, - message: ActionIndex, - }, -} - - -impl ClientActorMessageHandle { - pub(super) fn set_state(&mut self, new_state: DownloadState) { - if let Self::Download { ref mut state, .. } = self { - *state = Some(new_state); - } else { - panic!("Called with invalid variant"); - } - } - - pub(super) fn filename(&self) -> PathBuf { - if let Self::Download { ref filename, .. } = self { - filename.clone() - } else { - panic!("called with invalid variant"); - } - } - - pub(super) fn uri(&self) -> Uri { - match self { - Self::Download { ref uri, .. } => uri.clone(), - Self::GetData { ref uri, .. } => uri.clone(), - } - } - - pub(super) fn buffer_mut(&mut self) -> &mut Option> { - if let Self::GetData { ref mut buffer, .. } = self { - buffer - } else { - panic!("called with invalid variant"); - } - } -} diff --git a/src/client/util.rs b/src/client/util.rs deleted file mode 100644 index 04f328e..0000000 --- a/src/client/util.rs +++ /dev/null @@ -1,67 +0,0 @@ -use anyhow::anyhow; -use http::{ - header::{CONTENT_LENGTH, CONTENT_TYPE}, - request::Builder as RequestBuilder, - HeaderMap, - Response, - Uri -}; -use reqwest::Body; -use tower_http::decompression::DecompressionBody; -use tower_http_client::ServiceExt as _; - -use super::{error::RequestError, HttpClient}; - -use log::debug; - - -pub(super) async fn request( client: &mut HttpClient - , method: &str - , uri: &Uri - , headers: &HeaderMap ) -> Result>, RequestError> { - let mut request = RequestBuilder::new() - . method(method) - . uri(uri) - . body(Body::default()) - . map_err(|e| RequestError::new(None, Some(e.into())))?; - - request.headers_mut().extend(headers.clone()); - - debug!("Request: {:?}", request); - - match client.execute(request).await { - Err(e) => Err(RequestError::new(None, Some(e))), - Ok(response) => { - debug!("Response: {:?}", response.headers()); - - if response.status().is_success() { - Ok(response) - } else { - Err(RequestError::new(Some(response.map(|_| ())), None)) - } - }, - } -} - -pub(super) async fn head( client: &mut HttpClient - , uri: &Uri - , headers: &HeaderMap ) -> Result { - Ok( request( client - , "HEAD" - , uri - , headers ).await?.headers().clone() ) -} - -pub(super) fn content_length(headers: &HeaderMap) -> anyhow::Result { - Ok( headers . get(CONTENT_LENGTH) - . ok_or(anyhow!("unable to get CONTENT-LENGTH value"))? - . to_str()? - . parse()? ) -} - -pub(super) fn content_type(headers: &HeaderMap) -> anyhow::Result { - Ok( headers . get(CONTENT_TYPE) - . ok_or(anyhow!("unable to get CONTENT-TYPE value"))? - . to_str()? - . to_string() ) -} diff --git a/src/main.rs b/src/main.rs index cbd03ec..189ba2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,3 @@ -mod client; mod process; mod m3u8_download; mod client_new;