diff --git a/src/main.rs b/src/main.rs index 637eed5..d8a2ec5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,10 +4,10 @@ use anyhow::anyhow; use clap::Parser; use env_logger::Env; use futures_util::StreamExt; -use http::{uri::{Authority, Scheme}, Request, Response, Uri}; +use http::{header::CONTENT_TYPE, uri::{Authority, Scheme}, Request, Response, Uri}; use http_body_util::BodyDataStream; use m3u8_rs::Playlist; -use reqwest::Body; +use reqwest::{redirect::Policy, Body}; use tokio::{fs::File, io::AsyncWriteExt, task::{JoinError, JoinHandle}, time::timeout}; use tower::{ServiceBuilder, ServiceExt as _}; use tower_http_client::ServiceExt as _; @@ -49,7 +49,7 @@ impl State { let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?; let mut file = File::create(filename).await?; - let mut response = self.client.get(uri).send()?.await?; + let mut response = timeout(Duration::from_secs(10), self.client.get(uri).send()?).await??; anyhow::ensure!(response.status().is_success(), "response failed"); @@ -65,13 +65,36 @@ impl State { }, } }; - file.write_all(&body).await?; match m3u8_rs::parse_playlist(&body) { Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)), Result::Ok((_, Playlist::MasterPlaylist(_))) => Err(anyhow!("Master playlist not supported now")), Result::Ok((_, Playlist::MediaPlaylist(pl))) => { + let segments: anyhow::Result> = pl.segments.iter().map(|s| { + Ok(match Uri::try_from(s.uri.clone()) { + Ok(uri) => { + let mut new_segment = s.clone(); + let filename = Path::new(uri.path()) + . file_name() + . ok_or(anyhow!("name error"))? + . to_str() + . ok_or(anyhow!("Error getting filename from uri"))?; + let query = uri.query() + . map(|q| "?".to_owned() + q) + . unwrap_or("".to_string()); + new_segment.uri = (filename.to_owned() + &query).to_string(); + new_segment } + + Err(_) => s.clone() + }) + }).collect(); + let mut out_pl = pl.clone(); + out_pl.segments = segments?; + let mut file_data: Vec = Vec::new(); + out_pl.write_to(&mut file_data)?; + file.write_all(&file_data).await?; + let uris: anyhow::Result> = pl.segments.iter().map(|s| { match Uri::try_from(s.uri.clone()) { Ok(uri) => { @@ -104,8 +127,21 @@ impl State { async fn get_m3u8_segment(&mut self, uri: &Uri) -> Result { if let Ok(send_fut) = self.client.get(uri).send() { - if let Ok(mut response) = send_fut.await { + if let Ok(mut response) = timeout(Duration::from_secs(10), send_fut).await.or(Err(uri.clone()))? { if response.status().is_success() { + let content_type = match response.headers()[CONTENT_TYPE].to_str() { + Err(_) => { + log!(Level::Debug, "Error getting content-type"); + Err(uri.clone()) + }, + Ok(h) => Ok(h) + }?; + log!(Level::Debug, "CONTENT-TYPE: {}", content_type); + if content_type != "video/MP2T" { + log!(Level::Error, "{} is not video/MP2T", content_type); + return Err(uri.clone()); + } + let path_and_query = uri.path_and_query().expect("No path and query").as_str(); let filename = Path::new(path_and_query).file_name().ok_or(uri.clone())?; let mut file = File::create(filename).await.or(Err(uri.clone()))?; @@ -174,7 +210,9 @@ async fn main() -> anyhow::Result<()> { . concurrency_limit(10) // Make client compatible with the `tower-http` layers. . layer(HttpClientLayer) - . service(reqwest::Client::new()) + . service(reqwest::Client::builder() + . redirect(Policy::limited(5)) + . build()? ) . map_err(anyhow::Error::msg) . boxed_clone(), };