Browse Source

Add time wait on 503

main
Georg Hopp 11 months ago
parent
commit
3151db24dc
Signed by: ghopp GPG Key ID: 4C5D226768784538
  1. 10
      src/client.rs
  2. 10
      src/client/error/download_error.rs
  3. 8
      src/client/error/request_error.rs
  4. 2
      src/client_actor.rs
  5. 12
      src/client_actor/error.rs
  6. 4
      src/client_actor/message.rs
  7. 82
      src/m3u8_download.rs

10
src/client.rs

@ -53,6 +53,16 @@ pub(super) struct Client {
}
impl DownloadState {
pub(crate) fn content_type(&self) -> Option<&String> {
match self {
DownloadState::Partial{ content_type: Some(ref c), .. } => Some(c),
DownloadState::Done{ content_type: Some(ref c), .. } => Some(c),
_ => None,
}
}
}
impl Client {
pub(super) fn new( buffer: usize
, rate_limit: u64

10
src/client/error/download_error.rs

@ -1,5 +1,6 @@
use std::{error, fmt, io};
use http::StatusCode;
use tokio::time::error::Elapsed;
use crate::client::DownloadState;
@ -21,7 +22,7 @@ pub(crate) enum DownloadErrorSource {
#[derive(Debug)]
pub(crate) struct DownloadError {
state: DownloadState,
source: Option<DownloadErrorSource>,
pub(crate) source: Option<DownloadErrorSource>,
}
@ -37,6 +38,13 @@ impl DownloadError {
self.state = state.to_owned();
self
}
pub(crate) fn status_code(&self) -> Option<StatusCode> {
match self.source.as_ref()? {
DownloadErrorSource::Request(source) => source.status_code(),
_ => None,
}
}
}
impl error::Error for DownloadError {}

8
src/client/error/request_error.rs

@ -1,14 +1,14 @@
use std::{error, fmt};
use bytes::Bytes;
use http::{request, response};
use http::{request, response, StatusCode};
#[derive(Debug)]
pub(crate) struct ClientRequestError {
#[allow(dead_code)]
request: Option<request::Parts>,
response: Option<response::Parts>,
pub(crate) response: Option<response::Parts>,
#[allow(dead_code)]
response_body: Option<Bytes>,
source: Option<anyhow::Error>,
@ -33,6 +33,10 @@ impl ClientRequestError {
self.response.as_ref()
}
pub(crate) fn status_code(&self) -> Option<StatusCode> {
Some(self.response.as_ref()?.status)
}
#[allow(dead_code)]
pub(crate) fn response_body(&self) -> Option<&Bytes> {
self.response_body.as_ref()

2
src/client_actor.rs

@ -1,5 +1,5 @@
mod message;
mod error;
pub(crate) mod error;
mod util;
use std::{collections::HashMap, path::Path};

12
src/client_actor/error.rs

@ -1,5 +1,7 @@
use std::{error, fmt};
use http::StatusCode;
use crate::client::error as client_error;
use super::message::ClientActorMessageHandle;
@ -13,8 +15,8 @@ pub(crate) enum ClientActorErrorSource {
#[derive(Debug)]
pub(crate) struct ClientActorError {
pub(super) action: ClientActorMessageHandle,
pub(super) source: Option<ClientActorErrorSource>,
pub(crate) action: ClientActorMessageHandle,
pub(crate) source: Option<ClientActorErrorSource>,
}
@ -30,6 +32,12 @@ impl ClientActorError {
let action = action.to_owned();
Self { action, source }
}
pub(crate) fn status_code(&self) -> Option<StatusCode> {
match self.source.as_ref()? {
ClientActorErrorSource::Download(source) => source.status_code(),
}
}
}
impl error::Error for ClientActorError {}

4
src/client_actor/message.rs

@ -24,7 +24,7 @@ pub(super) enum ClientActorMessage {
}
#[derive(Clone, Debug)]
pub(super) enum ClientActorMessageHandle {
pub(crate) enum ClientActorMessageHandle {
Download {
filename: PathBuf,
uri: Uri,
@ -40,7 +40,7 @@ pub(super) enum ClientActorMessageHandle {
impl ClientActorMessageHandle {
pub(super) fn state_ref(&self) -> &DownloadState {
pub(crate) fn state_ref(&self) -> &DownloadState {
match self {
Self::Download { ref state, .. } => state,
_ => panic!("Called with invalid variant"),

82
src/m3u8_download.rs

@ -3,18 +3,18 @@ use std::path::{Path, PathBuf};
use anyhow::anyhow;
use bytes::Bytes;
use futures_util::future::join_all;
use http::{uri::{Authority, Scheme}, Uri};
use log::debug;
use http::{uri::{Authority, Scheme}, StatusCode, Uri};
use log::{debug, info};
use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist};
use tokio::{io::AsyncWriteExt as _, fs::File};
use tokio::{fs::File, io::AsyncWriteExt as _, time::{sleep, Duration, Instant}};
use crate::{client_actor::ClientActorHandle, client::DownloadState};
use crate::{client::DownloadState, client_actor::ClientActorHandle};
#[derive(Clone, Debug)]
pub(super) enum TsState {
Created, // Nothing is done.
Failed, // The download has failed.
Failed(Option<Instant>), // The download has failed.
Ready, // All .ts downloads are done.
}
@ -30,6 +30,7 @@ struct TsPart {
pub(super) struct M3u8Download {
index_uri: Uri,
ts_parts: Vec<TsPart>,
time_wait: Duration
}
@ -53,7 +54,18 @@ impl TsPart {
self.content_type = content_type;
debug!("DONE TsPart: {:?}", self);
},
_ => self.state = TsState::Failed,
Err(error) if error.status_code() == Some(StatusCode::SERVICE_UNAVAILABLE) => {
self.state = TsState::Failed(Some(Instant::now()));
self.content_type =
error.action.state_ref().content_type().cloned();
debug!("FAILED WAIT on 503 TsPart: {:?}", self);
},
_ => {
self.state = TsState::Failed(None);
debug!("FAILED no wait TsPart: {:?}", self);
},
};
self
@ -75,6 +87,7 @@ impl M3u8Download {
. to_string();
let mut ts_parts = vec![];
let time_wait = Duration::from_secs(301);
match m3u8_rs::parse_playlist(&m3u8_data) {
Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e))?,
@ -91,7 +104,7 @@ impl M3u8Download {
},
};
Ok(Self {index_uri, ts_parts})
Ok(Self {index_uri, ts_parts, time_wait})
}
pub(super) fn index_uri(&self) -> &Uri {
@ -100,22 +113,47 @@ impl M3u8Download {
pub(super) async fn download(&mut self, client: &ClientActorHandle) {
loop {
let unfinished: Vec<_> = self.ts_parts.iter_mut()
. filter_map(|ts_part| match ts_part.state {
TsState::Ready => if ts_part.content_type != Some("video/MP2T".to_string()) {
Some(ts_part.download(client))
} else {
None
}
_ => Some(ts_part.download(client))
}).collect();
debug!("UNFINISHED NOW: {}", unfinished.len());
if unfinished.is_empty() { break; }
let mut non_waits = vec![];
let mut waits = vec![];
debug!("SHOW ALL remaining TsParts:");
for ts_part in self.ts_parts.iter_mut() {
debug!("TsParts: {:?}", ts_part);
match ts_part.state {
TsState::Ready if ts_part.content_type != Some("video/MP2T".into()) => {
non_waits.push(ts_part.download(client));
},
TsState::Ready => (),
TsState::Failed(Some(wait_from)) => {
if Instant::now() >= wait_from + self.time_wait {
non_waits.push(ts_part.download(client));
} else {
waits.push((wait_from + self.time_wait) - Instant::now());
}
},
_ => non_waits.push(ts_part.download(client)),
}
}
join_all(unfinished).await;
debug!("UNFINISHED NOW: {}", non_waits.len() + waits.len());
if non_waits.is_empty() {
if waits.is_empty() {
break
} else {
info!("All {} tasks wait for unavailable service", waits.len());
let pause_time = waits
. into_iter()
. fold(self.time_wait, |a, w| w.min(a));
info!("Sleep for {:?}", pause_time);
sleep(pause_time).await;
}
} else {
join_all(non_waits).await;
}
}
}

Loading…
Cancel
Save