From 3151db24dc4fcd74fb072177890039ee80e260b7 Mon Sep 17 00:00:00 2001 From: Georg Hopp Date: Tue, 14 Jan 2025 21:58:34 +0100 Subject: [PATCH] Add time wait on 503 --- src/client.rs | 10 ++++ src/client/error/download_error.rs | 10 +++- src/client/error/request_error.rs | 8 ++- src/client_actor.rs | 2 +- src/client_actor/error.rs | 12 ++++- src/client_actor/message.rs | 4 +- src/m3u8_download.rs | 82 ++++++++++++++++++++++-------- 7 files changed, 98 insertions(+), 30 deletions(-) diff --git a/src/client.rs b/src/client.rs index 2a0e06c..1c7720b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -53,6 +53,16 @@ pub(super) struct Client { } +impl DownloadState { + pub(crate) fn content_type(&self) -> Option<&String> { + match self { + DownloadState::Partial{ content_type: Some(ref c), .. } => Some(c), + DownloadState::Done{ content_type: Some(ref c), .. } => Some(c), + _ => None, + } + } +} + impl Client { pub(super) fn new( buffer: usize , rate_limit: u64 diff --git a/src/client/error/download_error.rs b/src/client/error/download_error.rs index a798dbe..6f79833 100644 --- a/src/client/error/download_error.rs +++ b/src/client/error/download_error.rs @@ -1,5 +1,6 @@ use std::{error, fmt, io}; +use http::StatusCode; use tokio::time::error::Elapsed; use crate::client::DownloadState; @@ -21,7 +22,7 @@ pub(crate) enum DownloadErrorSource { #[derive(Debug)] pub(crate) struct DownloadError { state: DownloadState, - source: Option, + pub(crate) source: Option, } @@ -37,6 +38,13 @@ impl DownloadError { self.state = state.to_owned(); self } + + pub(crate) fn status_code(&self) -> Option { + match self.source.as_ref()? { + DownloadErrorSource::Request(source) => source.status_code(), + _ => None, + } + } } impl error::Error for DownloadError {} diff --git a/src/client/error/request_error.rs b/src/client/error/request_error.rs index ae7e89c..0dc6885 100644 --- a/src/client/error/request_error.rs +++ b/src/client/error/request_error.rs @@ -1,14 +1,14 @@ use std::{error, fmt}; use bytes::Bytes; -use http::{request, response}; +use http::{request, response, StatusCode}; #[derive(Debug)] pub(crate) struct ClientRequestError { #[allow(dead_code)] request: Option, - response: Option, + pub(crate) response: Option, #[allow(dead_code)] response_body: Option, source: Option, @@ -33,6 +33,10 @@ impl ClientRequestError { self.response.as_ref() } + pub(crate) fn status_code(&self) -> Option { + Some(self.response.as_ref()?.status) + } + #[allow(dead_code)] pub(crate) fn response_body(&self) -> Option<&Bytes> { self.response_body.as_ref() diff --git a/src/client_actor.rs b/src/client_actor.rs index 5af2e04..a12bfdf 100644 --- a/src/client_actor.rs +++ b/src/client_actor.rs @@ -1,5 +1,5 @@ mod message; -mod error; +pub(crate) mod error; mod util; use std::{collections::HashMap, path::Path}; diff --git a/src/client_actor/error.rs b/src/client_actor/error.rs index e4b9e84..f9d67bf 100644 --- a/src/client_actor/error.rs +++ b/src/client_actor/error.rs @@ -1,5 +1,7 @@ use std::{error, fmt}; +use http::StatusCode; + use crate::client::error as client_error; use super::message::ClientActorMessageHandle; @@ -13,8 +15,8 @@ pub(crate) enum ClientActorErrorSource { #[derive(Debug)] pub(crate) struct ClientActorError { - pub(super) action: ClientActorMessageHandle, - pub(super) source: Option, + pub(crate) action: ClientActorMessageHandle, + pub(crate) source: Option, } @@ -30,6 +32,12 @@ impl ClientActorError { let action = action.to_owned(); Self { action, source } } + + pub(crate) fn status_code(&self) -> Option { + match self.source.as_ref()? { + ClientActorErrorSource::Download(source) => source.status_code(), + } + } } impl error::Error for ClientActorError {} diff --git a/src/client_actor/message.rs b/src/client_actor/message.rs index d067aed..4f938bc 100644 --- a/src/client_actor/message.rs +++ b/src/client_actor/message.rs @@ -24,7 +24,7 @@ pub(super) enum ClientActorMessage { } #[derive(Clone, Debug)] -pub(super) enum ClientActorMessageHandle { +pub(crate) enum ClientActorMessageHandle { Download { filename: PathBuf, uri: Uri, @@ -40,7 +40,7 @@ pub(super) enum ClientActorMessageHandle { impl ClientActorMessageHandle { - pub(super) fn state_ref(&self) -> &DownloadState { + pub(crate) fn state_ref(&self) -> &DownloadState { match self { Self::Download { ref state, .. } => state, _ => panic!("Called with invalid variant"), diff --git a/src/m3u8_download.rs b/src/m3u8_download.rs index 3fca917..dbd832b 100644 --- a/src/m3u8_download.rs +++ b/src/m3u8_download.rs @@ -3,18 +3,18 @@ use std::path::{Path, PathBuf}; use anyhow::anyhow; use bytes::Bytes; use futures_util::future::join_all; -use http::{uri::{Authority, Scheme}, Uri}; -use log::debug; +use http::{uri::{Authority, Scheme}, StatusCode, Uri}; +use log::{debug, info}; use m3u8_rs::{MediaPlaylist, MediaSegment, Playlist}; -use tokio::{io::AsyncWriteExt as _, fs::File}; +use tokio::{fs::File, io::AsyncWriteExt as _, time::{sleep, Duration, Instant}}; -use crate::{client_actor::ClientActorHandle, client::DownloadState}; +use crate::{client::DownloadState, client_actor::ClientActorHandle}; #[derive(Clone, Debug)] pub(super) enum TsState { Created, // Nothing is done. - Failed, // The download has failed. + Failed(Option), // The download has failed. Ready, // All .ts downloads are done. } @@ -30,6 +30,7 @@ struct TsPart { pub(super) struct M3u8Download { index_uri: Uri, ts_parts: Vec, + time_wait: Duration } @@ -53,7 +54,18 @@ impl TsPart { self.content_type = content_type; debug!("DONE TsPart: {:?}", self); }, - _ => self.state = TsState::Failed, + + Err(error) if error.status_code() == Some(StatusCode::SERVICE_UNAVAILABLE) => { + self.state = TsState::Failed(Some(Instant::now())); + self.content_type = + error.action.state_ref().content_type().cloned(); + debug!("FAILED WAIT on 503 TsPart: {:?}", self); + }, + + _ => { + self.state = TsState::Failed(None); + debug!("FAILED no wait TsPart: {:?}", self); + }, }; self @@ -75,6 +87,7 @@ impl M3u8Download { . to_string(); let mut ts_parts = vec![]; + let time_wait = Duration::from_secs(301); match m3u8_rs::parse_playlist(&m3u8_data) { Result::Err(e) => Err(anyhow!("m3u8 parse error: {}", e))?, @@ -91,7 +104,7 @@ impl M3u8Download { }, }; - Ok(Self {index_uri, ts_parts}) + Ok(Self {index_uri, ts_parts, time_wait}) } pub(super) fn index_uri(&self) -> &Uri { @@ -100,22 +113,47 @@ impl M3u8Download { pub(super) async fn download(&mut self, client: &ClientActorHandle) { loop { - let unfinished: Vec<_> = self.ts_parts.iter_mut() - . filter_map(|ts_part| match ts_part.state { - TsState::Ready => if ts_part.content_type != Some("video/MP2T".to_string()) { - Some(ts_part.download(client)) - } else { - None - } - - _ => Some(ts_part.download(client)) - }).collect(); - - debug!("UNFINISHED NOW: {}", unfinished.len()); - - if unfinished.is_empty() { break; } + let mut non_waits = vec![]; + let mut waits = vec![]; + + debug!("SHOW ALL remaining TsParts:"); + for ts_part in self.ts_parts.iter_mut() { + debug!("TsParts: {:?}", ts_part); + match ts_part.state { + TsState::Ready if ts_part.content_type != Some("video/MP2T".into()) => { + non_waits.push(ts_part.download(client)); + }, + + TsState::Ready => (), + + TsState::Failed(Some(wait_from)) => { + if Instant::now() >= wait_from + self.time_wait { + non_waits.push(ts_part.download(client)); + } else { + waits.push((wait_from + self.time_wait) - Instant::now()); + } + }, + + _ => non_waits.push(ts_part.download(client)), + } + } - join_all(unfinished).await; + debug!("UNFINISHED NOW: {}", non_waits.len() + waits.len()); + + if non_waits.is_empty() { + if waits.is_empty() { + break + } else { + info!("All {} tasks wait for unavailable service", waits.len()); + let pause_time = waits + . into_iter() + . fold(self.time_wait, |a, w| w.min(a)); + info!("Sleep for {:?}", pause_time); + sleep(pause_time).await; + } + } else { + join_all(non_waits).await; + } } }