From bd0fa9036302f966449bd8f474fb084aea424b60 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Fri, 10 Jan 2025 23:32:02 +0100 Subject: [PATCH] code deduplication --- src/client.rs | 211 ++++++++++++++++++-------------------------------- 1 file changed, 77 insertions(+), 134 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4888055..f725702 100644 --- a/src/client.rs +++ b/src/client.rs @@ -55,7 +55,7 @@ struct ClientActor { default_headers: HeaderMap, receiver: mpsc::Receiver, - tasks_left: usize, + max_tasks: usize, } @@ -64,155 +64,99 @@ pub(super) struct ClientActorHandle { abort: oneshot::Sender, } +async fn process_next_result(mut actor: ClientActor, result: JoinSetResult) -> ClientActor { + use ClientActorMessageHandle::{Download, GetData}; + + match result { + Err(e) => { + info!("Retry failed download: {:?}", e); + // retry ... instead of responing here we could also respond + // with something that in turn would be used to retry... + let client = actor.client.clone(); + let default_headers = actor.default_headers.clone(); + actor.tasks.spawn(async move { + match e.action { + Download { .. } => { + download::download( client + , default_headers + , e.action + , actor.body_timeout ).await + }, + GetData { .. } => + data::data(client, e.action).await, + } + }); + }, + + // when the task finishes + Ok(Some(action)) => { + match action { + Download { ref uri, ref state, ref message, .. } => { + info!("Done download: {:?}", uri); + if let Some((_, message)) = actor.actions.remove_entry(message) { + use ClientActorMessage::Download; + + match message { + Download { respond_to, .. } => { + let _ = respond_to.send(Ok(state.clone())); + }, + _ => panic!("Wrong variant ... this should never happen"), + } + } else { + panic!("Lost a message"); + } + }, + + GetData { ref uri, ref buffer, ref message } => { + info!("Done get_data: {:?}", uri); + if let Some((_, message)) = actor.actions.remove_entry(message) { + use ClientActorMessage::GetData; + match message { + GetData { respond_to, .. } => { + let _ = respond_to.send(buffer.clone()); + }, + _ => panic!("Wrong variant ... this should never happen"), + } + } else { + panic!("Lost a message"); + } + }, + } + }, + + // Got a stop message...here we still continue procession until the + // JoinSet is empty. + Ok(None) => (), + }; + + actor +} + async fn run_client(mut actor: ClientActor) { loop { - if actor.tasks_left == 0 { + if actor.tasks.len() >= actor.max_tasks { if let Some(join) = actor.tasks.join_next().await { - use ClientActorMessageHandle::{Download, GetData}; - match join { Err(e) => { error!("FATAL Join failed: {}", e); break }, - - Ok(Err(e)) => { - info!("Retry failed download: {:?}", e); - // retry ... instead of responing here we could also respond - // with something that in turn would be used to retry... - let client = actor.client.clone(); - let default_headers = actor.default_headers.clone(); - actor.tasks.spawn(async move { - match e.action { - Download { .. } => { - download::download( client - , default_headers - , e.action - , actor.body_timeout ).await - }, - GetData { .. } => - data::data(client, e.action).await, - } - }); - }, - - // when the task finishes - Ok(Ok(Some(action))) => { - actor.tasks_left += 1; - match action { - Download { filename: _, ref uri, state, ref message } => { - info!("Done download: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::Download; - - match message { - Download { respond_to, .. } => { - let _ = respond_to.send(Ok(state)); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - - GetData { ref uri, buffer, ref message } => { - info!("Done get_data: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::GetData; - match message { - GetData { respond_to, .. } => { - let _ = respond_to.send(buffer); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - } - }, - - // Got a stop message...here we still continue procession until the - // JoinSet is empty. - Ok(Ok(None)) => (), - }; + Ok(result) => actor = process_next_result(actor, result).await, + } }; } else { select! { Some(join) = actor.tasks.join_next() => { - use ClientActorMessageHandle::{Download, GetData}; - match join { Err(e) => { error!("FATAL Join failed: {}", e); break }, - - Ok(Err(e)) => { - info!("Retry failed download: {:?}", e); - // retry ... instead of responing here we could also respond - // with something that in turn would be used to retry... - let client = actor.client.clone(); - let default_headers = actor.default_headers.clone(); - actor.tasks.spawn(async move { - match e.action { - Download { .. } => - download::download( client - , default_headers - , e.action - , actor.body_timeout ).await, - GetData { .. } => - data::data(client, e.action).await, - } - }); - }, - - // when the task finishes - Ok(Ok(Some(action))) => { - actor.tasks_left += 1; - match action { - Download { filename: _, ref uri, state, ref message } => { - info!("Done download: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::Download; - - match message { - Download { respond_to, .. } => { - let _ = respond_to.send(Ok(state)); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - - GetData { ref uri, buffer, ref message } => { - info!("Done get_data: {:?}", uri); - if let Some((_, message)) = actor.actions.remove_entry(message) { - use ClientActorMessage::GetData; - match message { - GetData { respond_to, .. } => { - let _ = respond_to.send(buffer); - }, - _ => panic!("Wrong variant ... this should never happen"), - } - } else { - panic!("Lost a message"); - } - }, - } - }, - - // Got a stop message...here we still continue procession until the - // JoinSet is empty. - Ok(Ok(None)) => (), - }; + Ok(result) => actor = process_next_result(actor, result).await, + } } Some(message) = actor.receiver.recv() => { - actor.tasks_left -= 1; actor.handle_message(message).await; } @@ -223,7 +167,6 @@ async fn run_client(mut actor: ClientActor) { } - impl ClientActor { fn new( buffer: usize , rate_limit: u64 @@ -271,7 +214,7 @@ impl ClientActor { actions, default_headers, actions_idx, - tasks_left }) + max_tasks: tasks_left }) } fn set_body_timeout(mut self, timeout: Option) -> Self { @@ -307,7 +250,7 @@ impl ClientActor { use ClientActorMessage::{Download, GetData}; match self.actions.get(&self.actions_idx) { - Some(Download { ref filename, ref uri, respond_to: _ }) => { + Some(Download { ref filename, ref uri, .. }) => { // spawn a task that does the work let client = self.client.clone(); let timeout = self.body_timeout; @@ -330,7 +273,7 @@ impl ClientActor { self.actions_idx += 1; }, - Some(GetData { ref uri, respond_to: _ }) => { + Some(GetData { ref uri, .. }) => { // spawn a task that does the work let client = self.client.clone();