From cc9c7e50facd229295789d77237d22f57b16746d Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Wed, 25 Dec 2024 15:33:10 +0100 Subject: [PATCH] First working poc --- Cargo.lock | 1 + Cargo.toml | 3 +- src/main.rs | 162 +++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 125 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c60cd72..923ef15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -437,6 +437,7 @@ dependencies = [ "env_logger", "futures-util", "http", + "http-body-util", "log", "m3u8-rs", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index f0c35df..86e5886 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,11 @@ clap = { version = "4.5", features = [ "derive" ] } env_logger = "0.11" futures-util = "0.3" http = "1.2" +http-body-util = "0.1" log = "0.4" m3u8-rs = "6.0" reqwest = "0.12" tokio = { version = "1.42", features = [ "macros", "rt-multi-thread" ] } -tower = { version = "0.5", features = [ "buffer", "limit" ] } +tower = { version = "0.5", features = [ "buffer", "limit", "timeout" ] } tower-http-client = "0.4" tower-reqwest = "0.4" diff --git a/src/main.rs b/src/main.rs index 7a55e24..637eed5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,16 @@ -use std::time::Duration; +use std::{path::Path, time::Duration}; use anyhow::anyhow; use clap::Parser; use env_logger::Env; +use futures_util::StreamExt; use http::{uri::{Authority, Scheme}, Request, Response, Uri}; +use http_body_util::BodyDataStream; use m3u8_rs::Playlist; use reqwest::Body; +use tokio::{fs::File, io::AsyncWriteExt, task::{JoinError, JoinHandle}, time::timeout}; use tower::{ServiceBuilder, ServiceExt as _}; -use tower_http_client::{client::BodyReader, ServiceExt as _}; +use tower_http_client::ServiceExt as _; use tower_reqwest::HttpClientLayer; use log::{log, Level}; @@ -19,6 +22,7 @@ struct DownloadMessage { url: String, } + #[derive(Debug, Parser)] struct Args { #[arg(short, long)] @@ -31,37 +35,103 @@ type HttpClient = tower::util::BoxCloneService, Response, an struct State { scheme: Scheme, auth: Authority, + base_path: String, client: HttpClient, } impl State { - async fn get(&mut self, path_and_query: &str) -> anyhow::Result<()> { + async fn get_m3u8_segment_uris(&mut self, path_and_query: &str) -> anyhow::Result> { let uri = Uri::builder() . scheme(self.scheme.clone()) . authority(self.auth.clone()) . path_and_query(path_and_query) . build()?; + let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?; + let mut file = File::create(filename).await?; - let mut response = self - . client - . get(uri) - . send()? - . await?; + let mut response = self.client.get(uri).send()?.await?; anyhow::ensure!(response.status().is_success(), "response failed"); - log!(Level::Debug, "-> Response: {:?}", response); - let body = BodyReader::new(response.body_mut()).bytes().await?; - match m3u8_rs::parse_playlist(&body) { - Result::Ok((_, Playlist::MasterPlaylist(pl))) => - log!(Level::Info, "Master playlist: {:?}", pl), - Result::Ok((_, Playlist::MediaPlaylist(pl))) => - log!(Level::Info, "Media playlist: {:?}", pl), - Result::Err(e) => - Err(anyhow!("Parsing error: {}", e))?, + let mut body = vec![]; + let mut body_stream = BodyDataStream::new(response.body_mut()); + 'label: loop { + let data = timeout(Duration::from_secs(10), body_stream.next()).await?; + match data { + None => break 'label, + Some(Err(_)) => break 'label, + Some(Ok(data)) => { + body.append(&mut Vec::from(data.as_ref())); + }, + } }; + file.write_all(&body).await?; + + match m3u8_rs::parse_playlist(&body) { + Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)), + Result::Ok((_, Playlist::MasterPlaylist(_))) => + Err(anyhow!("Master playlist not supported now")), + Result::Ok((_, Playlist::MediaPlaylist(pl))) => { + let uris: anyhow::Result> = pl.segments.iter().map(|s| { + match Uri::try_from(s.uri.clone()) { + Ok(uri) => { + let scheme = uri.scheme() + . or(Some(&self.scheme)) + . ok_or(anyhow!("No scheme in Uri"))?; + let auth = uri.authority() + . or(Some(&self.auth)) + . ok_or(anyhow!("No authority in Uri"))?; + let path_and_query = uri.path_and_query() + . ok_or(anyhow!("No path in Uri"))?; + Ok(Uri::builder() . scheme(scheme.clone()) + . authority(auth.clone()) + . path_and_query(path_and_query.clone()) + . build()?) } + + Err(e) => { + log!(Level::Debug, "Uri parse error: {:?} - {}", e, s.uri); + Ok(Uri::builder() . scheme(self.scheme.clone()) + . authority(self.auth.clone()) + . path_and_query(self.base_path.clone() + "/" + &s.uri) + . build()?) } + } + }).collect(); + + uris + }, + } + } - Ok(()) + async fn get_m3u8_segment(&mut self, uri: &Uri) -> Result { + if let Ok(send_fut) = self.client.get(uri).send() { + if let Ok(mut response) = send_fut.await { + if response.status().is_success() { + let path_and_query = uri.path_and_query().expect("No path and query").as_str(); + let filename = Path::new(path_and_query).file_name().ok_or(uri.clone())?; + let mut file = File::create(filename).await.or(Err(uri.clone()))?; + + let mut body_stream = BodyDataStream::new(response.body_mut()); + 'label: loop { + let data = timeout(Duration::from_secs(10), body_stream.next()).await.or(Err(uri.clone()))?; + match data { + None => break 'label, + Some(Err(_)) => return Err(uri.clone()), + Some(Ok(data)) => { + file.write_all(data.as_ref()).await.or(Err(uri.clone()))?; + }, + } + }; + + Ok(uri.clone()) + } else { + Err(uri.clone()) + } + } else { + Err(uri.clone()) + } + } else { + Err(uri.clone()) + } } } @@ -87,16 +157,21 @@ async fn main() -> anyhow::Result<()> { . ok_or(anyhow!("Problem scheme in m3u8 uri"))?; let m3u8_auth = m3u8_uri.authority() . ok_or(anyhow!("Problem authority in m3u8 uri"))?; + let m3u8_base_path = Path::new(m3u8_uri.path()).parent() + . ok_or(anyhow!("Path problem"))? + . to_str() + . ok_or(anyhow!("Path problem"))?; let m3u8_path_and_query = m3u8_uri.path_and_query() . ok_or(anyhow!("Problem path and query in m3u8 uri"))?; - let state = State { + let mut state = State { scheme: m3u8_scheme.clone(), auth: m3u8_auth.clone(), + base_path: m3u8_base_path.to_string(), client: ServiceBuilder::new() // Add some layers. . buffer(64) . rate_limit(10, Duration::from_secs(1)) - . concurrency_limit(50) + . concurrency_limit(10) // Make client compatible with the `tower-http` layers. . layer(HttpClientLayer) . service(reqwest::Client::new()) @@ -108,32 +183,39 @@ async fn main() -> anyhow::Result<()> { // I think about a worker pool... probably of concurrency_limit amount. // The worker needs to get the url. Our first target is to store the // data on a file with the same name as the last part of the URL. + // CURRENTLY I just create a task for each download. + + log!(Level::Info, "-> Get segments..."); + + let mut segments = state.get_m3u8_segment_uris(m3u8_path_and_query.as_str()).await?; log!(Level::Info, "-> Sending concurrent requests..."); - let tasks = (0..1).map({ - |i| { + 'working: while ! segments.is_empty() { + let mut tasks: Vec>> = vec![]; + while let Some(segment) = segments.pop() { let state = state.clone(); - let m3u8_path_and_query = m3u8_path_and_query.clone(); - tokio::spawn(async move { + tasks.push(tokio::spawn(async move { let mut state = state.clone(); - state.get(m3u8_path_and_query.as_str()).await?; - log!( Level::Info - , "[task {}]: {} completed successfully!" - , i - , m3u8_path_and_query ); - - anyhow::Ok(()) - }) + state.get_m3u8_segment(&segment).await + })); } - }); - - let results = futures_util::future::join_all(tasks).await; - for result in &results { - match result { - Err(e) => log!(Level::Error, "{}", e), - Ok(Err(e)) => log!(Level::Error, "{}", e), - _ => (), + + let results: Vec, JoinError>> = futures_util::future::join_all(tasks).await; + for result in &results { + match result { + Err(e) => { + log!(Level::Error, "FATAL Join failed: {}", e); + break 'working + }, + + Ok(Err(e)) => { + log!(Level::Info, "Retry failed download: {}", e); + segments.push(e.clone()); + }, + + _ => (), + } } }