diff --git a/Cargo.lock b/Cargo.lock index 595a16e..3c20b40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "anstream" version = "0.6.18" @@ -81,6 +96,22 @@ version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +[[package]] +name = "async-compression" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -120,6 +151,27 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "brotli" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -138,6 +190,8 @@ version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -218,6 +272,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -295,6 +358,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "flate2" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -450,6 +523,7 @@ dependencies = [ "shellwords", "tokio", "tower", + "tower-http", "tower-http-client", "tower-reqwest", "which", @@ -771,6 +845,15 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.76" @@ -1516,6 +1599,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" +dependencies = [ + "async-compression", + "bitflags", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http-client" version = "0.4.1" @@ -1960,3 +2063,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 97750aa..4aa3872 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,12 @@ reqwest = "0.12" shellwords = "1.1" tokio = { version = "1.42", features = [ "macros", "rt", "rt-multi-thread" ] } tower = { version = "0.5", features = [ "limit", "timeout" ] } +tower-http = { version = "0.6", features = [ + "decompression-br", + "decompression-deflate", + "decompression-gzip", + "decompression-zstd" +]} tower-http-client = "0.4" tower-reqwest = "0.4" which = "7.0" diff --git a/src/main.rs b/src/main.rs index a65e539..0f77c27 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,16 +83,16 @@ async fn main() -> anyhow::Result<()> { let mut state = client::State::new(&m3u8_uri, concurrency, timeout)?; - let handle = ClientActorHandle::new(timeout); - handle.download("foo.m3u8", &m3u8_uri).await; - handle.stop(); - info!("Get segments..."); let mut segments = state.get_m3u8_segment_uris(m3u8_path_and_query.as_str()).await?; info!("Sending concurrent requests..."); + let handle = ClientActorHandle::new(timeout); + handle.download("foo.m3u8", &m3u8_uri).await; + handle.stop(); + let mut join_set = JoinSet::new(); while let Some(segment) = segments.pop() { info!("Spawn task for: {}", segment); diff --git a/src/new_client.rs b/src/new_client.rs index 8fa929d..16605a4 100644 --- a/src/new_client.rs +++ b/src/new_client.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, io::ErrorKind, path::{Path, PathBuf}, time::Duration}; +use anyhow::anyhow; use futures_util::StreamExt as _; use http::{ header::{CONTENT_LENGTH, RANGE}, @@ -22,6 +23,7 @@ use tokio::{ time::timeout }; use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _}; +use tower_http::decompression::{DecompressionBody, DecompressionLayer}; use tower_http_client::ServiceExt as _; use tower_reqwest::HttpClientLayer; @@ -53,7 +55,7 @@ pub(super) enum ClientActorMessageHandle { pub(super) type ActionIndex = u64; type JoinSetResult = Result, DownloadError>; -type HttpClient = BoxCloneService, Response, anyhow::Error>; +type HttpClient = BoxCloneService, Response>, anyhow::Error>; #[derive(Debug)] @@ -143,7 +145,7 @@ async fn file_size(filename: &Path) -> u64 { async fn request( client: &mut HttpClient , method: &str , uri: &Uri - , headers: HeaderMap ) -> anyhow::Result> { + , headers: HeaderMap ) -> anyhow::Result>> { let mut request = RequestBuilder::new() . method(method) . uri(uri) @@ -151,11 +153,11 @@ async fn request( client: &mut HttpClient request.headers_mut().extend(headers); - debug!("New Request: {:?}", request); + debug!("Request: {:?}", request); let response = client.execute(request).await?; - debug!("New Response: {:?}", response); + debug!("Response: {:?}", response.headers()); anyhow::ensure!( response.status().is_success() , "resonse status failed: {}" @@ -195,7 +197,7 @@ async fn open_outfile(status: &StatusCode, filename: &Path) -> File { } async fn store_body( file: &mut File - , body: &mut Body + , body: &mut DecompressionBody , io_timeout: Duration ) -> anyhow::Result<()> { let mut body = BodyDataStream::new(body); @@ -204,7 +206,10 @@ async fn store_body( file: &mut File let data = timeout(io_timeout, body.next()).await?; match data { None => break, - Some(Err(e)) => return Err(e.into()), + Some(Err(e)) => { + return Err(anyhow!(e.to_string())); + } + Some(Ok(data)) => { file . write_all(data.as_ref()).await?; file . flush().await?; @@ -266,6 +271,7 @@ impl ClientActor { // Add some layers. . concurrency_limit(concurrency_limit) . timeout(timeout) + . layer(DecompressionLayer::new()) // Make client compatible with the `tower-http` layers. . layer(HttpClientLayer) . service( reqwest::Client::builder() @@ -334,6 +340,7 @@ impl ClientActorHandle { pub(super) fn stop(self) { let _ = self.abort.send(Ok(None)); + drop(self.sender); } pub(super) async fn download(&self, filename: impl AsRef, uri: &Uri) {