Browse Source

do not retry download in actor

main
Georg Hopp 11 months ago
parent
commit
dc519c01a5
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 43
      src/client_actor.rs
  2. 31
      src/client_actor/message.rs
  3. 74
      src/client_actor/util.rs

43
src/client_actor.rs

@ -35,6 +35,12 @@ pub(super) struct ClientActorHandle {
}
impl From<ClientActorError> for DownloadResult {
fn from(value: ClientActorError) -> Self {
Err(value)
}
}
impl ClientActor {
fn new( client: Client
, max_tasks: usize
@ -42,6 +48,10 @@ impl ClientActor {
, abort_rx: oneshot::Receiver<ClientTaskResult> ) -> anyhow::Result<Self> {
let mut tasks = JoinSet::new();
// the actor uses a JoinSet to make concurrent requests and Downloads
// calling next on an empty JoinSet always yields (afaic). To prevent
// this I spawn a taskt in that JoinSet, that will only finish when the
// application stops or a message reaches abort_rx.
tasks.spawn(async move {
let _ = abort_rx.await;
Ok(None)
@ -59,6 +69,39 @@ impl ClientActor {
receiver })
}
fn get_action( &mut self
, handle: &ClientActorMessageHandle ) -> ClientActorMessage {
match self.actions.remove(&handle.action_index()) {
Some(message) => message,
None => panic!("Lost a message"),
}
}
fn respond_action_ok( &mut self
, handle: &ClientActorMessageHandle ) {
match self.get_action(handle) {
ClientActorMessage::Download { respond_to, .. } => {
let _ = respond_to.send(Ok(handle.state_ref().clone()));
},
ClientActorMessage::GetData { respond_to, .. } => {
let _ = respond_to.send(handle.buffer_ref().clone());
},
}
}
fn respond_action_err( &mut self
, error: ClientActorError ) {
let handle = error.action.clone();
match self.get_action(&handle) {
ClientActorMessage::Download { respond_to, .. } => {
let _ = respond_to.send(Err(error));
},
ClientActorMessage::GetData { respond_to, .. } => {
let _ = respond_to.send(handle.buffer_ref().clone());
},
}
}
async fn handle_message(&mut self, message: ClientActorMessage) {
self.actions.insert(self.actions_idx, message);

31
src/client_actor/message.rs

@ -40,11 +40,22 @@ pub(super) enum ClientActorMessageHandle {
impl ClientActorMessageHandle {
pub(super) fn set_state(&mut self, new_state: DownloadState) {
pub(super) fn state_ref(&self) -> &DownloadState {
match self {
Self::Download { ref state, .. } => state,
_ => panic!("Called with invalid variant"),
}
}
pub(super) fn state_mut(&mut self) -> &mut DownloadState {
match self {
Self::Download { ref mut state, .. } => *state = new_state,
Self::Download { ref mut state, .. } => state,
_ => panic!("Called with invalid variant"),
};
}
}
pub(super) fn set_state(&mut self, new_state: DownloadState) {
*self.state_mut() = new_state;
}
pub(super) fn filename(&self) -> PathBuf {
@ -61,10 +72,24 @@ impl ClientActorMessageHandle {
}
}
pub(super) fn buffer_ref(&self) -> &Option<Bytes> {
match self {
Self::GetData { ref buffer, .. } => buffer,
_ => panic!("Called with invalid variant"),
}
}
pub(super) fn buffer_mut(&mut self) -> &mut Option<Bytes> {
match self {
Self::GetData { ref mut buffer, .. } => buffer,
_ => panic!("Called with invalid variant"),
}
}
pub(super) fn action_index(&self) -> ActionIndex {
match self {
Self::Download { ref message, .. } => *message,
Self::GetData { ref message, .. } => *message,
}
}
}

74
src/client_actor/util.rs

@ -1,87 +1,23 @@
use http::HeaderMap;
use log::{error, info};
use tokio::select;
use super::{
error::ClientActorError,
message::{ClientActorMessage, ClientActorMessageHandle},
ClientActor,
ClientTaskResult,
};
async fn process_next_result(mut actor: ClientActor, result: ClientTaskResult) -> ClientActor {
use ClientActorMessageHandle::{Download, GetData};
match result {
Err(mut e) => {
info!("Retry failed download: {:?}", e);
// retry ... instead of responing here we could also respond
// with something that in turn would be used to retry...
let mut client = actor.client.clone();
actor.tasks.spawn(async move {
match e.action {
Download { .. } => {
let result = client.download( e.action.filename()
, &e.action.uri()
, &HeaderMap::new()).await;
match result {
Err(source) =>
Err(ClientActorError::new( &e.action
, source.into() )),
Ok(state) => {
e.action.set_state(state);
Ok(Some(e.action))
},
}
},
GetData { .. } => {
let result = client.data(&e.action.uri(), &HeaderMap::new()).await;
match result {
Err(source) =>
Err(ClientActorError::new( &e.action
, source.into() )),
Ok(data) => {
*e.action.buffer_mut() = Some(data);
Ok(Some(e.action))
},
}
},
}
});
Err(e) => {
info!("Aborted download: {:?}", e);
actor.respond_action_err(e);
},
// when the task finishes
Ok(Some(action)) => {
match action {
Download { ref uri, ref state, ref message, .. } => {
info!("Done download: {:?}", uri);
if let Some((_, message)) = actor.actions.remove_entry(message) {
match message {
ClientActorMessage::Download { respond_to, .. } => {
let _ = respond_to.send(Ok(state.clone()));
},
_ => panic!("Wrong variant ... this should never happen"),
}
} else {
panic!("Lost a message");
}
},
GetData { ref uri, ref buffer, ref message } => {
info!("Done get_data: {:?}", uri);
if let Some((_, message)) = actor.actions.remove_entry(message) {
match message {
ClientActorMessage::GetData { respond_to, .. } => {
let _ = respond_to.send(buffer.clone());
},
_ => panic!("Wrong variant ... this should never happen"),
}
} else {
panic!("Lost a message");
}
},
}
info!("Done download: {:?}", action);
actor.respond_action_ok(&action);
},
// Got a stop message...here we still continue procession until the

Loading…
Cancel
Save