From afdbce061ae12c7116108f194d828aa79686f44b Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Wed, 8 Jan 2025 01:06:29 +0100 Subject: [PATCH] Started with ClientActor --- src/client.rs | 25 ++- src/m3u8_download.rs | 45 +++++ src/main.rs | 40 +++-- src/new_client.rs | 348 ++++++++++++++++++++++++++++++++++++++ src/new_download_error.rs | 37 ++++ 5 files changed, 472 insertions(+), 23 deletions(-) create mode 100644 src/m3u8_download.rs create mode 100644 src/new_client.rs create mode 100644 src/new_download_error.rs diff --git a/src/client.rs b/src/client.rs index fe02576..d12d785 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,12 +2,25 @@ use std::{io::ErrorKind, path::Path, time::Duration}; use anyhow::anyhow; use futures_util::StreamExt as _; -use http::{header::{CONTENT_LENGTH, CONTENT_TYPE, RANGE}, request::Builder as RequestBuilder, uri::{Authority, Scheme}, HeaderValue, Request, Response, StatusCode, Uri}; +use http::{ + header::{CONTENT_LENGTH, CONTENT_TYPE, RANGE}, + request::Builder as RequestBuilder, + uri::{Authority, Scheme}, + HeaderValue, + Request, + Response, + StatusCode, + Uri +}; use http_body_util::BodyDataStream; use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist}; use reqwest::{redirect::Policy, Body}; -use tokio::{fs::{symlink_metadata, File}, io::AsyncWriteExt as _, time::timeout}; -use tower::{ServiceBuilder, ServiceExt as _}; +use tokio::{ + fs::{symlink_metadata, File}, + io::AsyncWriteExt as _, + time::timeout +}; +use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _}; use tower_http_client::{client::BodyReader, ServiceExt as _}; use tower_reqwest::HttpClientLayer; @@ -16,7 +29,7 @@ use crate::download_error::DownloadError; use log::{log, Level}; -type HttpClient = tower::util::BoxCloneService, Response, anyhow::Error>; +type HttpClient = BoxCloneService, Response, anyhow::Error>; #[derive(Clone, Debug)] pub struct State { @@ -27,9 +40,6 @@ pub struct State { client: HttpClient, } -unsafe impl Send for State {} -unsafe impl Sync for State {} - impl State { pub fn new(uri: &Uri, concurrency_limit: usize, timeout: Duration) -> anyhow::Result { @@ -41,6 +51,7 @@ impl State { . ok_or(anyhow!("Path problem"))? . to_str() . ok_or(anyhow!("Path problem"))?; + let state = State { scheme: scheme.clone(), auth: authority.clone(), diff --git a/src/m3u8_download.rs b/src/m3u8_download.rs new file mode 100644 index 0000000..46a77b6 --- /dev/null +++ b/src/m3u8_download.rs @@ -0,0 +1,45 @@ +use std::{ffi::OsString, path::Path}; + +use anyhow::anyhow; +use http::{uri::{Authority, Scheme}, Uri}; + + +#[derive(Debug)] +pub(super) enum DownloadState { + Open, // Nothing is done. + Prepared, // The Uris to all .ts downloads are prepared. + Ready, // All .ts downloads are done. +} + +struct TsPart { + filename: OsString, + url: Uri, + state: DownloadState, +} + +struct M3u8Download { + scheme: Scheme, + auth: Authority, + base_path: String, + ts_parts: Vec, +} + + +impl M3u8Download { + pub(super) fn new(uri: Uri) -> anyhow::Result { + let scheme = uri.scheme() + . ok_or(anyhow!("Problem scheme in m3u8 uri"))? + . clone(); + let auth= uri.authority() + . ok_or(anyhow!("Problem authority in m3u8 uri"))? + . clone(); + let base_path = Path::new(uri.path()).parent() + . ok_or(anyhow!("Path problem"))? + . to_str() + . ok_or(anyhow!("Path problem"))? + . to_string(); + let ts_parts = vec![]; + + Ok(Self {scheme, auth, base_path, ts_parts}) + } +} diff --git a/src/main.rs b/src/main.rs index 6b7ef74..a65e539 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ mod download_error; mod client; mod process; +mod m3u8_download; +mod new_client; +mod new_download_error; use std::{ ffi::OsStr, @@ -12,9 +15,10 @@ use anyhow::anyhow; use clap::Parser; use env_logger::Env; use http::Uri; +use new_client::ClientActorHandle; use tokio::task::JoinSet; -use log::{log, Level}; +use log::{debug, error, info}; use process::{enter_download_dir, ffmpeg, remove_download_dir}; @@ -48,10 +52,10 @@ async fn main() -> anyhow::Result<()> { . write_style_or("LOG_STYLE", "always"); env_logger::init_from_env(env); - log!(Level::Info, "Parse command line arguments..."); + info!("Parse command line arguments..."); let args = Args::parse(); - log!(Level::Debug, "Arguments: {:?}", args); + debug!("Arguments: {:?}", args); let name = args.name.as_path(); @@ -71,38 +75,42 @@ async fn main() -> anyhow::Result<()> { let m3u8_path_and_query = m3u8_uri.path_and_query() . ok_or(anyhow!("Problem path and query in m3u8 uri"))?; - log!(Level::Info, "Create and chdir into temporary download dir..."); + info!("Create and chdir into temporary download dir..."); let basename = enter_download_dir(&name).await?; - log!(Level::Info, "Creating an HTTP client with Tower layers..."); + info!("Creating an HTTP client with Tower layers..."); let mut state = client::State::new(&m3u8_uri, concurrency, timeout)?; - log!(Level::Info, "Get segments..."); + let handle = ClientActorHandle::new(timeout); + handle.download("foo.m3u8", &m3u8_uri).await; + handle.stop(); + + info!("Get segments..."); let mut segments = state.get_m3u8_segment_uris(m3u8_path_and_query.as_str()).await?; - log!(Level::Info, "Sending concurrent requests..."); + info!("Sending concurrent requests..."); let mut join_set = JoinSet::new(); while let Some(segment) = segments.pop() { - log!(Level::Info, "Spawn task for: {}", segment); + info!("Spawn task for: {}", segment); let mut state = state.clone(); join_set.spawn(async move { state.get_m3u8_segment(&segment).await }); } - 'working: while let Some(result) = join_set.join_next().await { + while let Some(result) = join_set.join_next().await { match result { Err(e) => { - log!(Level::Error, "FATAL Join failed: {}", e); - break 'working + error!("FATAL Join failed: {}", e); + break }, Ok(Err(e)) => { - log!(Level::Info, "Retry failed download: {:?}", e); + info!("Retry failed download: {:?}", e); let mut state = state.clone(); join_set.spawn(async move { state.get_m3u8_segment(&e.uri).await @@ -110,18 +118,18 @@ async fn main() -> anyhow::Result<()> { }, Ok(Ok(v)) => { - log!(Level::Info, "Done download: {}", v); + info!("Done download: {}", v); }, } } - log!(Level::Info, "Call ffmpeg to join ts files to single mp4..."); + info!("Call ffmpeg to join ts files to single mp4..."); let status = ffmpeg(&name, &m3u8_uri).await?; - log!(Level::Info, "ffmpeg status: {}", status); + debug!("ffmpeg status: {}", status); - log!(Level::Info, "Leave and remove temporary download dir..."); + info!("Leave and remove temporary download dir..."); remove_download_dir(&basename).await?; diff --git a/src/new_client.rs b/src/new_client.rs new file mode 100644 index 0000000..8fa929d --- /dev/null +++ b/src/new_client.rs @@ -0,0 +1,348 @@ +use std::{collections::HashMap, io::ErrorKind, path::{Path, PathBuf}, time::Duration}; + +use futures_util::StreamExt as _; +use http::{ + header::{CONTENT_LENGTH, RANGE}, + request::Builder as RequestBuilder, + HeaderMap, + HeaderValue, + Request, + Response, + StatusCode, + Uri +}; +use http_body_util::BodyDataStream; +use reqwest::{redirect::Policy, Body}; +use tokio::{ + fs::{symlink_metadata, File}, + io::AsyncWriteExt as _, + select, + sync::{mpsc, oneshot}, + task::JoinSet, + time::timeout +}; +use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _}; +use tower_http_client::ServiceExt as _; +use tower_reqwest::HttpClientLayer; + +use crate::{ + new_download_error::DownloadError, + m3u8_download::DownloadState +}; + +use log::{debug, error, info}; + + +#[derive(Debug)] +pub(super) enum ClientActorMessage { + Download { + filename: PathBuf, + uri: Uri, + respond_to: oneshot::Sender, + }, +} + +#[derive(Clone, Debug)] +pub(super) enum ClientActorMessageHandle { + Download { + filename: PathBuf, + uri: Uri, + message: ActionIndex, + }, +} + +pub(super) type ActionIndex = u64; +type JoinSetResult = Result, DownloadError>; +type HttpClient = BoxCloneService, Response, anyhow::Error>; + + +#[derive(Debug)] +struct ClientActor { + timeout: Duration, + client: HttpClient, + tasks: JoinSet, + actions: HashMap, + actions_idx: ActionIndex, + + receiver: mpsc::Receiver, +} + + +pub(super) struct ClientActorHandle { + sender: mpsc::Sender, + abort: oneshot::Sender, +} + +async fn run_client(mut actor: ClientActor) { + loop { + select! { + Some(join) = actor.tasks.join_next() => { + match join { + Err(e) => { + error!("FATAL Join failed: {}", e); + break + }, + + Ok(Err(e)) => { + info!("Retry failed download: {:?}", e); + // retry ... instead of responing here we could also respond + // with something that in turn would be used to retry... + let client = actor.client.clone(); + actor.tasks.spawn(async move { + download(client, e.action, actor.timeout).await + }); + }, + + // when the task finishes + Ok(Ok(Some(action))) => { + info!("Done download: {:?}", action); + use ClientActorMessageHandle::Download; + + match action { + Download { filename: _, uri: _, ref message } => { + if let Some((_, message)) = actor.actions.remove_entry(message) { + use ClientActorMessage::Download; + let Download { filename: _, uri: _, respond_to } = message; + let _ = respond_to.send(DownloadState::Ready); + } else { + panic!("Lost a message"); + } + } + } + }, + + // Got a stop message...here we still continue procession until the + // JoinSet is empty. + Ok(Ok(None)) => (), + }; + } + + Some(message) = actor.receiver.recv() => { + actor.handle_message(message).await; + } + + else => {} + } + } +} + +async fn file_size(filename: &Path) -> u64 { + // - get all informations to eventually existing file + let metadata = match symlink_metadata(filename).await { + Ok(metadata) => Some(metadata), + Err(error) => match error.kind() { + // If we can't write to a file we need to ... well theres nothing we can do + ErrorKind::PermissionDenied => panic!("Permission denied on: {:?}", filename), + _ => None, + } + }; + + metadata.map_or(0, |m| m.len()) +} + +async fn request( client: &mut HttpClient + , method: &str + , uri: &Uri + , headers: HeaderMap ) -> anyhow::Result> { + let mut request = RequestBuilder::new() + . method(method) + . uri(uri) + . body(Body::default())?; + + request.headers_mut().extend(headers); + + debug!("New Request: {:?}", request); + + let response = client.execute(request).await?; + + debug!("New Response: {:?}", response); + + anyhow::ensure!( response.status().is_success() + , "resonse status failed: {}" + , response.status() ); + + Ok(response) +} + +async fn content_length( client: &mut HttpClient + , uri: &Uri ) -> anyhow::Result> { + let head = request(client, "HEAD", uri, HeaderMap::new()).await?; + + Ok(head . headers().get(CONTENT_LENGTH) + . map(|v| v . to_str() + . expect("unable to get CONTENT-LENGTH value") + . parse::() + . expect("unable to parse CONTENT-LENGTH value"))) +} + +async fn open_outfile(status: &StatusCode, filename: &Path) -> File { + match status { + &StatusCode::PARTIAL_CONTENT => + // Here we assume that this response only comes if the requested + // range was fullfillable and thus is the data range in the + // response. Thats why I do not check the content-range header. + // If that assumption does not hold this needs to be fixec. + File::options() . create(true) + . append(true) + . open(filename) + . await + . expect("can not create file for writing"), + + _ => + File::create(filename) . await + . expect("can not create file for writing"), + } +} + +async fn store_body( file: &mut File + , body: &mut Body + , io_timeout: Duration ) -> anyhow::Result<()> { + let mut body = BodyDataStream::new(body); + + loop { + // give timeout somehow... probably from client. + let data = timeout(io_timeout, body.next()).await?; + match data { + None => break, + Some(Err(e)) => return Err(e.into()), + Some(Ok(data)) => { + file . write_all(data.as_ref()).await?; + file . flush().await?; + }, + } + }; + + Ok(()) +} + +async fn download( mut client: HttpClient + , message: ClientActorMessageHandle + , io_timeout: Duration ) -> JoinSetResult { + let ClientActorMessageHandle::Download { ref filename, ref uri, message: _ } = message; + + // - get all informations to eventually existing file + let mut from = file_size(filename).await; + + // - get infos to uri + let content_length = content_length(&mut client, uri).await + . map_err(|e| DownloadError::new(message.clone(), Some(e)))?; + + if let Some(content_length) = content_length { + if from != 0 && content_length - 1 <= from { + return Ok(None); + } + } else { + from = 0; + } + + // - do the neccessry request. + let range_value: HeaderValue = format!("bytes={}-", from) + . parse() + . expect("Error creating range header value"); + let mut headers = HeaderMap::new(); + headers.insert(RANGE, range_value); + + let mut response = request(&mut client, "GET", uri, headers).await + . map_err(|e| DownloadError::new(message.clone(), Some(e)))?; + + // - open or create file + // - download Data + store_body( &mut open_outfile(&response.status(), filename).await + , response.body_mut() + , io_timeout ) + . await + . map_err(|e| DownloadError::new(message.clone(), Some(e)))?; + + Ok(Some(message)) +} + + +impl ClientActor { + pub(super) fn new( concurrency_limit: usize + , timeout: Duration + , receiver: mpsc::Receiver + , abort_rx: oneshot::Receiver ) -> anyhow::Result { + let client = ServiceBuilder::new() + // Add some layers. + . concurrency_limit(concurrency_limit) + . timeout(timeout) + // Make client compatible with the `tower-http` layers. + . layer(HttpClientLayer) + . service( reqwest::Client::builder() + . redirect(Policy::limited(5)) + . build()? ) + . map_err(anyhow::Error::msg) + . boxed_clone(); + + debug!("-> client: {:?}", client); + + let mut tasks = JoinSet::new(); + + tasks.spawn(async move { + let _ = abort_rx.await; + Ok(None) + }); + + let actions = HashMap::new(); + let actions_idx = 0; + + Ok(Self {timeout, client, tasks, receiver, actions, actions_idx}) + } + + async fn handle_message(&mut self, message: ClientActorMessage) { + self.actions.insert(self.actions_idx, message); + + use ClientActorMessage::Download; + + match self.actions.get(&self.actions_idx) { + Some(Download { ref filename, ref uri, respond_to: _ }) => { + // spawn a task that does the work + let client = self.client.clone(); + let timeout = self.timeout; + + let handle = ClientActorMessageHandle::Download { + filename: filename.to_path_buf(), + uri: uri.clone(), + message: self.actions_idx, + }; + + self.tasks.spawn(async move { + download(client, handle, timeout).await + }); + + self.actions_idx += 1; + }, + + None => (), + } + } +} + +impl ClientActorHandle { + pub(super) fn new(timeout: Duration) -> Self { + let (sender, receiver) = mpsc::channel(1); + let (abort, abort_rx) = oneshot::channel::(); + let actor = ClientActor::new( 20 + , timeout + , receiver + , abort_rx ) + . expect("Client create error"); + tokio::spawn(run_client(actor)); + + Self { sender, abort } + } + + pub(super) fn stop(self) { + let _ = self.abort.send(Ok(None)); + } + + pub(super) async fn download(&self, filename: impl AsRef, uri: &Uri) { + let filename = filename.as_ref().to_path_buf(); + let uri = uri.to_owned(); + let (send, receive) = oneshot::channel(); + let msg = ClientActorMessage::Download { filename, uri, respond_to: send }; + + let _ = self.sender.send(msg).await; + receive.await.expect("Actor cancelled unexpected"); + } +} diff --git a/src/new_download_error.rs b/src/new_download_error.rs new file mode 100644 index 0000000..b026140 --- /dev/null +++ b/src/new_download_error.rs @@ -0,0 +1,37 @@ +use std::{error, fmt}; + +use crate::new_client::ClientActorMessageHandle; + + +#[derive(Debug)] +pub(super) struct DownloadError { + pub(super) action: ClientActorMessageHandle, + pub(super) source: Option, +} + + +impl DownloadError { + pub(super) fn new( action: ClientActorMessageHandle + , source: Option ) -> Self { + let action = action.to_owned(); + Self { action, source } + } +} + +impl error::Error for DownloadError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match &self.source { + None => None, + Some(e) => Some(e.as_ref()), + } + } +} + +impl fmt::Display for DownloadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.source { + None => write!(f, "download error: {:?}", self.action), + Some(err) => write!(f, "download error ({:?}): {}", self.action, err), + } + } +}