diff --git a/src/client.rs b/src/client.rs index d4b448a..cf9f161 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,15 +5,16 @@ use futures_util::StreamExt as _; use http::{header::CONTENT_TYPE, uri::{Authority, Scheme}, Request, Response, Uri}; use http_body_util::BodyDataStream; use log::{log, Level}; -use m3u8_rs::Playlist; +use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist}; use reqwest::{redirect::Policy, Body}; use tokio::{fs::File, io::AsyncWriteExt as _, time::timeout}; use tower::{ServiceBuilder, ServiceExt as _}; -use tower_http_client::ServiceExt as _; +use tower_http_client::{client::BodyReader, ServiceExt as _}; use tower_reqwest::HttpClientLayer; use crate::download_error::DownloadError; + type HttpClient = tower::util::BoxCloneService, Response, anyhow::Error>; #[derive(Clone, Debug)] @@ -40,7 +41,7 @@ impl State { scheme: scheme.clone(), auth: authority.clone(), base_path: base_path.to_string(), - timeout: Duration::from_secs(30), + timeout: Duration::from_secs(10), client: ServiceBuilder::new() // Add some layers. . buffer(64) @@ -71,82 +72,22 @@ impl State { . authority(self.auth.clone()) . path_and_query(path_and_query) . build()?; - let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?; - let mut file = File::create(filename).await?; - - let mut response = timeout(self.timeout, self.client.get(uri).send()?).await??; - anyhow::ensure!(response.status().is_success(), "response failed"); + let mut response = self.request(&uri).await?; // read body into Vec - let mut body = vec![]; - let mut body_stream = BodyDataStream::new(response.body_mut()); - 'label: loop { - let data = timeout(self.timeout, body_stream.next()).await?; - match data { - None => break 'label, - Some(Err(_)) => break 'label, - Some(Ok(data)) => { - body.append(&mut Vec::from(data.as_ref())); - }, - } - }; + let body: Vec = BodyReader::new(response.body_mut()) + . bytes().await?.to_vec(); match m3u8_rs::parse_playlist(&body) { Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)), + Result::Ok((_, Playlist::MasterPlaylist(_))) => Err(anyhow!("Master playlist not supported now")), + Result::Ok((_, Playlist::MediaPlaylist(pl))) => { - let segments: anyhow::Result> = pl.segments.iter().map(|s| { - Ok(match Uri::try_from(s.uri.clone()) { - Ok(uri) => { - let mut new_segment = s.clone(); - let filename = Path::new(uri.path()) - . file_name() - . ok_or(anyhow!("name error"))? - . to_str() - . ok_or(anyhow!("Error getting filename from uri"))?; - let query = uri.query() - . map(|q| "?".to_owned() + q) - . unwrap_or("".to_string()); - new_segment.uri = (filename.to_owned() + &query).to_string(); - new_segment } - - Err(_) => s.clone() - }) - }).collect(); - let mut out_pl = pl.clone(); - out_pl.segments = segments?; - let mut file_data: Vec = Vec::new(); - out_pl.write_to(&mut file_data)?; - file.write_all(&file_data).await?; - - let uris: anyhow::Result> = pl.segments.iter().map(|s| { - match Uri::try_from(s.uri.clone()) { - Ok(uri) => { - let scheme = uri.scheme() - . or(Some(&self.scheme)) - . ok_or(anyhow!("No scheme in Uri"))?; - let auth = uri.authority() - . or(Some(&self.auth)) - . ok_or(anyhow!("No authority in Uri"))?; - let path_and_query = uri.path_and_query() - . ok_or(anyhow!("No path in Uri"))?; - Ok(Uri::builder() . scheme(scheme.clone()) - . authority(auth.clone()) - . path_and_query(path_and_query.clone()) - . build()?) } - - Err(e) => { - log!(Level::Debug, "Uri parse error: {:?} - {}", e, s.uri); - Ok(Uri::builder() . scheme(self.scheme.clone()) - . authority(self.auth.clone()) - . path_and_query(self.base_path.clone() + "/" + &s.uri) - . build()?) } - } - }).collect(); - - uris + Self::write_playlist(&uri, &pl).await?; + pl.segments.iter().map(|s| self.download_uri(s) ).collect() }, } } @@ -154,23 +95,8 @@ impl State { pub(super) async fn get_m3u8_segment(&mut self, uri: &Uri) -> Result<(), DownloadError> { - // Send get request with timeout. - let send_fut = self.client - . get(uri) - . send() - . map_err(|e| DownloadError::new(uri.clone(), Some(e.into())))?; - let mut response = timeout(self.timeout, send_fut).await - . map_err(|e| DownloadError::new(uri.clone(), Some(e.into())) )? - . map_err(|e| DownloadError::new(uri.clone(), Some(e)) )?; - - // This handling needs to be more elaborate... a distingtion needs to be made - // between temporary and permanent failures. - if ! response.status().is_success() { - return Err(DownloadError::new( - uri.clone() - , Some(anyhow::Error::msg("request unsuccessfull")) - )); - } + let mut response = self.request(uri).await + . map_err(|e| DownloadError::new(uri.clone(), Some(e)))?; // We always need the content-type to be able to decide let content_type = response.headers()[CONTENT_TYPE].to_str() @@ -180,7 +106,7 @@ impl State { let message = format!("unexpected content-type: {}", content_type); log!(Level::Debug, "{}", message); return Err(DownloadError::new( uri.clone() - , Some(anyhow::Error::msg(message)) )); + , Some(anyhow!(message)) )); } // I consider a missing path as fatal... there is absolutely nothing we can do about it @@ -212,4 +138,80 @@ impl State { Ok(()) } + + fn download_uri(&self, segment: &MediaSegment) -> anyhow::Result + { + match Uri::try_from(segment.uri.clone()) { + Ok(uri) => { + let scheme = uri.scheme().unwrap_or(&self.scheme); + let auth = uri.authority().unwrap_or(&self.auth); + let path_and_query = uri.path_and_query() + . ok_or(anyhow!("No path in Uri"))?; + + Ok(Uri::builder() . scheme(scheme.clone()) + . authority(auth.clone()) + . path_and_query(path_and_query.clone()) + . build()?) + } + + Err(_) => { + Ok(Uri::builder() . scheme(self.scheme.clone()) + . authority(self.auth.clone()) + . path_and_query(self.base_path.clone() + "/" + &segment.uri) + . build()?) + } + } + } + + fn uri_relative_path_and_query(uri: &Uri) -> anyhow::Result + { + let filename = Path::new(uri.path()) + . file_name() + . ok_or(anyhow!("name error"))? + . to_str() + . ok_or(anyhow!("Error getting filename from uri"))?; + let query = uri.query() + . map(|q| "?".to_owned() + q) + . unwrap_or("".to_string()); + Ok((filename.to_owned() + &query).to_string()) + } + + async fn request(&mut self, uri: &Uri) -> anyhow::Result> + { + // Send get request with timeout. + let send_fut = self.client.get(uri).send()?; + let response = timeout(self.timeout, send_fut).await??; + + anyhow::ensure!( response.status().is_success() + , "resonse status failed: {}" + , response.status() ); + Ok(response) + } + + async fn write_playlist(uri: &Uri, playlist: &MediaPlaylist) -> anyhow::Result<()> { + let filename = Path::new(uri.path()) + . file_name() + . ok_or(anyhow!("can't extract filename from uri"))?; + let mut file = File::create(filename).await?; + + let segments: anyhow::Result> = playlist.segments.iter().map(|s| { + let mut new_segment = s.clone(); + Ok(match Uri::try_from(s.uri.clone()) { + Ok(uri) => { + new_segment.uri = Self::uri_relative_path_and_query(&uri)?; + new_segment + } + + Err(_) => new_segment + }) + }).collect(); + + let mut out_pl = playlist.clone(); + out_pl.segments = segments?; + let mut file_data = vec![]; + out_pl.write_to(&mut file_data)?; + file.write_all(&file_data).await?; + + Ok(()) + } } diff --git a/src/main.rs b/src/main.rs index fab7306..16824b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -73,6 +73,7 @@ async fn main() -> anyhow::Result<()> { 'working: while ! segments.is_empty() { let mut tasks = vec![]; while let Some(segment) = segments.pop() { + log!(Level::Info, "download segment: {}", segment); let state = state.clone(); tasks.push(tokio::spawn(async move { let mut state = state.clone(); @@ -89,7 +90,7 @@ async fn main() -> anyhow::Result<()> { }, Ok(Err(e)) => { - log!(Level::Info, "Retry failed download: {}", e); + log!(Level::Warn, "Retry failed download: {}", e); segments.push(e.uri.clone()); },