Browse Source

Add Actor action to retrieve the body

main
Georg Hopp 11 months ago
parent
commit
eaff58f52f
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 2
      src/main.rs
  2. 121
      src/new_client.rs

2
src/main.rs

@ -91,6 +91,8 @@ async fn main() -> anyhow::Result<()> {
let handle = ClientActorHandle::new(timeout);
handle.download("foo.m3u8", &m3u8_uri).await;
let body = handle.body_bytes(&m3u8_uri).await;
debug!("body: {:?}", body);
handle.stop();
let mut join_set = JoinSet::new();

121
src/new_client.rs

@ -24,7 +24,7 @@ use tokio::{
};
use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt as _};
use tower_http::decompression::{DecompressionBody, DecompressionLayer};
use tower_http_client::ServiceExt as _;
use tower_http_client::{client::BodyReader, ServiceExt as _};
use tower_reqwest::HttpClientLayer;
use crate::{
@ -42,6 +42,10 @@ pub(super) enum ClientActorMessage {
uri: Uri,
respond_to: oneshot::Sender<DownloadState>,
},
GetData {
uri: Uri,
respond_to: oneshot::Sender<Option<Vec<u8>>>,
},
}
#[derive(Clone, Debug)]
@ -51,6 +55,11 @@ pub(super) enum ClientActorMessageHandle {
uri: Uri,
message: ActionIndex,
},
GetData {
uri: Uri,
buffer: Option<Vec<u8>>,
message: ActionIndex,
},
}
pub(super) type ActionIndex = u64;
@ -98,18 +107,38 @@ async fn run_client(mut actor: ClientActor) {
// when the task finishes
Ok(Ok(Some(action))) => {
info!("Done download: {:?}", action);
use ClientActorMessageHandle::Download;
use ClientActorMessageHandle::{Download, GetData};
match action {
Download { filename: _, uri: _, ref message } => {
if let Some((_, message)) = actor.actions.remove_entry(message) {
use ClientActorMessage::Download;
let Download { filename: _, uri: _, respond_to } = message;
let _ = respond_to.send(DownloadState::Ready);
match message {
Download { filename: _, uri: _, respond_to } => {
let _ = respond_to.send(DownloadState::Ready);
},
_ => panic!("Wrong variant ... this should never happen"),
}
} else {
panic!("Lost a message");
}
}
},
GetData { uri: _, buffer, ref message } => {
if let Some((_, message)) = actor.actions.remove_entry(message) {
use ClientActorMessage::GetData;
match message {
GetData { uri: _, respond_to } => {
let _ = respond_to.send(buffer);
},
_ => panic!("Wrong variant ... this should never happen"),
}
} else {
panic!("Lost a message");
}
},
}
},
@ -223,7 +252,16 @@ async fn store_body( file: &mut File
async fn download( mut client: HttpClient
, message: ClientActorMessageHandle
, io_timeout: Duration ) -> JoinSetResult {
let ClientActorMessageHandle::Download { ref filename, ref uri, message: _ } = message;
use ClientActorMessageHandle::Download;
let (filename, uri) =
if let Download { ref filename, ref uri, message: _ } = message {
(filename, uri)
} else {
return Err(DownloadError::new(
message.clone(),
Some(anyhow!("Called with invalid variant")) ));
};
// - get all informations to eventually existing file
let mut from = file_size(filename).await;
@ -261,6 +299,49 @@ async fn download( mut client: HttpClient
Ok(Some(message))
}
pub(super) async fn body_bytes( mut client: HttpClient
, mut message: ClientActorMessageHandle ) -> JoinSetResult {
use ClientActorMessageHandle::GetData;
let uri =
if let GetData { ref uri, buffer: _, message: _ } = message {
uri
} else {
return Err(DownloadError::new(
message.clone(),
Some(anyhow!("Called with invalid variant")) ));
};
let mut response = request( &mut client
, "GET"
, uri
, HeaderMap::new() )
. await
. map_err(|e| DownloadError::new(message.clone(), Some(e)))?;
// read body into Vec<u8>
let body: Vec<u8> = BodyReader::new(response.body_mut())
. bytes()
. await
. map_err(|e| DownloadError::new( message.clone()
, Some(anyhow!(e.to_string())) ))?
. to_vec();
let buffer =
if let GetData { uri: _, ref mut buffer, message: _ } = message {
buffer
} else {
return Err(DownloadError::new(
message.clone(),
Some(anyhow!("Called with invalid variant")) ));
};
*buffer = Some(body);
Ok(Some(message))
}
impl ClientActor {
pub(super) fn new( concurrency_limit: usize
@ -298,7 +379,7 @@ impl ClientActor {
async fn handle_message(&mut self, message: ClientActorMessage) {
self.actions.insert(self.actions_idx, message);
use ClientActorMessage::Download;
use ClientActorMessage::{Download, GetData};
match self.actions.get(&self.actions_idx) {
Some(Download { ref filename, ref uri, respond_to: _ }) => {
@ -319,6 +400,23 @@ impl ClientActor {
self.actions_idx += 1;
},
Some(GetData { ref uri, respond_to: _ }) => {
// spawn a task that does the work
let client = self.client.clone();
let handle = ClientActorMessageHandle::GetData {
uri: uri.clone(),
buffer: None,
message: self.actions_idx,
};
self.tasks.spawn(async move {
body_bytes(client, handle).await
});
self.actions_idx += 1;
},
None => (),
}
}
@ -352,4 +450,13 @@ impl ClientActorHandle {
let _ = self.sender.send(msg).await;
receive.await.expect("Actor cancelled unexpected");
}
pub(super) async fn body_bytes(&self, uri: &Uri) -> Option<Vec<u8>> {
let uri = uri.to_owned();
let (send, receive) = oneshot::channel();
let msg = ClientActorMessage::GetData { uri, respond_to: send };
let _ = self.sender.send(msg).await;
receive.await.expect("Actor cancelled unexpected")
}
}
Loading…
Cancel
Save