Browse Source

Add decompression layer

main
Georg Hopp 11 months ago
parent
commit
a7ac64c6d6
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 131
      Cargo.lock
  2. 6
      Cargo.toml
  3. 8
      src/main.rs
  4. 19
      src/new_client.rs

131
Cargo.lock

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "anstream"
version = "0.6.18"
@ -81,6 +96,22 @@ version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"
[[package]]
name = "async-compression"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522"
dependencies = [
"brotli",
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"zstd",
"zstd-safe",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -120,6 +151,27 @@ version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "brotli"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
@ -138,6 +190,8 @@ version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e"
dependencies = [
"jobserver",
"libc",
"shlex",
]
@ -218,6 +272,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crc32fast"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3"
dependencies = [
"cfg-if",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -295,6 +358,16 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "flate2"
version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -450,6 +523,7 @@ dependencies = [
"shellwords",
"tokio",
"tower",
"tower-http",
"tower-http-client",
"tower-reqwest",
"which",
@ -771,6 +845,15 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jobserver"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.76"
@ -1516,6 +1599,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower-http"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
dependencies = [
"async-compression",
"bitflags",
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"pin-project-lite",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http-client"
version = "0.4.1"
@ -1960,3 +2063,31 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zstd"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.13+zstd.1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa"
dependencies = [
"cc",
"pkg-config",
]

6
Cargo.toml

@ -16,6 +16,12 @@ reqwest = "0.12"
shellwords = "1.1"
tokio = { version = "1.42", features = [ "macros", "rt", "rt-multi-thread" ] }
tower = { version = "0.5", features = [ "limit", "timeout" ] }
tower-http = { version = "0.6", features = [
"decompression-br",
"decompression-deflate",
"decompression-gzip",
"decompression-zstd"
]}
tower-http-client = "0.4"
tower-reqwest = "0.4"
which = "7.0"

8
src/main.rs

@ -83,16 +83,16 @@ async fn main() -> anyhow::Result<()> {
let mut state = client::State::new(&m3u8_uri, concurrency, timeout)?;
let handle = ClientActorHandle::new(timeout);
handle.download("foo.m3u8", &m3u8_uri).await;
handle.stop();
info!("Get segments...");
let mut segments = state.get_m3u8_segment_uris(m3u8_path_and_query.as_str()).await?;
info!("Sending concurrent requests...");
let handle = ClientActorHandle::new(timeout);
handle.download("foo.m3u8", &m3u8_uri).await;
handle.stop();
let mut join_set = JoinSet::new();
while let Some(segment) = segments.pop() {
info!("Spawn task for: {}", segment);

19
src/new_client.rs

@ -1,5 +1,6 @@
use std::{collections::HashMap, io::ErrorKind, path::{Path, PathBuf}, time::Duration};
use anyhow::anyhow;
use futures_util::StreamExt as _;
use http::{
header::{CONTENT_LENGTH, RANGE},
@ -22,6 +23,7 @@ use tokio::{
time::timeout
};
use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _};
use tower_http::decompression::{DecompressionBody, DecompressionLayer};
use tower_http_client::ServiceExt as _;
use tower_reqwest::HttpClientLayer;
@ -53,7 +55,7 @@ pub(super) enum ClientActorMessageHandle {
pub(super) type ActionIndex = u64;
type JoinSetResult = Result<Option<ClientActorMessageHandle>, DownloadError>;
type HttpClient = BoxCloneService<Request<Body>, Response<Body>, anyhow::Error>;
type HttpClient = BoxCloneService<Request<Body>, Response<DecompressionBody<Body>>, anyhow::Error>;
#[derive(Debug)]
@ -143,7 +145,7 @@ async fn file_size(filename: &Path) -> u64 {
async fn request( client: &mut HttpClient
, method: &str
, uri: &Uri
, headers: HeaderMap ) -> anyhow::Result<Response<Body>> {
, headers: HeaderMap ) -> anyhow::Result<Response<DecompressionBody<Body>>> {
let mut request = RequestBuilder::new()
. method(method)
. uri(uri)
@ -151,11 +153,11 @@ async fn request( client: &mut HttpClient
request.headers_mut().extend(headers);
debug!("New Request: {:?}", request);
debug!("Request: {:?}", request);
let response = client.execute(request).await?;
debug!("New Response: {:?}", response);
debug!("Response: {:?}", response.headers());
anyhow::ensure!( response.status().is_success()
, "resonse status failed: {}"
@ -195,7 +197,7 @@ async fn open_outfile(status: &StatusCode, filename: &Path) -> File {
}
async fn store_body( file: &mut File
, body: &mut Body
, body: &mut DecompressionBody<Body>
, io_timeout: Duration ) -> anyhow::Result<()> {
let mut body = BodyDataStream::new(body);
@ -204,7 +206,10 @@ async fn store_body( file: &mut File
let data = timeout(io_timeout, body.next()).await?;
match data {
None => break,
Some(Err(e)) => return Err(e.into()),
Some(Err(e)) => {
return Err(anyhow!(e.to_string()));
}
Some(Ok(data)) => {
file . write_all(data.as_ref()).await?;
file . flush().await?;
@ -266,6 +271,7 @@ impl ClientActor {
// Add some layers.
. concurrency_limit(concurrency_limit)
. timeout(timeout)
. layer(DecompressionLayer::new())
// Make client compatible with the `tower-http` layers.
. layer(HttpClientLayer)
. service( reqwest::Client::builder()
@ -334,6 +340,7 @@ impl ClientActorHandle {
pub(super) fn stop(self) {
let _ = self.abort.send(Ok(None));
drop(self.sender);
}
pub(super) async fn download(&self, filename: impl AsRef<Path>, uri: &Uri) {

Loading…
Cancel
Save