|
|
|
@ -4,10 +4,10 @@ use anyhow::anyhow; |
|
|
|
use clap::Parser;
|
|
|
|
use env_logger::Env;
|
|
|
|
use futures_util::StreamExt;
|
|
|
|
use http::{uri::{Authority, Scheme}, Request, Response, Uri};
|
|
|
|
use http::{header::CONTENT_TYPE, uri::{Authority, Scheme}, Request, Response, Uri};
|
|
|
|
use http_body_util::BodyDataStream;
|
|
|
|
use m3u8_rs::Playlist;
|
|
|
|
use reqwest::Body;
|
|
|
|
use reqwest::{redirect::Policy, Body};
|
|
|
|
use tokio::{fs::File, io::AsyncWriteExt, task::{JoinError, JoinHandle}, time::timeout};
|
|
|
|
use tower::{ServiceBuilder, ServiceExt as _};
|
|
|
|
use tower_http_client::ServiceExt as _;
|
|
|
|
@ -49,7 +49,7 @@ impl State { |
|
|
|
let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?;
|
|
|
|
let mut file = File::create(filename).await?;
|
|
|
|
|
|
|
|
let mut response = self.client.get(uri).send()?.await?;
|
|
|
|
let mut response = timeout(Duration::from_secs(10), self.client.get(uri).send()?).await??;
|
|
|
|
|
|
|
|
anyhow::ensure!(response.status().is_success(), "response failed");
|
|
|
|
|
|
|
|
@ -65,13 +65,36 @@ impl State { |
|
|
|
},
|
|
|
|
}
|
|
|
|
};
|
|
|
|
file.write_all(&body).await?;
|
|
|
|
|
|
|
|
match m3u8_rs::parse_playlist(&body) {
|
|
|
|
Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)),
|
|
|
|
Result::Ok((_, Playlist::MasterPlaylist(_))) =>
|
|
|
|
Err(anyhow!("Master playlist not supported now")),
|
|
|
|
Result::Ok((_, Playlist::MediaPlaylist(pl))) => {
|
|
|
|
let segments: anyhow::Result<Vec<_>> = pl.segments.iter().map(|s| {
|
|
|
|
Ok(match Uri::try_from(s.uri.clone()) {
|
|
|
|
Ok(uri) => {
|
|
|
|
let mut new_segment = s.clone();
|
|
|
|
let filename = Path::new(uri.path())
|
|
|
|
. file_name()
|
|
|
|
. ok_or(anyhow!("name error"))?
|
|
|
|
. to_str()
|
|
|
|
. ok_or(anyhow!("Error getting filename from uri"))?;
|
|
|
|
let query = uri.query()
|
|
|
|
. map(|q| "?".to_owned() + q)
|
|
|
|
. unwrap_or("".to_string());
|
|
|
|
new_segment.uri = (filename.to_owned() + &query).to_string();
|
|
|
|
new_segment }
|
|
|
|
|
|
|
|
Err(_) => s.clone()
|
|
|
|
})
|
|
|
|
}).collect();
|
|
|
|
let mut out_pl = pl.clone();
|
|
|
|
out_pl.segments = segments?;
|
|
|
|
let mut file_data: Vec<u8> = Vec::new();
|
|
|
|
out_pl.write_to(&mut file_data)?;
|
|
|
|
file.write_all(&file_data).await?;
|
|
|
|
|
|
|
|
let uris: anyhow::Result<Vec<_>> = pl.segments.iter().map(|s| {
|
|
|
|
match Uri::try_from(s.uri.clone()) {
|
|
|
|
Ok(uri) => {
|
|
|
|
@ -104,8 +127,21 @@ impl State { |
|
|
|
|
|
|
|
async fn get_m3u8_segment(&mut self, uri: &Uri) -> Result<Uri, Uri> {
|
|
|
|
if let Ok(send_fut) = self.client.get(uri).send() {
|
|
|
|
if let Ok(mut response) = send_fut.await {
|
|
|
|
if let Ok(mut response) = timeout(Duration::from_secs(10), send_fut).await.or(Err(uri.clone()))? {
|
|
|
|
if response.status().is_success() {
|
|
|
|
let content_type = match response.headers()[CONTENT_TYPE].to_str() {
|
|
|
|
Err(_) => {
|
|
|
|
log!(Level::Debug, "Error getting content-type");
|
|
|
|
Err(uri.clone())
|
|
|
|
},
|
|
|
|
Ok(h) => Ok(h)
|
|
|
|
}?;
|
|
|
|
log!(Level::Debug, "CONTENT-TYPE: {}", content_type);
|
|
|
|
if content_type != "video/MP2T" {
|
|
|
|
log!(Level::Error, "{} is not video/MP2T", content_type);
|
|
|
|
return Err(uri.clone());
|
|
|
|
}
|
|
|
|
|
|
|
|
let path_and_query = uri.path_and_query().expect("No path and query").as_str();
|
|
|
|
let filename = Path::new(path_and_query).file_name().ok_or(uri.clone())?;
|
|
|
|
let mut file = File::create(filename).await.or(Err(uri.clone()))?;
|
|
|
|
@ -174,7 +210,9 @@ async fn main() -> anyhow::Result<()> { |
|
|
|
. concurrency_limit(10)
|
|
|
|
// Make client compatible with the `tower-http` layers.
|
|
|
|
. layer(HttpClientLayer)
|
|
|
|
. service(reqwest::Client::new())
|
|
|
|
. service(reqwest::Client::builder()
|
|
|
|
. redirect(Policy::limited(5))
|
|
|
|
. build()? )
|
|
|
|
. map_err(anyhow::Error::msg)
|
|
|
|
. boxed_clone(),
|
|
|
|
};
|
|
|
|
|