Browse Source

Very basic first steps

main
Georg Hopp 12 months ago
commit
29eb221469
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 1
      .gitignore
  2. 1919
      Cargo.lock
  3. 18
      Cargo.toml
  4. 141
      src/main.rs

1
.gitignore

@ -0,0 +1 @@
**/target

1919
Cargo.lock
File diff suppressed because it is too large
View File

18
Cargo.toml

@ -0,0 +1,18 @@
[package]
name = "hlsclient"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
clap = { version = "4.5", features = [ "derive" ] }
env_logger = "0.11"
futures-util = "0.3"
http = "1.2"
log = "0.4"
m3u8-rs = "6.0"
reqwest = "0.12"
tokio = { version = "1.42", features = [ "macros", "rt-multi-thread" ] }
tower = { version = "0.5", features = [ "buffer", "limit" ] }
tower-http-client = "0.4"
tower-reqwest = "0.4"

141
src/main.rs

@ -0,0 +1,141 @@
use std::time::Duration;
use anyhow::anyhow;
use clap::Parser;
use env_logger::Env;
use http::{uri::{Authority, Scheme}, Request, Response, Uri};
use m3u8_rs::Playlist;
use reqwest::Body;
use tower::{ServiceBuilder, ServiceExt as _};
use tower_http_client::{client::BodyReader, ServiceExt as _};
use tower_reqwest::HttpClientLayer;
use log::{log, Level};
#[allow(dead_code)]
#[derive(Debug)]
struct DownloadMessage {
url: String,
}
#[derive(Debug, Parser)]
struct Args {
#[arg(short, long)]
url: String
}
type HttpClient = tower::util::BoxCloneService<Request<Body>, Response<Body>, anyhow::Error>;
#[derive(Clone, Debug)]
struct State {
scheme: Scheme,
auth: Authority,
client: HttpClient,
}
impl State {
async fn get(&mut self, path_and_query: &str) -> anyhow::Result<()> {
let uri = Uri::builder()
. scheme(self.scheme.clone())
. authority(self.auth.clone())
. path_and_query(path_and_query)
. build()?;
let mut response = self
. client
. get(uri)
. send()?
. await?;
anyhow::ensure!(response.status().is_success(), "response failed");
log!(Level::Debug, "-> Response: {:?}", response);
let body = BodyReader::new(response.body_mut()).bytes().await?;
match m3u8_rs::parse_playlist(&body) {
Result::Ok((_, Playlist::MasterPlaylist(pl))) =>
log!(Level::Info, "Master playlist: {:?}", pl),
Result::Ok((_, Playlist::MediaPlaylist(pl))) =>
log!(Level::Info, "Media playlist: {:?}", pl),
Result::Err(e) =>
Err(anyhow!("Parsing error: {}", e))?,
};
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
eprintln!("-> Initialize env_logger");
let env = Env::default()
. filter_or("LOG_LEVEL", "error")
. write_style_or("LOG_STYLE", "always");
env_logger::init_from_env(env);
log!(Level::Info, "-> Parse command line arguments...");
let args = Args::parse();
log!(Level::Debug, "-> Arguments: {:?}", args);
log!(Level::Info, "-> Creating an HTTP client with Tower layers...");
let m3u8_uri = Uri::try_from(&args.url)?;
let m3u8_scheme = m3u8_uri.scheme()
. ok_or(anyhow!("Problem scheme in m3u8 uri"))?;
let m3u8_auth = m3u8_uri.authority()
. ok_or(anyhow!("Problem authority in m3u8 uri"))?;
let m3u8_path_and_query = m3u8_uri.path_and_query()
. ok_or(anyhow!("Problem path and query in m3u8 uri"))?;
let state = State {
scheme: m3u8_scheme.clone(),
auth: m3u8_auth.clone(),
client: ServiceBuilder::new()
// Add some layers.
. buffer(64)
. rate_limit(10, Duration::from_secs(1))
. concurrency_limit(50)
// Make client compatible with the `tower-http` layers.
. layer(HttpClientLayer)
. service(reqwest::Client::new())
. map_err(anyhow::Error::msg)
. boxed_clone(),
};
log!(Level::Debug, "-> state: {:?}", state);
// I think about a worker pool... probably of concurrency_limit amount.
// The worker needs to get the url. Our first target is to store the
// data on a file with the same name as the last part of the URL.
log!(Level::Info, "-> Sending concurrent requests...");
let tasks = (0..1).map({
|i| {
let state = state.clone();
let m3u8_path_and_query = m3u8_path_and_query.clone();
tokio::spawn(async move {
let mut state = state.clone();
state.get(m3u8_path_and_query.as_str()).await?;
log!( Level::Info
, "[task {}]: {} completed successfully!"
, i
, m3u8_path_and_query );
anyhow::Ok(())
})
}
});
let results = futures_util::future::join_all(tasks).await;
for result in &results {
match result {
Err(e) => log!(Level::Error, "{}", e),
Ok(Err(e)) => log!(Level::Error, "{}", e),
_ => (),
}
}
Ok(())
}
Loading…
Cancel
Save