mod message; pub(crate) mod error; mod util; use std::{collections::HashMap, path::Path}; use bytes::Bytes; use error::ClientActorError; use http::{HeaderMap, Uri}; use message::{ClientActorMessage, ClientActorMessageHandle}; use tokio::{sync::{mpsc, oneshot}, task::JoinSet}; use super::client::{Client, DownloadState}; type ActionIndex = u64; type ClientTaskResult = Result, ClientActorError>; type DownloadResult = Result; #[derive(Debug)] struct ClientActor { client: Client, tasks: JoinSet, max_tasks: usize, actions: HashMap, actions_idx: ActionIndex, receiver: mpsc::Receiver, } pub(super) struct ClientActorHandle { sender: mpsc::Sender, abort: oneshot::Sender, } impl From for DownloadResult { fn from(value: ClientActorError) -> Self { Err(value) } } impl ClientActor { fn new( client: Client , max_tasks: usize , receiver: mpsc::Receiver , abort_rx: oneshot::Receiver ) -> anyhow::Result { let mut tasks = JoinSet::new(); // the actor uses a JoinSet to make concurrent requests and Downloads // calling next on an empty JoinSet always yields (afaic). To prevent // this I spawn a taskt in that JoinSet, that will only finish when the // application stops or a message reaches abort_rx. tasks.spawn(async move { let _ = abort_rx.await; Ok(None) }); let actions = HashMap::new(); let actions_idx = 0; Ok(Self { client, tasks, max_tasks, actions, actions_idx, receiver }) } fn get_action( &mut self , handle: &ClientActorMessageHandle ) -> ClientActorMessage { match self.actions.remove(&handle.action_index()) { Some(message) => message, None => panic!("Lost a message"), } } fn respond_action_ok( &mut self , handle: &ClientActorMessageHandle ) { match self.get_action(handle) { ClientActorMessage::Download { respond_to, .. } => { let _ = respond_to.send(Ok(handle.state_ref().clone())); }, ClientActorMessage::GetData { respond_to, .. } => { let _ = respond_to.send(handle.buffer_ref().clone()); }, _ => (), } } fn respond_action_err( &mut self , error: ClientActorError ) { let handle = error.action.clone(); match self.get_action(&handle) { ClientActorMessage::Download { respond_to, .. } => { let _ = respond_to.send(Err(error)); }, ClientActorMessage::GetData { respond_to, .. } => { let _ = respond_to.send(handle.buffer_ref().clone()); }, _ => (), } } async fn handle_message(&mut self, message: ClientActorMessage) { self.actions.insert(self.actions_idx, message); use ClientActorMessage::{Download, GetData, Reconnect}; match self.actions.get(&self.actions_idx) { Some(Reconnect) => self.client.rebuild_http_client(), Some(Download { ref filename, ref uri, .. }) => { // spawn a task that does the work let mut client = self.client.clone(); let filename = filename.to_path_buf(); let uri = uri.clone(); let message = self.actions_idx; let mut handle = ClientActorMessageHandle::Download { filename, uri, state: DownloadState::None, message, }; self.tasks.spawn(async move { let result = client.download(handle.filename(), &handle.uri(), &HeaderMap::new()).await; match result { Err(source) => Err(ClientActorError::new(&handle, source.into())), Ok(state) => { handle.set_state(state); Ok(Some(handle)) }, } }); }, Some(GetData { ref uri, .. }) => { // spawn a task that does the work let mut client = self.client.clone(); let uri = uri.clone(); let mut handle = ClientActorMessageHandle::GetData { uri, buffer: None, message: self.actions_idx, }; self.tasks.spawn(async move { let result = client.data(&handle.uri(), &HeaderMap::new()).await; match result { Err(source) => Err(ClientActorError::new(&handle, source.into())), Ok(data) => { *handle.buffer_mut() = Some(data); Ok(Some(handle)) }, } }); }, None => (), } self.actions_idx += 1; } } impl ClientActorHandle { pub(super) fn new(client: Client, max_tasks: usize) -> Self { let (sender, receiver) = mpsc::channel(1); let (abort, abort_rx) = oneshot::channel::(); let actor = ClientActor::new(client, max_tasks, receiver, abort_rx ) . expect("Client create error"); tokio::spawn(util::run_client(actor)); Self { sender, abort } } pub(super) fn stop(self) { let _ = self.abort.send(Ok(None)); drop(self.sender); } pub(super) async fn download( &self , filename: impl AsRef , uri: &Uri ) -> DownloadResult { let filename = filename.as_ref().to_path_buf(); let uri = uri.to_owned(); let (send, receive) = oneshot::channel(); let msg = ClientActorMessage::Download { filename, uri, respond_to: send }; let _ = self.sender.send(msg).await; receive.await.expect("Actor cancelled unexpected") } pub(super) async fn body_bytes(&self, uri: &Uri) -> Option { let uri = uri.to_owned(); let (send, receive) = oneshot::channel(); let msg = ClientActorMessage::GetData { uri, respond_to: send }; let _ = self.sender.send(msg).await; receive.await.expect("Actor cancelled unexpected") } pub(super) async fn reconnect(&self) { let msg = ClientActorMessage::Reconnect; let _ = self.sender.send(msg).await; } }