feat: send long running task messages on infra operations

This commit is contained in:
Benjamin Chastanier
2021-11-03 18:33:41 +01:00
parent 6441e7633a
commit a023b30239
4 changed files with 1679 additions and 1553 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -3,7 +3,10 @@ use std::fs::File;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::str::FromStr;
use std::sync::mpsc;
use std::sync::mpsc::TryRecvError;
use std::thread;
use std::time::Duration;
use retry::delay::Fibonacci;
use retry::Error::Operation;
@@ -25,7 +28,8 @@ use crate::error::SimpleErrorKind::Other;
use crate::error::{
cast_simple_error_to_engine_error, EngineError, EngineErrorCause, EngineErrorScope, SimpleError, SimpleErrorKind,
};
use crate::models::{Context, Listen, ListenersHelper, ProgressInfo, ProgressLevel, ProgressScope, StringPath};
use crate::models::ProgressLevel::Info;
use crate::models::{Action, Context, Listen, ListenersHelper, ProgressInfo, ProgressLevel, ProgressScope, StringPath};
use crate::object_storage::ObjectStorage;
use crate::unit_conversion::{any_to_mi, cpu_string_to_float};
@@ -1437,3 +1441,93 @@ mod tests {
}
}
}
/// TODO(benjaminch): to be refactored with similar function in services.rs
/// This function call (start|pause|delete)_in_progress function every 10 seconds when a
/// long blocking task is running.
pub fn send_progress_on_long_task<K, R, F>(kubernetes: &K, action: Action, long_task: F) -> R
where
K: Kubernetes + Listen,
F: Fn() -> R,
{
let waiting_message = match action {
Action::Create => Some(format!(
"Infrastructure '{}' deployment is in progress...",
kubernetes.name_with_id()
)),
Action::Pause => Some(format!(
"Infrastructure '{}' pause is in progress...",
kubernetes.name_with_id()
)),
Action::Delete => Some(format!(
"Infrastructure '{}' deletion is in progress...",
kubernetes.name_with_id()
)),
Action::Nothing => None,
};
send_progress_on_long_task_with_message(kubernetes, waiting_message, action, long_task)
}
/// TODO(benjaminch): to be refactored with similar function in services.rs
/// This function call (start|pause|delete)_in_progress function every 10 seconds when a
/// long blocking task is running.
pub fn send_progress_on_long_task_with_message<K, M, R, F>(
kubernetes: &K,
waiting_message: Option<M>,
action: Action,
long_task: F,
) -> R
where
K: Kubernetes + Listen,
M: Into<String>,
F: Fn() -> R,
{
let listeners = std::clone::Clone::clone(kubernetes.listeners());
let progress_info = ProgressInfo::new(
ProgressScope::Environment {
id: kubernetes.context().execution_id().to_string(),
},
Info,
waiting_message.map(|message| message.into()),
kubernetes.context().execution_id(),
);
let (tx, rx) = mpsc::channel();
// monitor thread to notify user while the blocking task is executed
let _ = std::thread::Builder::new()
.name("task-monitor".to_string())
.spawn(move || {
// stop the thread when the blocking task is done
let listeners_helper = ListenersHelper::new(&listeners);
let action = action;
let progress_info = progress_info;
loop {
// do notify users here
let progress_info = std::clone::Clone::clone(&progress_info);
match action {
Action::Create => listeners_helper.deployment_in_progress(progress_info),
Action::Pause => listeners_helper.pause_in_progress(progress_info),
Action::Delete => listeners_helper.delete_in_progress(progress_info),
Action::Nothing => {} // should not happens
};
thread::sleep(Duration::from_secs(10));
// watch for thread termination
match rx.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => {}
}
}
});
let blocking_task_result = long_task();
let _ = tx.send(());
blocking_task_result
}

File diff suppressed because it is too large Load Diff