diff --git a/src/client_actor.rs b/src/client_actor.rs index 8ff0ce9..5af2e04 100644 --- a/src/client_actor.rs +++ b/src/client_actor.rs @@ -35,6 +35,12 @@ pub(super) struct ClientActorHandle { } +impl From for DownloadResult { + fn from(value: ClientActorError) -> Self { + Err(value) + } +} + impl ClientActor { fn new( client: Client , max_tasks: usize @@ -42,6 +48,10 @@ impl ClientActor { , abort_rx: oneshot::Receiver ) -> anyhow::Result { let mut tasks = JoinSet::new(); + // the actor uses a JoinSet to make concurrent requests and Downloads + // calling next on an empty JoinSet always yields (afaic). To prevent + // this I spawn a taskt in that JoinSet, that will only finish when the + // application stops or a message reaches abort_rx. tasks.spawn(async move { let _ = abort_rx.await; Ok(None) @@ -59,6 +69,39 @@ impl ClientActor { receiver }) } + fn get_action( &mut self + , handle: &ClientActorMessageHandle ) -> ClientActorMessage { + match self.actions.remove(&handle.action_index()) { + Some(message) => message, + None => panic!("Lost a message"), + } + } + + fn respond_action_ok( &mut self + , handle: &ClientActorMessageHandle ) { + match self.get_action(handle) { + ClientActorMessage::Download { respond_to, .. } => { + let _ = respond_to.send(Ok(handle.state_ref().clone())); + }, + ClientActorMessage::GetData { respond_to, .. } => { + let _ = respond_to.send(handle.buffer_ref().clone()); + }, + } + } + + fn respond_action_err( &mut self + , error: ClientActorError ) { + let handle = error.action.clone(); + match self.get_action(&handle) { + ClientActorMessage::Download { respond_to, .. } => { + let _ = respond_to.send(Err(error)); + }, + ClientActorMessage::GetData { respond_to, .. } => { + let _ = respond_to.send(handle.buffer_ref().clone()); + }, + } + } + async fn handle_message(&mut self, message: ClientActorMessage) { self.actions.insert(self.actions_idx, message); diff --git a/src/client_actor/message.rs b/src/client_actor/message.rs index d188f73..d067aed 100644 --- a/src/client_actor/message.rs +++ b/src/client_actor/message.rs @@ -40,11 +40,22 @@ pub(super) enum ClientActorMessageHandle { impl ClientActorMessageHandle { - pub(super) fn set_state(&mut self, new_state: DownloadState) { + pub(super) fn state_ref(&self) -> &DownloadState { + match self { + Self::Download { ref state, .. } => state, + _ => panic!("Called with invalid variant"), + } + } + + pub(super) fn state_mut(&mut self) -> &mut DownloadState { match self { - Self::Download { ref mut state, .. } => *state = new_state, + Self::Download { ref mut state, .. } => state, _ => panic!("Called with invalid variant"), - }; + } + } + + pub(super) fn set_state(&mut self, new_state: DownloadState) { + *self.state_mut() = new_state; } pub(super) fn filename(&self) -> PathBuf { @@ -61,10 +72,24 @@ impl ClientActorMessageHandle { } } + pub(super) fn buffer_ref(&self) -> &Option { + match self { + Self::GetData { ref buffer, .. } => buffer, + _ => panic!("Called with invalid variant"), + } + } + pub(super) fn buffer_mut(&mut self) -> &mut Option { match self { Self::GetData { ref mut buffer, .. } => buffer, _ => panic!("Called with invalid variant"), } } + + pub(super) fn action_index(&self) -> ActionIndex { + match self { + Self::Download { ref message, .. } => *message, + Self::GetData { ref message, .. } => *message, + } + } } diff --git a/src/client_actor/util.rs b/src/client_actor/util.rs index 5359948..8bbae57 100644 --- a/src/client_actor/util.rs +++ b/src/client_actor/util.rs @@ -1,87 +1,23 @@ -use http::HeaderMap; use log::{error, info}; use tokio::select; use super::{ - error::ClientActorError, - message::{ClientActorMessage, ClientActorMessageHandle}, ClientActor, ClientTaskResult, }; async fn process_next_result(mut actor: ClientActor, result: ClientTaskResult) -> ClientActor { - use ClientActorMessageHandle::{Download, GetData}; - match result { - Err(mut e) => { - info!("Retry failed download: {:?}", e); - // retry ... instead of responing here we could also respond - // with something that in turn would be used to retry... - let mut client = actor.client.clone(); - actor.tasks.spawn(async move { - match e.action { - Download { .. } => { - let result = client.download( e.action.filename() - , &e.action.uri() - , &HeaderMap::new()).await; - match result { - Err(source) => - Err(ClientActorError::new( &e.action - , source.into() )), - Ok(state) => { - e.action.set_state(state); - Ok(Some(e.action)) - }, - } - }, - GetData { .. } => { - let result = client.data(&e.action.uri(), &HeaderMap::new()).await; - match result { - Err(source) => - Err(ClientActorError::new( &e.action - , source.into() )), - Ok(data) => { - *e.action.buffer_mut() = Some(data); - Ok(Some(e.action)) - }, - } - }, - } - }); + Err(e) => { + info!("Aborted download: {:?}", e); + actor.respond_action_err(e); }, // when the task finishes Ok(Some(action)) => { - match action { - Download { ref uri, ref state, ref message, .. } => { - info!("Done download: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - match message { - ClientActorMessage::Download { respond_to, .. } => { - let _ = respond_to.send(Ok(state.clone())); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - - GetData { ref uri, ref buffer, ref message } => { - info!("Done get_data: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - match message { - ClientActorMessage::GetData { respond_to, .. } => { - let _ = respond_to.send(buffer.clone()); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - } + info!("Done download: {:?}", action); + actor.respond_action_ok(&action); }, // Got a stop message...here we still continue procession until the