A tool to get an HLS video stream.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

153 lines
4.0 KiB

mod process;
mod m3u8_download;
mod client;
mod client_actor;
use std::{ffi::OsStr, path::PathBuf, time::Duration};
use anyhow::anyhow;
use clap::Parser;
use client_actor::ClientActorHandle;
use client::Client;
use env_logger::Env;
use http::Uri;
use m3u8_download::M3u8Download;
use log::{debug, info};
use process::{enter_download_dir, ffmpeg, remove_download_dir};
// Command line options of the downloader, parsed by clap's derive API.
//
// NOTE: only plain `//` comments are used for this struct on purpose. clap's
// derive macro turns `///` doc comments into about/help text, so adding them
// here would change the generated `--help` output.
#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct Args {
    // Output file name. `main` rejects values that contain a path component
    // or do not carry the `.mp4` extension.
    #[arg(short, long)]
    name: PathBuf,

    // URL that `main` parses into an `http::Uri` and fetches first.
    #[arg(short, long)]
    url: String,

    // --- client tuning (all forwarded to `Client::new` by `main`) ---
    #[arg(
        short,
        long,
        default_value_t = 10,
        help = "request to store in client after concurrency limit reached"
    )]
    buffer: usize,

    #[arg(
        short,
        long,
        default_value_t = 40,
        help = "number of requests per second the client should perform"
    )]
    rate: u64,

    #[arg(
        short,
        long,
        default_value_t = 20,
        help = "number of concurrent requests"
    )]
    concurrency: usize,

    // Converted with `Duration::from_secs` in `main`, i.e. given in seconds.
    #[arg(short, long, default_value_t = 30, help = "network io timeout")]
    timeout: u64,

    // --- behaviour switches (short option only, no long form) ---
    #[arg(
        short = 'B',
        default_value_t = false,
        help = "also use timeout on body reads"
    )]
    use_body_timeout: bool,

    // `main` passes the negation of this flag to `M3u8Download::new`.
    #[arg(
        short = 'R',
        default_value_t = false,
        help = "prevent reconnect after each iteration"
    )]
    prevent_reconnect: bool,

    // Converted with `Duration::from_secs` in `main`, i.e. given in seconds.
    #[arg(
        short,
        default_value_t = 301,
        help = "wait for temporary failure like 503"
    )]
    wait: u64,

    // --- optional request settings ---
    // Handed to `Client::set_origin` as-is.
    #[arg(short, long)]
    origin: Option<String>,

    // Handed to `Client::set_user_agent` when present.
    #[arg(short, long)]
    agent: Option<String>,

    // Occurrence counter: 0 -> "error", 1 -> "info", 2+ -> "debug" (see `main`).
    #[arg(short, action = clap::ArgAction::Count, help = "Increase verbosity")]
    verbose: u8,
}
/// Program entry point.
///
/// Performs, in order:
/// 1. Parse the command line into [`Args`] and initialise `env_logger`. The
///    number of `-v` flags selects the default filter (`error`, `info`,
///    `debug`), which is passed to `filter_or("LOG_LEVEL", ..)`.
/// 2. Validate that the output name is a bare file name ending in `.mp4`.
/// 3. Enter the download directory, build the HTTP client and its actor,
///    fetch the playlist body and run the segment download.
/// 4. Call `ffmpeg` on the downloaded index and remove the download
///    directory again.
///
/// # Errors
///
/// Returns an error when the name validation fails, when the URL cannot be
/// parsed, when `buffer + concurrency` overflows `usize`, when the playlist
/// body cannot be fetched, or when any of the fallible helpers
/// (`enter_download_dir`, `Client` setup, `M3u8Download::new`, `ffmpeg`,
/// `remove_download_dir`) fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Default log filter derived from the verbosity counter.
    let log_level = match args.verbose {
        0 => "error",
        1 => "info",
        _ => "debug",
    };
    let env = Env::default()
        .filter_or("LOG_LEVEL", log_level)
        .write_style_or("LOG_STYLE", "always");
    env_logger::init_from_env(env);

    // The name must be exactly its own final component (no directories) ...
    let name = args.name.as_path();
    if name.file_name() != Some(OsStr::new(name)) {
        anyhow::bail!("Name must not contain a path");
    }
    // ... and must carry the `.mp4` extension.
    if name.extension() != Some(OsStr::new("mp4")) {
        anyhow::bail!("Only filenames with .mp4 extension are allowed");
    }

    let timeout = Duration::from_secs(args.timeout);
    let wait_time = Duration::from_secs(args.wait);
    // The body timeout reuses the general timeout, but only when requested.
    let body_timeout = args.use_body_timeout.then_some(timeout);
    let m3u8_uri = Uri::try_from(&args.url)?;

    // Both values come straight from the command line; a plain `+` would wrap
    // silently in release builds. Checked here, before any directory is
    // created, so a bad value leaves nothing behind.
    let buffer_and_concurrency = args
        .buffer
        .checked_add(args.concurrency)
        .ok_or_else(|| anyhow!("buffer + concurrency overflows usize"))?;

    info!("Create and chdir into temporary download dir...");
    let basename = enter_download_dir(&name).await?;

    info!("Creating an HTTP client with Tower layers...");
    let client = Client::new(args.buffer, args.rate, args.concurrency, timeout)?
        .set_body_timeout(body_timeout)
        .set_origin(args.origin)?;
    // Only touch the user agent when one was given on the command line.
    let client = match args.agent {
        Some(user_agent) => client.set_user_agent(Some(user_agent))?,
        None => client,
    };
    // Progress is initialised only when no `-v` was given, i.e. when the
    // default log filter is "error".
    let client = if args.verbose == 0 {
        client.init_progress()
    } else {
        client
    };

    let actor = ClientActorHandle::new(client, buffer_and_concurrency);

    info!("Get segments...");
    // `ok_or_else` keeps the error (and its formatting) off the success path.
    let m3u8_data = actor
        .body_bytes(&m3u8_uri)
        .await
        .ok_or_else(|| anyhow!("Unable to get body for: {}", m3u8_uri))?;
    let mut download = M3u8Download::new(
        m3u8_data,
        m3u8_uri,
        wait_time,
        !args.prevent_reconnect,
    )
    .await?;

    info!("Sending concurrent requests...");
    download.download(&actor).await;
    actor.stop();

    info!("Call ffmpeg to join ts files to single mp4...");
    let status = ffmpeg(&name, download.index_uri()).await?;
    // NOTE(review): `status` is only logged. If it can represent an ffmpeg
    // failure, the download directory is still removed below and `Ok(())` is
    // returned - confirm this is intended.
    debug!("ffmpeg status: {}", status);

    info!("Leave and remove temporary download dir...");
    remove_download_dir(&basename).await?;

    Ok(())
}