A tool to get a HLS video stream.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

162 lines
5.0 KiB

mod message;
mod error;
mod util;
use std::{collections::HashMap, path::Path};
use bytes::Bytes;
use error::ClientActorError;
use http::{HeaderMap, Uri};
use message::{ClientActorMessage, ClientActorMessageHandle};
use tokio::{sync::{mpsc, oneshot}, task::JoinSet};
use super::client::{Client, DownloadState};
type ActionIndex = u64;
type ClientTaskResult = Result<Option<ClientActorMessageHandle>, ClientActorError>;
type DownloadResult = Result<DownloadState, ClientActorError>;
#[derive(Debug)]
struct ClientActor {
client: Client,
tasks: JoinSet<ClientTaskResult>,
max_tasks: usize,
actions: HashMap<ActionIndex, ClientActorMessage>,
actions_idx: ActionIndex,
receiver: mpsc::Receiver<ClientActorMessage>,
}
pub(super) struct ClientActorHandle {
sender: mpsc::Sender<ClientActorMessage>,
abort: oneshot::Sender<ClientTaskResult>,
}
impl ClientActor {
fn new( client: Client
, max_tasks: usize
, receiver: mpsc::Receiver<ClientActorMessage>
, abort_rx: oneshot::Receiver<ClientTaskResult> ) -> anyhow::Result<Self> {
let mut tasks = JoinSet::new();
tasks.spawn(async move {
let _ = abort_rx.await;
Ok(None)
});
let actions = HashMap::new();
let actions_idx = 0;
Ok(Self {
client,
tasks,
max_tasks,
actions,
actions_idx,
receiver })
}
async fn handle_message(&mut self, message: ClientActorMessage) {
self.actions.insert(self.actions_idx, message);
use ClientActorMessage::{Download, GetData};
match self.actions.get(&self.actions_idx) {
Some(Download { ref filename, ref uri, .. }) => {
// spawn a task that does the work
let mut client = self.client.clone();
let filename = filename.to_path_buf();
let uri = uri.clone();
let message = self.actions_idx;
let mut handle = ClientActorMessageHandle::Download {
filename,
uri,
state: DownloadState::None,
message,
};
self.tasks.spawn(async move {
let result = client.download(handle.filename(), &handle.uri(), &HeaderMap::new()).await;
match result {
Err(source) =>
Err(ClientActorError::new(&handle, source.into())),
Ok(state) => {
handle.set_state(state);
Ok(Some(handle))
},
}
});
},
Some(GetData { ref uri, .. }) => {
// spawn a task that does the work
let mut client = self.client.clone();
let uri = uri.clone();
let mut handle = ClientActorMessageHandle::GetData {
uri,
buffer: None,
message: self.actions_idx,
};
self.tasks.spawn(async move {
let result = client.data(&handle.uri(), &HeaderMap::new()).await;
match result {
Err(source) =>
Err(ClientActorError::new(&handle, source.into())),
Ok(data) => {
*handle.buffer_mut() = Some(data);
Ok(Some(handle))
},
}
});
},
None => (),
}
self.actions_idx += 1;
}
}
impl ClientActorHandle {
pub(super) fn new(client: Client, max_tasks: usize) -> Self {
let (sender, receiver) = mpsc::channel(1);
let (abort, abort_rx) = oneshot::channel::<ClientTaskResult>();
let actor = ClientActor::new(client, max_tasks, receiver, abort_rx )
. expect("Client create error");
tokio::spawn(util::run_client(actor));
Self { sender, abort }
}
pub(super) fn stop(self) {
let _ = self.abort.send(Ok(None));
drop(self.sender);
}
pub(super) async fn download( &self
, filename: impl AsRef<Path>
, uri: &Uri ) -> DownloadResult {
let filename = filename.as_ref().to_path_buf();
let uri = uri.to_owned();
let (send, receive) = oneshot::channel();
let msg = ClientActorMessage::Download { filename, uri, respond_to: send };
let _ = self.sender.send(msg).await;
receive.await.expect("Actor cancelled unexpected")
}
pub(super) async fn body_bytes(&self, uri: &Uri) -> Option<Bytes> {
let uri = uri.to_owned();
let (send, receive) = oneshot::channel();
let msg = ClientActorMessage::GetData { uri, respond_to: send };
let _ = self.sender.send(msg).await;
receive.await.expect("Actor cancelled unexpected")
}
}