Browse Source

splitted the code in multiple functions... to make it easier to read

main
Georg Hopp 11 months ago
parent
commit
6d7704a0e7
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 178
      src/client.rs
  2. 3
      src/main.rs

178
src/client.rs

@ -5,15 +5,16 @@ use futures_util::StreamExt as _;
use http::{header::CONTENT_TYPE, uri::{Authority, Scheme}, Request, Response, Uri};
use http_body_util::BodyDataStream;
use log::{log, Level};
use m3u8_rs::Playlist;
use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist};
use reqwest::{redirect::Policy, Body};
use tokio::{fs::File, io::AsyncWriteExt as _, time::timeout};
use tower::{ServiceBuilder, ServiceExt as _};
use tower_http_client::ServiceExt as _;
use tower_http_client::{client::BodyReader, ServiceExt as _};
use tower_reqwest::HttpClientLayer;
use crate::download_error::DownloadError;
type HttpClient = tower::util::BoxCloneService<Request<Body>, Response<Body>, anyhow::Error>;
#[derive(Clone, Debug)]
@ -40,7 +41,7 @@ impl State {
scheme: scheme.clone(),
auth: authority.clone(),
base_path: base_path.to_string(),
timeout: Duration::from_secs(30),
timeout: Duration::from_secs(10),
client: ServiceBuilder::new()
// Add some layers.
. buffer(64)
@ -71,82 +72,22 @@ impl State {
. authority(self.auth.clone())
. path_and_query(path_and_query)
. build()?;
let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?;
let mut file = File::create(filename).await?;
let mut response = timeout(self.timeout, self.client.get(uri).send()?).await??;
anyhow::ensure!(response.status().is_success(), "response failed");
let mut response = self.request(&uri).await?;
// read body into Vec<u8>
let mut body = vec![];
let mut body_stream = BodyDataStream::new(response.body_mut());
'label: loop {
let data = timeout(self.timeout, body_stream.next()).await?;
match data {
None => break 'label,
Some(Err(_)) => break 'label,
Some(Ok(data)) => {
body.append(&mut Vec::from(data.as_ref()));
},
}
};
let body: Vec<u8> = BodyReader::new(response.body_mut())
. bytes().await?.to_vec();
match m3u8_rs::parse_playlist(&body) {
Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)),
Result::Ok((_, Playlist::MasterPlaylist(_))) =>
Err(anyhow!("Master playlist not supported now")),
Result::Ok((_, Playlist::MediaPlaylist(pl))) => {
let segments: anyhow::Result<Vec<_>> = pl.segments.iter().map(|s| {
Ok(match Uri::try_from(s.uri.clone()) {
Ok(uri) => {
let mut new_segment = s.clone();
let filename = Path::new(uri.path())
. file_name()
. ok_or(anyhow!("name error"))?
. to_str()
. ok_or(anyhow!("Error getting filename from uri"))?;
let query = uri.query()
. map(|q| "?".to_owned() + q)
. unwrap_or("".to_string());
new_segment.uri = (filename.to_owned() + &query).to_string();
new_segment }
Err(_) => s.clone()
})
}).collect();
let mut out_pl = pl.clone();
out_pl.segments = segments?;
let mut file_data: Vec<u8> = Vec::new();
out_pl.write_to(&mut file_data)?;
file.write_all(&file_data).await?;
let uris: anyhow::Result<Vec<_>> = pl.segments.iter().map(|s| {
match Uri::try_from(s.uri.clone()) {
Ok(uri) => {
let scheme = uri.scheme()
. or(Some(&self.scheme))
. ok_or(anyhow!("No scheme in Uri"))?;
let auth = uri.authority()
. or(Some(&self.auth))
. ok_or(anyhow!("No authority in Uri"))?;
let path_and_query = uri.path_and_query()
. ok_or(anyhow!("No path in Uri"))?;
Ok(Uri::builder() . scheme(scheme.clone())
. authority(auth.clone())
. path_and_query(path_and_query.clone())
. build()?) }
Err(e) => {
log!(Level::Debug, "Uri parse error: {:?} - {}", e, s.uri);
Ok(Uri::builder() . scheme(self.scheme.clone())
. authority(self.auth.clone())
. path_and_query(self.base_path.clone() + "/" + &s.uri)
. build()?) }
}
}).collect();
uris
Self::write_playlist(&uri, &pl).await?;
pl.segments.iter().map(|s| self.download_uri(s) ).collect()
},
}
}
@ -154,23 +95,8 @@ impl State {
pub(super) async fn get_m3u8_segment(&mut self, uri: &Uri)
-> Result<(), DownloadError>
{
// Send get request with timeout.
let send_fut = self.client
. get(uri)
. send()
. map_err(|e| DownloadError::new(uri.clone(), Some(e.into())))?;
let mut response = timeout(self.timeout, send_fut).await
. map_err(|e| DownloadError::new(uri.clone(), Some(e.into())) )?
. map_err(|e| DownloadError::new(uri.clone(), Some(e)) )?;
// This handling needs to be more elaborate... a distingtion needs to be made
// between temporary and permanent failures.
if ! response.status().is_success() {
return Err(DownloadError::new(
uri.clone()
, Some(anyhow::Error::msg("request unsuccessfull"))
));
}
let mut response = self.request(uri).await
. map_err(|e| DownloadError::new(uri.clone(), Some(e)))?;
// We always need the content-type to be able to decide
let content_type = response.headers()[CONTENT_TYPE].to_str()
@ -180,7 +106,7 @@ impl State {
let message = format!("unexpected content-type: {}", content_type);
log!(Level::Debug, "{}", message);
return Err(DownloadError::new( uri.clone()
, Some(anyhow::Error::msg(message)) ));
, Some(anyhow!(message)) ));
}
// I consider a missing path as fatal... there is absolutely nothing we can do about it
@ -212,4 +138,80 @@ impl State {
Ok(())
}
fn download_uri(&self, segment: &MediaSegment) -> anyhow::Result<Uri>
{
match Uri::try_from(segment.uri.clone()) {
Ok(uri) => {
let scheme = uri.scheme().unwrap_or(&self.scheme);
let auth = uri.authority().unwrap_or(&self.auth);
let path_and_query = uri.path_and_query()
. ok_or(anyhow!("No path in Uri"))?;
Ok(Uri::builder() . scheme(scheme.clone())
. authority(auth.clone())
. path_and_query(path_and_query.clone())
. build()?)
}
Err(_) => {
Ok(Uri::builder() . scheme(self.scheme.clone())
. authority(self.auth.clone())
. path_and_query(self.base_path.clone() + "/" + &segment.uri)
. build()?)
}
}
}
fn uri_relative_path_and_query(uri: &Uri) -> anyhow::Result<String>
{
let filename = Path::new(uri.path())
. file_name()
. ok_or(anyhow!("name error"))?
. to_str()
. ok_or(anyhow!("Error getting filename from uri"))?;
let query = uri.query()
. map(|q| "?".to_owned() + q)
. unwrap_or("".to_string());
Ok((filename.to_owned() + &query).to_string())
}
async fn request(&mut self, uri: &Uri) -> anyhow::Result<Response<Body>>
{
// Send get request with timeout.
let send_fut = self.client.get(uri).send()?;
let response = timeout(self.timeout, send_fut).await??;
anyhow::ensure!( response.status().is_success()
, "resonse status failed: {}"
, response.status() );
Ok(response)
}
async fn write_playlist(uri: &Uri, playlist: &MediaPlaylist) -> anyhow::Result<()> {
let filename = Path::new(uri.path())
. file_name()
. ok_or(anyhow!("can't extract filename from uri"))?;
let mut file = File::create(filename).await?;
let segments: anyhow::Result<Vec<_>> = playlist.segments.iter().map(|s| {
let mut new_segment = s.clone();
Ok(match Uri::try_from(s.uri.clone()) {
Ok(uri) => {
new_segment.uri = Self::uri_relative_path_and_query(&uri)?;
new_segment
}
Err(_) => new_segment
})
}).collect();
let mut out_pl = playlist.clone();
out_pl.segments = segments?;
let mut file_data = vec![];
out_pl.write_to(&mut file_data)?;
file.write_all(&file_data).await?;
Ok(())
}
}

3
src/main.rs

@ -73,6 +73,7 @@ async fn main() -> anyhow::Result<()> {
'working: while ! segments.is_empty() {
let mut tasks = vec![];
while let Some(segment) = segments.pop() {
log!(Level::Info, "download segment: {}", segment);
let state = state.clone();
tasks.push(tokio::spawn(async move {
let mut state = state.clone();
@ -89,7 +90,7 @@ async fn main() -> anyhow::Result<()> {
},
Ok(Err(e)) => {
log!(Level::Info, "Retry failed download: {}", e);
log!(Level::Warn, "Retry failed download: {}", e);
segments.push(e.uri.clone());
},

Loading…
Cancel
Save