Browse Source

First working poc

main
Georg Hopp 12 months ago
parent
commit
cc9c7e50fa
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 1
      Cargo.lock
  2. 3
      Cargo.toml
  3. 162
      src/main.rs

1
Cargo.lock

@ -437,6 +437,7 @@ dependencies = [
"env_logger",
"futures-util",
"http",
"http-body-util",
"log",
"m3u8-rs",
"reqwest",

3
Cargo.toml

@ -9,10 +9,11 @@ clap = { version = "4.5", features = [ "derive" ] }
env_logger = "0.11"
futures-util = "0.3"
http = "1.2"
http-body-util = "0.1"
log = "0.4"
m3u8-rs = "6.0"
reqwest = "0.12"
tokio = { version = "1.42", features = [ "macros", "rt-multi-thread" ] }
tower = { version = "0.5", features = [ "buffer", "limit" ] }
tower = { version = "0.5", features = [ "buffer", "limit", "timeout" ] }
tower-http-client = "0.4"
tower-reqwest = "0.4"

162
src/main.rs

@ -1,13 +1,16 @@
use std::time::Duration;
use std::{path::Path, time::Duration};
use anyhow::anyhow;
use clap::Parser;
use env_logger::Env;
use futures_util::StreamExt;
use http::{uri::{Authority, Scheme}, Request, Response, Uri};
use http_body_util::BodyDataStream;
use m3u8_rs::Playlist;
use reqwest::Body;
use tokio::{fs::File, io::AsyncWriteExt, task::{JoinError, JoinHandle}, time::timeout};
use tower::{ServiceBuilder, ServiceExt as _};
use tower_http_client::{client::BodyReader, ServiceExt as _};
use tower_http_client::ServiceExt as _;
use tower_reqwest::HttpClientLayer;
use log::{log, Level};
@ -19,6 +22,7 @@ struct DownloadMessage {
url: String,
}
#[derive(Debug, Parser)]
struct Args {
#[arg(short, long)]
@ -31,37 +35,103 @@ type HttpClient = tower::util::BoxCloneService<Request<Body>, Response<Body>, an
/// Shared download context: the origin of the playlist plus the HTTP client
/// used for every request.
struct State {
/// URI scheme used when building request URIs; also the fallback for
/// segment URIs that carry no scheme of their own.
scheme: Scheme,
/// URI authority used when building request URIs; also the fallback for
/// segment URIs that carry no authority of their own.
auth: Authority,
/// Path prefix prepended (followed by "/") to segment entries that cannot be
/// parsed as a URI on their own. `main` sets it to the parent of the
/// playlist path.
base_path: String,
/// Boxed, clonable tower service through which all HTTP requests are sent.
client: HttpClient,
}
impl State {
async fn get(&mut self, path_and_query: &str) -> anyhow::Result<()> {
async fn get_m3u8_segment_uris(&mut self, path_and_query: &str) -> anyhow::Result<Vec<Uri>> {
let uri = Uri::builder()
. scheme(self.scheme.clone())
. authority(self.auth.clone())
. path_and_query(path_and_query)
. build()?;
let filename = Path::new(uri.path()).file_name().ok_or(anyhow!("name error"))?;
let mut file = File::create(filename).await?;
let mut response = self
. client
. get(uri)
. send()?
. await?;
let mut response = self.client.get(uri).send()?.await?;
anyhow::ensure!(response.status().is_success(), "response failed");
log!(Level::Debug, "-> Response: {:?}", response);
let body = BodyReader::new(response.body_mut()).bytes().await?;
match m3u8_rs::parse_playlist(&body) {
Result::Ok((_, Playlist::MasterPlaylist(pl))) =>
log!(Level::Info, "Master playlist: {:?}", pl),
Result::Ok((_, Playlist::MediaPlaylist(pl))) =>
log!(Level::Info, "Media playlist: {:?}", pl),
Result::Err(e) =>
Err(anyhow!("Parsing error: {}", e))?,
let mut body = vec![];
let mut body_stream = BodyDataStream::new(response.body_mut());
'label: loop {
let data = timeout(Duration::from_secs(10), body_stream.next()).await?;
match data {
None => break 'label,
Some(Err(_)) => break 'label,
Some(Ok(data)) => {
body.append(&mut Vec::from(data.as_ref()));
},
}
};
file.write_all(&body).await?;
match m3u8_rs::parse_playlist(&body) {
Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e)),
Result::Ok((_, Playlist::MasterPlaylist(_))) =>
Err(anyhow!("Master playlist not supported now")),
Result::Ok((_, Playlist::MediaPlaylist(pl))) => {
let uris: anyhow::Result<Vec<_>> = pl.segments.iter().map(|s| {
match Uri::try_from(s.uri.clone()) {
Ok(uri) => {
let scheme = uri.scheme()
. or(Some(&self.scheme))
. ok_or(anyhow!("No scheme in Uri"))?;
let auth = uri.authority()
. or(Some(&self.auth))
. ok_or(anyhow!("No authority in Uri"))?;
let path_and_query = uri.path_and_query()
. ok_or(anyhow!("No path in Uri"))?;
Ok(Uri::builder() . scheme(scheme.clone())
. authority(auth.clone())
. path_and_query(path_and_query.clone())
. build()?) }
Err(e) => {
log!(Level::Debug, "Uri parse error: {:?} - {}", e, s.uri);
Ok(Uri::builder() . scheme(self.scheme.clone())
. authority(self.auth.clone())
. path_and_query(self.base_path.clone() + "/" + &s.uri)
. build()?) }
}
}).collect();
uris
},
}
}
Ok(())
/// Downloads a single media segment and writes it to a file in the current
/// working directory.
///
/// The file is named after the last component of the URI *path*; the query
/// string is deliberately not part of the name. The body is streamed to disk
/// chunk by chunk instead of being buffered in memory.
///
/// # Errors
///
/// Every failure is reported as `Err(uri.clone())` so the caller can queue the
/// same URI again. That covers: the request could not be built or sent, a
/// non-success status, a path without a usable file name, a file creation,
/// write or flush error, a body stream error, and a chunk that did not arrive
/// within the timeout. A partially written file may be left behind; a retry
/// truncates it because the file is created anew.
async fn get_m3u8_segment(&mut self, uri: &Uri) -> Result<Uri, Uri> {
    // Longest wait for the next body chunk before the download is abandoned.
    const CHUNK_TIMEOUT: Duration = Duration::from_secs(10);

    let mut response = self
        .client
        .get(uri)
        .send()
        .map_err(|_| uri.clone())?
        .await
        .map_err(|_| uri.clone())?;
    if !response.status().is_success() {
        return Err(uri.clone());
    }

    // Only the path is used here: taking the name from path-and-query would
    // leak the query string into the file name, and a '/' inside the query
    // would change which component is picked. A path without a final
    // component becomes an error instead of a panic.
    let filename = Path::new(uri.path())
        .file_name()
        .ok_or_else(|| uri.clone())?;
    let mut file = File::create(filename).await.map_err(|_| uri.clone())?;

    let mut body_stream = BodyDataStream::new(response.body_mut());
    while let Some(chunk) = timeout(CHUNK_TIMEOUT, body_stream.next())
        .await
        .map_err(|_| uri.clone())?
    {
        let data = chunk.map_err(|_| uri.clone())?;
        file.write_all(data.as_ref())
            .await
            .map_err(|_| uri.clone())?;
    }

    // Dropping a tokio `File` does not report errors from writes that are
    // still in flight, so flush explicitly before declaring success.
    file.flush().await.map_err(|_| uri.clone())?;

    Ok(uri.clone())
}
}
@ -87,16 +157,21 @@ async fn main() -> anyhow::Result<()> {
. ok_or(anyhow!("Problem scheme in m3u8 uri"))?;
let m3u8_auth = m3u8_uri.authority()
. ok_or(anyhow!("Problem authority in m3u8 uri"))?;
let m3u8_base_path = Path::new(m3u8_uri.path()).parent()
. ok_or(anyhow!("Path problem"))?
. to_str()
. ok_or(anyhow!("Path problem"))?;
let m3u8_path_and_query = m3u8_uri.path_and_query()
. ok_or(anyhow!("Problem path and query in m3u8 uri"))?;
let state = State {
let mut state = State {
scheme: m3u8_scheme.clone(),
auth: m3u8_auth.clone(),
base_path: m3u8_base_path.to_string(),
client: ServiceBuilder::new()
// Add some layers.
. buffer(64)
. rate_limit(10, Duration::from_secs(1))
. concurrency_limit(50)
. concurrency_limit(10)
// Make client compatible with the `tower-http` layers.
. layer(HttpClientLayer)
. service(reqwest::Client::new())
@ -108,32 +183,39 @@ async fn main() -> anyhow::Result<()> {
// TODO: consider a worker pool, probably sized to match concurrency_limit.
// Each worker needs to receive the URL. The first goal is to store the
// data in a file named after the last part of the URL.
// CURRENTLY a separate task is simply created for each download.
log!(Level::Info, "-> Get segments...");
let mut segments = state.get_m3u8_segment_uris(m3u8_path_and_query.as_str()).await?;
log!(Level::Info, "-> Sending concurrent requests...");
let tasks = (0..1).map({
|i| {
'working: while ! segments.is_empty() {
let mut tasks: Vec<JoinHandle<Result<Uri, Uri>>> = vec![];
while let Some(segment) = segments.pop() {
let state = state.clone();
let m3u8_path_and_query = m3u8_path_and_query.clone();
tokio::spawn(async move {
tasks.push(tokio::spawn(async move {
let mut state = state.clone();
state.get(m3u8_path_and_query.as_str()).await?;
log!( Level::Info
, "[task {}]: {} completed successfully!"
, i
, m3u8_path_and_query );
anyhow::Ok(())
})
state.get_m3u8_segment(&segment).await
}));
}
});
let results = futures_util::future::join_all(tasks).await;
for result in &results {
match result {
Err(e) => log!(Level::Error, "{}", e),
Ok(Err(e)) => log!(Level::Error, "{}", e),
_ => (),
let results: Vec<Result<Result<Uri, Uri>, JoinError>> = futures_util::future::join_all(tasks).await;
for result in &results {
match result {
Err(e) => {
log!(Level::Error, "FATAL Join failed: {}", e);
break 'working
},
Ok(Err(e)) => {
log!(Level::Info, "Retry failed download: {}", e);
segments.push(e.clone());
},
_ => (),
}
}
}

Loading…
Cancel
Save