Browse Source

pass DownloadState and buffer

main
Georg Hopp 11 months ago
parent
commit
499c561598
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 5
      src/client.rs
  2. 36
      src/client_actor.rs
  3. 4
      src/client_actor/error.rs
  4. 16
      src/client_actor/message.rs
  5. 6
      src/client_actor/util.rs
  6. 54
      src/client_new.rs
  7. 28
      src/client_new/error.rs
  8. 4
      src/m3u8_download.rs
  9. 38
      src/main.rs

5
src/client.rs

@ -46,11 +46,6 @@ type JoinSetResult = Result<Option<ClientActorMessageHandle>, ClientError>;
type HttpClient = BoxCloneService<Request<Body>, Response<DecompressionBody<Body>>, anyhow::Error>;
// ===========
// ===========
#[derive(Debug)]
struct ClientActor {

36
src/client_actor.rs

@ -6,15 +6,16 @@ use std::{collections::HashMap, path::Path};
use error::ClientActorError;
use http::{HeaderMap, Uri};
use message::{ClientActorMessage, ClientActorMessageHandle, DownloadState};
use message::{ClientActorMessage, ClientActorMessageHandle, DownloadResult};
use tokio::{sync::{mpsc, oneshot}, task::JoinSet};
use crate::client_new::DownloadState;
use super::client_new::Client;
type ActionIndex = u64;
type ClientTaskResult = Result<Option<ClientActorMessageHandle>, ClientActorError>;
type DownloadResult = Result<Option<DownloadState>, ClientActorError>;
#[derive(Debug)]
@ -70,18 +71,22 @@ impl ClientActor {
let filename = filename.to_path_buf();
let uri = uri.clone();
let message = self.actions_idx;
let handle = ClientActorMessageHandle::Download {
let mut handle = ClientActorMessageHandle::Download {
filename,
uri,
state: None,
state: DownloadState::None,
message,
};
self.tasks.spawn(async move {
client.download(handle.filename(), &handle.uri(), &HeaderMap::new()).await
. map_err(|source| ClientActorError { action: handle.clone()
, source })?;
Ok(Some(handle))
let result = client.download(handle.filename(), &handle.uri(), &HeaderMap::new()).await;
match result {
Err(source) => Err(ClientActorError::new(&handle, source)),
Ok(state) => {
handle.set_state(state);
Ok(Some(handle))
},
}
});
},
@ -89,17 +94,21 @@ impl ClientActor {
// spawn a task that does the work
let mut client = self.client.clone();
let uri = uri.clone();
let handle = ClientActorMessageHandle::GetData {
let mut handle = ClientActorMessageHandle::GetData {
uri,
buffer: None,
message: self.actions_idx,
};
self.tasks.spawn(async move {
client.data(&handle.uri(), &HeaderMap::new()).await
. map_err(|source| ClientActorError { action: handle.clone()
, source })?;
Ok(Some(handle))
let result = client.data(&handle.uri(), &HeaderMap::new()).await;
match result {
Err(source) => Err(ClientActorError::new(&handle, source)),
Ok(data) => {
*handle.buffer_mut() = Some(data);
Ok(Some(handle))
},
}
});
},
@ -114,6 +123,7 @@ impl ClientActorHandle {
pub(super) fn new(client: Client, max_tasks: usize) -> Self {
let (sender, receiver) = mpsc::channel(1);
let (abort, abort_rx) = oneshot::channel::<ClientTaskResult>();
let actor = ClientActor::new(client, max_tasks, receiver, abort_rx )
. expect("Client create error");

4
src/client_actor/error.rs

@ -25,13 +25,13 @@ macro_rules! map_ca_error {
#[derive(Debug)]
pub(super) struct ClientActorError {
pub(crate) struct ClientActorError {
pub(super) action: ClientActorMessageHandle,
pub(super) source: anyhow::Error,
}
impl ClientActorError {
pub(super) fn new( action: ClientActorMessageHandle
pub(super) fn new( action: &ClientActorMessageHandle
, source: anyhow::Error ) -> Self {
let action = action.to_owned();
Self { action, source }

16
src/client_actor/message.rs

@ -3,20 +3,14 @@ use std::path::PathBuf;
use http::Uri;
use tokio::sync::oneshot;
use crate::client_new::DownloadState;
use super::error::ClientActorError;
type ActionIndex = u64;
type DownloadResult = Result<Option<DownloadState>, ClientActorError>;
pub(super) type DownloadResult = Result<DownloadState, ClientActorError>;
#[derive(Clone, Debug)]
pub enum DownloadState {
GotHead,
#[allow(dead_code)]
Partial { content_type: Option<String> },
Done { content_type: Option<String> },
}
#[derive(Debug)]
pub(super) enum ClientActorMessage {
@ -36,7 +30,7 @@ pub(super) enum ClientActorMessageHandle {
Download {
filename: PathBuf,
uri: Uri,
state: Option<DownloadState>,
state: DownloadState,
message: ActionIndex,
},
GetData {
@ -50,7 +44,7 @@ pub(super) enum ClientActorMessageHandle {
impl ClientActorMessageHandle {
pub(super) fn set_state(&mut self, new_state: DownloadState) {
match self {
Self::Download { ref mut state, .. } => *state = Some(new_state),
Self::Download { ref mut state, .. } => *state = new_state,
_ => panic!("Called with invalid variant"),
};
}

6
src/client_actor/util.rs

@ -25,14 +25,12 @@ async fn process_next_result(mut actor: ClientActor, result: ClientTaskResult) -
client.download( e.action.filename()
, &e.action.uri()
, &HeaderMap::new() ).await
. map_err(|source| ClientActorError { action: e.action.clone()
, source })?;
. map_err(|source| ClientActorError::new(&e.action, source))?;
Ok(Some(e.action))
},
GetData { .. } => {
client.data(&e.action.uri(), &HeaderMap::new()).await
. map_err(|source| ClientActorError { action: e.action.clone()
, source })?;
. map_err(|source| ClientActorError::new(&e.action, source))?;
Ok(Some(e.action))
},
}

54
src/client_new.rs

@ -5,7 +5,7 @@ use std::{path::Path, time::Duration};
use anyhow::anyhow;
use clap::{crate_name, crate_version};
use error::RequestError;
use error::{DownloadError, RequestError};
use futures_util::StreamExt as _;
use http::{
header::{CONTENT_LENGTH, CONTENT_TYPE, ORIGIN, RANGE, USER_AGENT}, request::Builder as RequestBuilder, HeaderMap, HeaderValue, Request, Response, Uri
@ -30,6 +30,16 @@ type ClientResponseResult = Result<ClientResponse, RequestError>;
type HttpClient = BoxCloneService<Request<Body>, ClientResponse, anyhow::Error>;
#[derive(Clone, Debug)]
pub(super) enum DownloadState {
None,
GotHead,
#[allow(dead_code)]
Partial { content_type: Option<String>, size: usize },
#[allow(dead_code)]
Done { content_type: Option<String>, size: usize },
}
#[derive(Clone, Debug)]
pub(super) struct Client {
client: HttpClient,
@ -37,6 +47,7 @@ pub(super) struct Client {
body_timeout: Option<Duration>,
}
impl Client {
pub(super) fn new( buffer: usize
, rate_limit: u64
@ -67,12 +78,12 @@ impl Client {
Ok(Self {client, default_headers, body_timeout})
}
fn set_body_timeout(mut self, timeout: Option<Duration>) -> Self {
pub(super) fn set_body_timeout(mut self, timeout: Option<Duration>) -> Self {
self.body_timeout = timeout;
self
}
fn set_origin(mut self, origin: Option<String>) -> Self {
pub(super) fn set_origin(mut self, origin: Option<String>) -> Self {
if let Some(origin) = origin {
self.default_headers.insert(
ORIGIN,
@ -83,7 +94,7 @@ impl Client {
self
}
fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
pub(super) fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
if let Some(user_agent) = user_agent {
self.default_headers.insert(
USER_AGENT,
@ -115,9 +126,7 @@ impl Client {
, filename: impl AsRef<Path>
, uri: &Uri
, headers: &HeaderMap )
-> anyhow::Result<Option<(String, usize)>> {
let filename = filename.as_ref();
-> anyhow::Result<DownloadState> {
// - get all informations to eventually existing file
let mut from = util::file_size(&filename).await;
@ -129,10 +138,11 @@ impl Client {
let content_type = util::get_header::<String>( response_headers
, CONTENT_TYPE )
. or(Some("unknown".into()));
let state = DownloadState::GotHead;
if let Some(content_length) = content_length {
if from != 0 && content_length - 1 <= from {
return Ok(None);
return Ok(state);
}
} else {
from = 0;
@ -147,9 +157,9 @@ impl Client {
// - open or create file
let file = util::open_or_create(&response.status(), &filename).await;
// - download Data
let size = self.clone().store_body(file, response.body_mut()).await?;
Ok(content_type.map(|c| (c, size)))
Ok( self.clone().store_body( file
, content_type
, response.body_mut() ).await? )
}
async fn head( &mut self
@ -191,30 +201,36 @@ impl Client {
async fn store_body( self
, mut file: File
, body: &mut ClientBody ) -> anyhow::Result<usize> {
, content_type: Option<String>
, body: &mut ClientBody ) -> Result<DownloadState, DownloadError> {
let mut body = BodyDataStream::new(body);
let mut written = 0;
let mut size = 0;
let mut state = DownloadState::Partial { content_type: content_type.clone(), size };
loop {
let data_future = body.next();
let data = if let Some(io_timeout) = self.body_timeout {
// give timeout somehow... probably from client.
timeout(io_timeout, data_future).await?
timeout(io_timeout, data_future).await
. map_err(|e| DownloadError::new(state.clone(), e.into()))?
} else {
data_future.await
};
match data {
None => break,
Some(Err(e)) => Err(anyhow!(e))?,
Some(Err(e)) => Err(DownloadError::new(state.clone(), anyhow!(e)))?,
Some(Ok(data)) => {
written += data.len();
file . write_all(&data).await?;
file . flush().await?;
size += data.len();
state = DownloadState::Partial { content_type: content_type.clone(), size };
file . write_all(&data).await
. map_err(|e| DownloadError::new(state.clone(), e.into()))?;
file . flush().await
. map_err(|e| DownloadError::new(state.clone(), e.into()))?;
},
}
};
Ok(written)
Ok(DownloadState::Done { content_type, size })
}
}

28
src/client_new/error.rs

@ -2,6 +2,14 @@ use std::{error, fmt};
use http::Response;
use super::DownloadState;
#[derive(Debug)]
pub(super) struct DownloadError {
pub(super) state: DownloadState,
pub(super) source: anyhow::Error,
}
#[derive(Debug)]
pub(super) struct RequestError {
@ -9,6 +17,26 @@ pub(super) struct RequestError {
pub(super) source: Option<anyhow::Error>,
}
impl DownloadError {
pub(super) fn new( state: DownloadState
, source: anyhow::Error ) -> Self {
Self { state, source }
}
}
impl error::Error for DownloadError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
Some(self.source.as_ref())
}
}
impl fmt::Display for DownloadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "download error ({:?}): {}", self.state, self.source)
}
}
impl RequestError {
pub(super) fn new( response: Option<Response<()>>
, source: Option<anyhow::Error> ) -> Self {

4
src/m3u8_download.rs

@ -7,7 +7,7 @@ use log::debug;
use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist};
use tokio::{io::AsyncWriteExt as _, fs::File};
use crate::client::{ClientActorHandle, DownloadState};
use crate::{client_actor::ClientActorHandle, client_new::DownloadState};
#[derive(Clone, Debug)]
@ -57,7 +57,7 @@ impl TsPart {
async fn download(&mut self, client: &ClientActorHandle) -> &Self {
let state = client.download(self.filename.clone(), &self.uri).await;
match state {
Ok(Some(DownloadState::Done { content_type })) => {
Ok(DownloadState::Done { content_type, .. }) => {
self.state = TsState::Ready;
self.content_type = content_type;
},

38
src/main.rs

@ -4,18 +4,15 @@ mod m3u8_download;
mod client_new;
mod client_actor;
use std::{
ffi::OsStr,
path::PathBuf,
time::Duration
};
use std::{ffi::OsStr, path::PathBuf, time::Duration};
use anyhow::anyhow;
use clap::Parser;
use client_actor::ClientActorHandle;
use client_new::Client;
use env_logger::Env;
use http::Uri;
use m3u8_download::M3u8Download;
use client::ClientActorHandle;
use log::{debug, info};
@ -73,6 +70,11 @@ async fn main() -> anyhow::Result<()> {
let concurrency_limit = args.concurrency.unwrap_or(20);
let timeout = args.timeout.unwrap_or(15);
let timeout = Duration::from_secs(timeout);
let body_timeout = if args.use_body_timeout {
Some(timeout)
} else {
None
};
let m3u8_uri = Uri::try_from(&args.url)?;
@ -82,24 +84,28 @@ async fn main() -> anyhow::Result<()> {
info!("Creating an HTTP client with Tower layers...");
let client = ClientActorHandle::new( buffer
, rate_limit
, concurrency_limit
, timeout
, args.use_body_timeout
, args.origin
, args.agent );
let client = Client::new(buffer, rate_limit, concurrency_limit, timeout)?
. set_body_timeout(body_timeout)
. set_origin(args.origin);
let client = if let Some(user_agent) = args.agent {
client.set_user_agent(Some(user_agent))
} else {
client
};
let actor = ClientActorHandle::new(client, buffer + concurrency_limit);
info!("Get segments...");
let m3u8_data = client.body_bytes(&m3u8_uri).await
let m3u8_data = actor.body_bytes(&m3u8_uri).await
. ok_or(anyhow!("Unable to get body for: {}", m3u8_uri))?;
let mut download = M3u8Download::new(m3u8_data, m3u8_uri).await?;
info!("Sending concurrent requests...");
download.download(&client).await;
client.stop();
download.download(&actor).await;
actor.stop();
info!("Call ffmpeg to join ts files to single mp4...");

Loading…
Cancel
Save