|
|
|
@ -6,11 +6,15 @@ mod message; |
|
|
|
|
|
|
|
use std::{collections::HashMap, path::Path, time::Duration};
|
|
|
|
|
|
|
|
use clap::{crate_name, crate_version};
|
|
|
|
use error::ClientError;
|
|
|
|
use http::{
|
|
|
|
header::{ORIGIN, USER_AGENT},
|
|
|
|
HeaderMap,
|
|
|
|
HeaderValue,
|
|
|
|
Request,
|
|
|
|
Response,
|
|
|
|
Uri
|
|
|
|
Uri,
|
|
|
|
};
|
|
|
|
use message::{ClientActorMessage, ClientActorMessageHandle};
|
|
|
|
use reqwest::{redirect::Policy, Body};
|
|
|
|
@ -26,25 +30,6 @@ use tower_reqwest::HttpClientLayer; |
|
|
|
use log::{debug, error, info};
|
|
|
|
|
|
|
|
|
|
|
|
#[macro_export]
|
|
|
|
macro_rules! mk_dlerror {
|
|
|
|
($message:ident, $($err:tt)*) => {{
|
|
|
|
use $crate::client::error;
|
|
|
|
error::ClientError::new( $message.clone()
|
|
|
|
, Some(anyhow::anyhow!($($err)*) ))
|
|
|
|
}};
|
|
|
|
}
|
|
|
|
|
|
|
|
#[macro_export]
|
|
|
|
macro_rules! map_dlerror {
|
|
|
|
($message:ident) => {{
|
|
|
|
use $crate::client::error;
|
|
|
|
|e| error::ClientError::new( $message.clone()
|
|
|
|
, Some(anyhow::anyhow!(e) ))
|
|
|
|
}};
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub enum DownloadState {
|
|
|
|
GotHead,
|
|
|
|
@ -62,13 +47,15 @@ type HttpClient = BoxCloneService<Request<Body>, Response<DecompressionBody<Body |
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct ClientActor {
|
|
|
|
timeout: Duration,
|
|
|
|
body_timeout: Option<Duration>,
|
|
|
|
client: HttpClient,
|
|
|
|
tasks: JoinSet<JoinSetResult>,
|
|
|
|
actions: HashMap<ActionIndex, ClientActorMessage>,
|
|
|
|
actions_idx: ActionIndex,
|
|
|
|
default_headers: HeaderMap,
|
|
|
|
|
|
|
|
receiver: mpsc::Receiver<ClientActorMessage>,
|
|
|
|
tasks_left: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -79,8 +66,8 @@ pub(super) struct ClientActorHandle { |
|
|
|
|
|
|
|
async fn run_client(mut actor: ClientActor) {
|
|
|
|
loop {
|
|
|
|
select! {
|
|
|
|
Some(join) = actor.tasks.join_next() => {
|
|
|
|
if actor.tasks_left == 0 {
|
|
|
|
if let Some(join) = actor.tasks.join_next().await {
|
|
|
|
use ClientActorMessageHandle::{Download, GetData};
|
|
|
|
|
|
|
|
match join {
|
|
|
|
@ -94,10 +81,15 @@ async fn run_client(mut actor: ClientActor) { |
|
|
|
// retry ... instead of responing here we could also respond
|
|
|
|
// with something that in turn would be used to retry...
|
|
|
|
let client = actor.client.clone();
|
|
|
|
let default_headers = actor.default_headers.clone();
|
|
|
|
actor.tasks.spawn(async move {
|
|
|
|
match e.action {
|
|
|
|
Download { .. } =>
|
|
|
|
download::download(client, e.action, actor.timeout).await,
|
|
|
|
Download { .. } => {
|
|
|
|
download::download( client
|
|
|
|
, default_headers
|
|
|
|
, e.action
|
|
|
|
, actor.body_timeout ).await
|
|
|
|
},
|
|
|
|
GetData { .. } =>
|
|
|
|
data::data(client, e.action).await,
|
|
|
|
}
|
|
|
|
@ -106,6 +98,7 @@ async fn run_client(mut actor: ClientActor) { |
|
|
|
|
|
|
|
// when the task finishes
|
|
|
|
Ok(Ok(Some(action))) => {
|
|
|
|
actor.tasks_left += 1;
|
|
|
|
match action {
|
|
|
|
Download { filename: _, ref uri, state, ref message } => {
|
|
|
|
info!("Done download: {:?}", uri);
|
|
|
|
@ -144,13 +137,87 @@ async fn run_client(mut actor: ClientActor) { |
|
|
|
// JoinSet is empty.
|
|
|
|
Ok(Ok(None)) => (),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
};
|
|
|
|
} else {
|
|
|
|
select! {
|
|
|
|
Some(join) = actor.tasks.join_next() => {
|
|
|
|
use ClientActorMessageHandle::{Download, GetData};
|
|
|
|
|
|
|
|
match join {
|
|
|
|
Err(e) => {
|
|
|
|
error!("FATAL Join failed: {}", e);
|
|
|
|
break
|
|
|
|
},
|
|
|
|
|
|
|
|
Ok(Err(e)) => {
|
|
|
|
info!("Retry failed download: {:?}", e);
|
|
|
|
// retry ... instead of responing here we could also respond
|
|
|
|
// with something that in turn would be used to retry...
|
|
|
|
let client = actor.client.clone();
|
|
|
|
let default_headers = actor.default_headers.clone();
|
|
|
|
actor.tasks.spawn(async move {
|
|
|
|
match e.action {
|
|
|
|
Download { .. } =>
|
|
|
|
download::download( client
|
|
|
|
, default_headers
|
|
|
|
, e.action
|
|
|
|
, actor.body_timeout ).await,
|
|
|
|
GetData { .. } =>
|
|
|
|
data::data(client, e.action).await,
|
|
|
|
}
|
|
|
|
});
|
|
|
|
},
|
|
|
|
|
|
|
|
// when the task finishes
|
|
|
|
Ok(Ok(Some(action))) => {
|
|
|
|
actor.tasks_left += 1;
|
|
|
|
match action {
|
|
|
|
Download { filename: _, ref uri, state, ref message } => {
|
|
|
|
info!("Done download: {:?}", uri);
|
|
|
|
if let Some((_, message)) = actor.actions.remove_entry(message) {
|
|
|
|
use ClientActorMessage::Download;
|
|
|
|
|
|
|
|
match message {
|
|
|
|
Download { respond_to, .. } => {
|
|
|
|
let _ = respond_to.send(Ok(state));
|
|
|
|
},
|
|
|
|
_ => panic!("Wrong variant ... this should never happen"),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
panic!("Lost a message");
|
|
|
|
}
|
|
|
|
},
|
|
|
|
|
|
|
|
GetData { ref uri, buffer, ref message } => {
|
|
|
|
info!("Done get_data: {:?}", uri);
|
|
|
|
if let Some((_, message)) = actor.actions.remove_entry(message) {
|
|
|
|
use ClientActorMessage::GetData;
|
|
|
|
match message {
|
|
|
|
GetData { respond_to, .. } => {
|
|
|
|
let _ = respond_to.send(buffer);
|
|
|
|
},
|
|
|
|
_ => panic!("Wrong variant ... this should never happen"),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
panic!("Lost a message");
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
},
|
|
|
|
|
|
|
|
Some(message) = actor.receiver.recv() => {
|
|
|
|
actor.handle_message(message).await;
|
|
|
|
}
|
|
|
|
// Got a stop message...here we still continue procession until the
|
|
|
|
// JoinSet is empty.
|
|
|
|
Ok(Ok(None)) => (),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
Some(message) = actor.receiver.recv() => {
|
|
|
|
actor.tasks_left -= 1;
|
|
|
|
actor.handle_message(message).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
else => {}
|
|
|
|
else => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -158,12 +225,12 @@ async fn run_client(mut actor: ClientActor) { |
|
|
|
|
|
|
|
|
|
|
|
impl ClientActor {
|
|
|
|
pub(super) fn new( buffer: usize |
|
|
|
, rate_limit: u64 |
|
|
|
, concurrency_limit: usize |
|
|
|
, timeout: Duration
|
|
|
|
, receiver: mpsc::Receiver<ClientActorMessage>
|
|
|
|
, abort_rx: oneshot::Receiver<JoinSetResult> ) -> anyhow::Result<Self> {
|
|
|
|
fn new( buffer: usize |
|
|
|
, rate_limit: u64 |
|
|
|
, concurrency_limit: usize |
|
|
|
, timeout: Duration
|
|
|
|
, receiver: mpsc::Receiver<ClientActorMessage>
|
|
|
|
, abort_rx: oneshot::Receiver<JoinSetResult> ) -> anyhow::Result<Self> {
|
|
|
|
let client = ServiceBuilder::new()
|
|
|
|
// Add some layers.
|
|
|
|
. buffer(buffer)
|
|
|
|
@ -179,8 +246,6 @@ impl ClientActor { |
|
|
|
. map_err(anyhow::Error::msg)
|
|
|
|
. boxed_clone();
|
|
|
|
|
|
|
|
debug!("-> client: {:?}", client);
|
|
|
|
|
|
|
|
let mut tasks = JoinSet::new();
|
|
|
|
|
|
|
|
tasks.spawn(async move {
|
|
|
|
@ -190,8 +255,50 @@ impl ClientActor { |
|
|
|
|
|
|
|
let actions = HashMap::new();
|
|
|
|
let actions_idx = 0;
|
|
|
|
let tasks_left = buffer + concurrency_limit;
|
|
|
|
let body_timeout = None;
|
|
|
|
let mut default_headers = HeaderMap::new();
|
|
|
|
default_headers.insert(
|
|
|
|
USER_AGENT,
|
|
|
|
HeaderValue::from_str(&( crate_name!().to_string() + "/"
|
|
|
|
+ crate_version!() )).unwrap() );
|
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
body_timeout,
|
|
|
|
client,
|
|
|
|
tasks,
|
|
|
|
receiver,
|
|
|
|
actions,
|
|
|
|
default_headers,
|
|
|
|
actions_idx,
|
|
|
|
tasks_left })
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(Self {timeout, client, tasks, receiver, actions, actions_idx})
|
|
|
|
fn set_body_timeout(mut self, timeout: Option<Duration>) -> Self {
|
|
|
|
self.body_timeout = timeout;
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_origin(mut self, origin: Option<String>) -> Self {
|
|
|
|
if let Some(origin) = origin {
|
|
|
|
self.default_headers.insert(
|
|
|
|
ORIGIN,
|
|
|
|
HeaderValue::from_str(origin.as_str()).unwrap() );
|
|
|
|
} else {
|
|
|
|
self.default_headers.remove(ORIGIN);
|
|
|
|
}
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
|
|
|
|
if let Some(user_agent) = user_agent {
|
|
|
|
self.default_headers.insert(
|
|
|
|
USER_AGENT,
|
|
|
|
HeaderValue::from_str(user_agent.as_str()).unwrap() );
|
|
|
|
} else {
|
|
|
|
self.default_headers.remove(USER_AGENT);
|
|
|
|
}
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_message(&mut self, message: ClientActorMessage) {
|
|
|
|
@ -203,7 +310,7 @@ impl ClientActor { |
|
|
|
Some(Download { ref filename, ref uri, respond_to: _ }) => {
|
|
|
|
// spawn a task that does the work
|
|
|
|
let client = self.client.clone();
|
|
|
|
let timeout = self.timeout;
|
|
|
|
let timeout = self.body_timeout;
|
|
|
|
|
|
|
|
let handle = ClientActorMessageHandle::Download {
|
|
|
|
filename: filename.to_path_buf(),
|
|
|
|
@ -212,8 +319,12 @@ impl ClientActor { |
|
|
|
message: self.actions_idx,
|
|
|
|
};
|
|
|
|
|
|
|
|
let default_headers = self.default_headers.clone();
|
|
|
|
self.tasks.spawn(async move {
|
|
|
|
download::download(client, handle, timeout).await
|
|
|
|
download::download( client
|
|
|
|
, default_headers
|
|
|
|
, handle
|
|
|
|
, timeout ).await
|
|
|
|
});
|
|
|
|
|
|
|
|
self.actions_idx += 1;
|
|
|
|
@ -245,7 +356,10 @@ impl ClientActorHandle { |
|
|
|
pub(super) fn new( buffer: usize |
|
|
|
, rate_limit: u64 |
|
|
|
, concurrency_limit: usize |
|
|
|
, timeout: Duration ) -> Self {
|
|
|
|
, timeout: Duration
|
|
|
|
, use_body_timeout: bool |
|
|
|
, origin: Option<String>
|
|
|
|
, user_agent: Option<String> ) -> Self {
|
|
|
|
let (sender, receiver) = mpsc::channel(1);
|
|
|
|
let (abort, abort_rx) = oneshot::channel::<JoinSetResult>();
|
|
|
|
let actor = ClientActor::new( buffer
|
|
|
|
@ -254,7 +368,23 @@ impl ClientActorHandle { |
|
|
|
, timeout
|
|
|
|
, receiver
|
|
|
|
, abort_rx )
|
|
|
|
. expect("Client create error");
|
|
|
|
. expect("Client create error")
|
|
|
|
. set_origin(origin);
|
|
|
|
|
|
|
|
let actor = if let Some(user_agent) = user_agent {
|
|
|
|
actor.set_user_agent(Some(user_agent))
|
|
|
|
} else {
|
|
|
|
actor
|
|
|
|
};
|
|
|
|
|
|
|
|
let actor = if use_body_timeout {
|
|
|
|
actor.set_body_timeout(Some(timeout))
|
|
|
|
} else {
|
|
|
|
actor
|
|
|
|
};
|
|
|
|
|
|
|
|
debug!("-> actor: {:?}", actor);
|
|
|
|
|
|
|
|
tokio::spawn(run_client(actor));
|
|
|
|
|
|
|
|
Self { sender, abort }
|
|
|
|
|