From 32c998b1b9a716afa8c87294fed58f2f0a1d2b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Er=C3=A8be=20-=20Romain=20Gerard?= Date: Fri, 18 Mar 2022 09:18:42 +0100 Subject: [PATCH] Allow to customize how to abort a command (#650) --- src/build_platform/local_docker.rs | 16 +-- src/cmd/command.rs | 156 +++++++++++++++++++---------- src/cmd/docker.rs | 99 ++++++------------ src/cmd/helm.rs | 3 +- src/cmd/kubectl.rs | 3 +- 5 files changed, 144 insertions(+), 133 deletions(-) diff --git a/src/build_platform/local_docker.rs b/src/build_platform/local_docker.rs index 47abd851..4277ea84 100644 --- a/src/build_platform/local_docker.rs +++ b/src/build_platform/local_docker.rs @@ -1,15 +1,15 @@ use std::io::{Error, ErrorKind}; use std::path::Path; +use std::time::Duration; use std::{env, fs}; -use chrono::Duration; use git2::{Cred, CredentialType}; use sysinfo::{Disk, DiskExt, SystemExt}; use crate::build_platform::{docker, Build, BuildPlatform, BuildResult, Credentials, Kind}; use crate::cmd::command; use crate::cmd::command::CommandError::Killed; -use crate::cmd::command::QoveryCommand; +use crate::cmd::command::{CommandKiller, QoveryCommand}; use crate::cmd::docker::{ContainerImage, Docker, DockerError}; use crate::errors::{CommandError, EngineError, Tag}; use crate::events::{EngineEvent, EventDetails, EventMessage, ToTransmitter, Transmitter}; @@ -20,7 +20,7 @@ use crate::models::{ Context, Listen, Listener, Listeners, ListenersHelper, ProgressInfo, ProgressLevel, ProgressScope, }; -const BUILD_DURATION_TIMEOUT_MIN: i64 = 30; +const BUILD_DURATION_TIMEOUT_SEC: u64 = 30 * 60; /// https://buildpacks.io/ const BUILDPACKS_BUILDERS: [&str; 1] = [ @@ -164,8 +164,7 @@ impl LocalDocker { self.context.execution_id(), )); }, - Duration::minutes(BUILD_DURATION_TIMEOUT_MIN), - is_task_canceled, + &CommandKiller::from(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC), is_task_canceled), ); match exit_status { @@ -276,8 +275,8 @@ impl LocalDocker { // buildpacks build let mut cmd = QoveryCommand::new("pack", &buildpacks_args, &self.get_docker_host_envs()); + let cmd_killer = CommandKiller::from(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC), is_task_canceled); exit_status = cmd.exec_with_abort( - Duration::minutes(BUILD_DURATION_TIMEOUT_MIN), &mut |line| { self.logger.log( LogLevel::Info, @@ -308,7 +307,7 @@ impl LocalDocker { self.context.execution_id(), )); }, - is_task_canceled, + &cmd_killer, ); if exit_status.is_ok() { @@ -682,7 +681,8 @@ fn docker_prune_images(envs: Vec<(&str, &str)>) -> Result<(), CommandError> { let mut errored_commands = vec![]; for prune in all_prunes_commands { let mut cmd = QoveryCommand::new("docker", &prune, &envs); - if let Err(e) = cmd.exec_with_timeout(Duration::minutes(BUILD_DURATION_TIMEOUT_MIN), &mut |_| {}, &mut |_| {}) { + let cmd_killer = CommandKiller::from_timeout(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC)); + if let Err(e) = cmd.exec_with_abort(&mut |_| {}, &mut |_| {}, &cmd_killer) { errored_commands.push(format!("{} {:?}", prune[0], e)); } } diff --git a/src/cmd/command.rs b/src/cmd/command.rs index 041426a9..1366eb65 100644 --- a/src/cmd/command.rs +++ b/src/cmd/command.rs @@ -7,9 +7,8 @@ use std::process::{Child, Command, ExitStatus, Stdio}; use crate::cmd::command::CommandError::{ExecutionError, ExitStatusError, Killed, TimeoutError}; use crate::cmd::command::CommandOutputType::{STDERR, STDOUT}; -use chrono::Duration; use itertools::Itertools; -use std::time::Instant; +use std::time::{Duration, Instant}; use timeout_readwrite::TimeoutReader; enum CommandOutputType { @@ -45,6 +44,63 @@ impl CommandError { } } +#[derive(Debug, Clone)] +pub enum AbortReason { + Timeout(Duration), + Canceled(String), +} +pub struct CommandKiller<'a> { + should_abort: Box Option + 'a>, +} + +impl<'a> CommandKiller<'a> { + pub fn never() -> CommandKiller<'a> { + CommandKiller { + should_abort: Box::new(|| None), + } + } + + pub fn from_timeout(timeout: Duration) -> CommandKiller<'a> { + let now = Instant::now(); + CommandKiller { + should_abort: Box::new(move || { + if now.elapsed() >= timeout { + return Some(AbortReason::Timeout(timeout)); + } + + None + }), + } + } + + pub fn from_cancelable(is_canceled: &'a dyn Fn() -> bool) -> CommandKiller<'a> { + CommandKiller { + should_abort: Box::new(move || { + if is_canceled() { + return Some(AbortReason::Canceled("Task canceled".to_string())); + } + None + }), + } + } + + pub fn from(timeout: Duration, is_canceled: &'a dyn Fn() -> bool) -> CommandKiller<'a> { + let has_timeout = Self::from_timeout(timeout); + let is_canceled = Self::from_cancelable(is_canceled); + CommandKiller { + should_abort: Box::new(move || { + (is_canceled.should_abort)()?; + (has_timeout.should_abort)()?; + None + }), + } + } + + pub fn should_abort(&self) -> Option { + (self.should_abort)() + } +} + pub struct QoveryCommand { command: Command, } @@ -74,10 +130,9 @@ impl QoveryCommand { pub fn exec(&mut self) -> Result<(), CommandError> { self.exec_with_abort( - Duration::max_value(), &mut |line| info!("{}", line), &mut |line| warn!("{}", line), - || false, + &CommandKiller::never(), ) } @@ -90,36 +145,19 @@ impl QoveryCommand { STDOUT: FnMut(String), STDERR: FnMut(String), { - self.exec_with_abort(Duration::max_value(), stdout_output, stderr_output, || false) + self.exec_with_abort(stdout_output, stderr_output, &CommandKiller::never()) } - pub fn exec_with_timeout( + pub fn exec_with_abort( &mut self, - timeout: Duration, stdout_output: &mut STDOUT, stderr_output: &mut STDERR, + abort_notifier: &CommandKiller, ) -> Result<(), CommandError> where STDOUT: FnMut(String), STDERR: FnMut(String), { - self.exec_with_abort(timeout, stdout_output, stderr_output, || false) - } - - pub fn exec_with_abort( - &mut self, - timeout: Duration, - stdout_output: &mut STDOUT, - stderr_output: &mut STDERR, - should_be_killed: F, - ) -> Result<(), CommandError> - where - STDOUT: FnMut(String), - STDERR: FnMut(String), - F: Fn() -> bool, - { - assert!(timeout.num_seconds() > 0, "Timeout cannot be a 0 or negative duration"); - info!("command: {:?}", self.command); let mut cmd_handle = self .command @@ -128,10 +166,8 @@ impl QoveryCommand { .spawn() .map_err(ExecutionError)?; - let process_start_time = Instant::now(); - // Read stdout/stderr until timeout is reached - let reader_timeout = std::time::Duration::from_secs(10.min(timeout.num_seconds() as u64)); + let reader_timeout = std::time::Duration::from_secs(5); let stdout = cmd_handle.stdout.take().ok_or(ExecutionError(Error::new( ErrorKind::BrokenPipe, "Cannot get stdout for command", @@ -160,11 +196,7 @@ impl QoveryCommand { STDERR(Err(err)) => error!("Error on stderr of cmd {:?}: {:?}", self.command, err), } - if should_be_killed() { - break; - } - - if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() { + if abort_notifier.should_abort().is_some() { break; } } @@ -180,23 +212,24 @@ impl QoveryCommand { } Ok(None) => { // Does the process should be killed ? - if should_be_killed() { - let msg = format!("Killing process {:?}", self.command); - warn!("{}", msg); - Self::kill(&mut cmd_handle); - return Err(Killed(msg)); - } - - // Does the timeout has been reached ? - if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() { - let msg = format!( - "Killing process {:?} due to timeout {}m reached", - self.command, - timeout.num_minutes() - ); - warn!("{}", msg); - Self::kill(&mut cmd_handle); - return Err(TimeoutError(msg)); + match abort_notifier.should_abort() { + None => {} + Some(AbortReason::Timeout(timeout)) => { + let msg = format!( + "Killing process {:?} due to timeout {}s reached", + self.command, + timeout.as_secs() + ); + warn!("{}", msg); + Self::kill(&mut cmd_handle); + return Err(TimeoutError(msg)); + } + Some(AbortReason::Canceled(_)) => { + let msg = format!("Killing process {:?}", self.command); + warn!("{}", msg); + Self::kill(&mut cmd_handle); + return Err(Killed(msg)); + } } } Err(err) => return Err(ExecutionError(err)), @@ -252,10 +285,10 @@ where #[cfg(test)] mod tests { - use crate::cmd::command::{does_binary_exist, run_version_command_for, CommandError, QoveryCommand}; - use chrono::Duration; + use crate::cmd::command::{does_binary_exist, run_version_command_for, CommandError, CommandKiller, QoveryCommand}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier}; + use std::time::Duration; use std::{thread, time}; #[test] @@ -282,17 +315,29 @@ mod tests { #[test] fn test_command_with_timeout() { let mut cmd = QoveryCommand::new("sleep", &vec!["120"], &vec![]); - let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {}); + let ret = cmd.exec_with_abort( + &mut |_| {}, + &mut |_| {}, + &CommandKiller::from_timeout(Duration::from_secs(2)), + ); assert!(matches!(ret, Err(CommandError::TimeoutError(_)))); let mut cmd = QoveryCommand::new("sh", &vec!["-c", "cat /dev/urandom | grep -a --null-data ."], &vec![]); - let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {}); + let ret = cmd.exec_with_abort( + &mut |_| {}, + &mut |_| {}, + &CommandKiller::from_timeout(Duration::from_secs(2)), + ); assert!(matches!(ret, Err(CommandError::TimeoutError(_)))); let mut cmd = QoveryCommand::new("sleep", &vec!["1"], &vec![]); - let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {}); + let ret = cmd.exec_with_abort( + &mut |_| {}, + &mut |_| {}, + &CommandKiller::from_timeout(Duration::from_secs(2)), + ); assert_eq!(ret.is_ok(), true); } @@ -313,8 +358,9 @@ mod tests { }); let cmd_killer = move || should_kill2.load(Ordering::Acquire); + let cmd_killer = CommandKiller::from_cancelable(&cmd_killer); barrier.wait(); - let ret = cmd.exec_with_abort(Duration::max_value(), &mut |_| {}, &mut |_| {}, cmd_killer); + let ret = cmd.exec_with_abort(&mut |_| {}, &mut |_| {}, &cmd_killer); assert!(matches!(ret, Err(CommandError::Killed(_)))); } diff --git a/src/cmd/docker.rs b/src/cmd/docker.rs index c74fb991..d5dd98ef 100644 --- a/src/cmd/docker.rs +++ b/src/cmd/docker.rs @@ -1,7 +1,6 @@ -use crate::cmd::command::{CommandError, QoveryCommand}; +use crate::cmd::command::{CommandError, CommandKiller, QoveryCommand}; use crate::errors::EngineError; use crate::events::EventDetails; -use chrono::Duration; use std::path::Path; use std::process::ExitStatus; use url::Url; @@ -86,10 +85,9 @@ impl Docker { let buildx_cmd_exist = docker_exec( &args, &docker.get_all_envs(&vec![]), - Some(Duration::max_value()), - &|| false, &mut |_| {}, &mut |_| {}, + &CommandKiller::never(), ); if let Err(_) = buildx_cmd_exist { return Err(DockerError::InvalidConfig(format!( @@ -112,10 +110,9 @@ impl Docker { let _ = docker_exec( &args, &docker.get_all_envs(&vec![]), - Some(Duration::max_value()), - &|| false, &mut |_| {}, &mut |_| {}, + &CommandKiller::never(), ); Ok(docker) @@ -149,10 +146,9 @@ impl Docker { docker_exec( &args, &self.get_all_envs(&vec![]), - None, - &|| false, &mut |line| info!("{}", line), &mut |line| warn!("{}", line), + &CommandKiller::never(), )?; Ok(()) @@ -164,10 +160,9 @@ impl Docker { let ret = docker_exec( &vec!["image", "inspect", &image.image_name()], &self.get_all_envs(&vec![]), - None, - &|| false, &mut |line| info!("{}", line), &mut |line| warn!("{}", line), + &CommandKiller::never(), ); Ok(matches!(ret, Ok(_))) @@ -180,10 +175,9 @@ impl Docker { let ret = docker_exec( &vec!["manifest", "inspect", &image.image_name()], &self.get_all_envs(&vec![]), - None, - &|| false, &mut |line| info!("{}", line), &mut |line| warn!("{}", line), + &CommandKiller::never(), ); match ret { @@ -198,22 +192,20 @@ impl Docker { image: &ContainerImage, stdout_output: &mut Stdout, stderr_output: &mut Stderr, - timeout: Duration, - should_abort: &dyn Fn() -> bool, + should_abort: &CommandKiller, ) -> Result<(), DockerError> where Stdout: FnMut(String), Stderr: FnMut(String), { - info!("Docker pull {:?}, timeout: {:?}", image, timeout); + info!("Docker pull {:?}", image); docker_exec( &vec!["pull", &image.image_name()], &self.get_all_envs(&vec![]), - Some(timeout), - should_abort, stdout_output, stderr_output, + should_abort, ) } @@ -227,8 +219,7 @@ impl Docker { push_after_build: bool, stdout_output: &mut Stdout, stderr_output: &mut Stderr, - timeout: Duration, - should_abort: &dyn Fn() -> bool, + should_abort: &CommandKiller, ) -> Result<(), DockerError> where Stdout: FnMut(String), @@ -239,11 +230,6 @@ impl Docker { return Ok(()); } - // if it is already aborted, nothing to do - if (should_abort)() { - return Err(DockerError::Aborted("build".to_string())); - } - // Do some checks if !dockerfile.is_file() { return Err(DockerError::InvalidConfig(format!( @@ -269,7 +255,6 @@ impl Docker { push_after_build, stdout_output, stderr_output, - timeout, should_abort, ) } else { @@ -282,7 +267,6 @@ impl Docker { push_after_build, stdout_output, stderr_output, - timeout, should_abort, ) } @@ -298,8 +282,7 @@ impl Docker { push_after_build: bool, stdout_output: &mut Stdout, stderr_output: &mut Stderr, - timeout: Duration, - should_abort: &dyn Fn() -> bool, + should_abort: &CommandKiller, ) -> Result<(), DockerError> where Stdout: FnMut(String), @@ -308,7 +291,7 @@ impl Docker { info!("Docker build {:?}", image_to_build.image_name()); // Best effort to pull the cache, if it does not exist that's ok too - let _ = self.pull(cache, stdout_output, stderr_output, timeout, should_abort); + let _ = self.pull(cache, stdout_output, stderr_output, should_abort); let mut args_string: Vec = vec![ "build".to_string(), @@ -338,14 +321,13 @@ impl Docker { let _ = docker_exec( &args_string.iter().map(|x| x.as_str()).collect::>(), &self.get_all_envs(&vec![]), - Some(timeout), - should_abort, stdout_output, stderr_output, + should_abort, )?; if push_after_build { - let _ = self.push(image_to_build, stdout_output, stderr_output, timeout, should_abort)?; + let _ = self.push(image_to_build, stdout_output, stderr_output, should_abort)?; } Ok(()) @@ -361,8 +343,7 @@ impl Docker { push_after_build: bool, stdout_output: &mut Stdout, stderr_output: &mut Stderr, - timeout: Duration, - should_abort: &dyn Fn() -> bool, + should_abort: &CommandKiller, ) -> Result<(), DockerError> where Stdout: FnMut(String), @@ -405,10 +386,9 @@ impl Docker { docker_exec( &args_string.iter().map(|x| x.as_str()).collect::>(), &self.get_all_envs(&vec![]), - Some(timeout), - should_abort, stdout_output, stderr_output, + should_abort, ) } @@ -417,14 +397,13 @@ impl Docker { image: &ContainerImage, stdout_output: &mut Stdout, stderr_output: &mut Stderr, - timeout: Duration, - should_abort: &dyn Fn() -> bool, + should_abort: &CommandKiller, ) -> Result<(), DockerError> where Stdout: FnMut(String), Stderr: FnMut(String), { - info!("Docker push {:?}, timeout: {:?}", image, timeout); + info!("Docker push {:?}", image); let image_names = image.image_names(); let mut args = vec!["push"]; args.extend(image_names.iter().map(|x| x.as_str())); @@ -432,10 +411,9 @@ impl Docker { docker_exec( &args, &self.get_all_envs(&vec![]), - Some(timeout), - should_abort, stdout_output, stderr_output, + should_abort, ) } } @@ -443,18 +421,16 @@ impl Docker { fn docker_exec( args: &[&str], envs: &[(&str, &str)], - timeout: Option, - should_abort: &dyn Fn() -> bool, stdout_output: &mut F, stderr_output: &mut X, + cmd_killer: &CommandKiller, ) -> Result<(), DockerError> where F: FnMut(String), X: FnMut(String), { - let timeout = timeout.unwrap_or_else(|| Duration::max_value()); let mut cmd = QoveryCommand::new("docker", args, envs); - let ret = cmd.exec_with_abort(timeout, stdout_output, stderr_output, should_abort); + let ret = cmd.exec_with_abort(stdout_output, stderr_output, &cmd_killer); match ret { Ok(_) => Ok(()), @@ -474,9 +450,10 @@ pub fn to_engine_error(event_details: &EventDetails, error: DockerError) -> Engi #[cfg(feature = "test-with-docker")] #[cfg(test)] mod tests { + use crate::cmd::command::CommandKiller; use crate::cmd::docker::{ContainerImage, Docker, DockerError}; - use chrono::Duration; use std::path::Path; + use std::time::Duration; use url::Url; fn private_registry_url() -> Url { @@ -497,8 +474,7 @@ mod tests { &image, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Err(_))); @@ -513,8 +489,7 @@ mod tests { &image, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -523,8 +498,7 @@ mod tests { &image, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::seconds(1), - &|| false, + &CommandKiller::from_timeout(Duration::from_secs(1)), ); assert!(matches!(ret, Err(DockerError::Timeout(_)))); } @@ -554,8 +528,7 @@ mod tests { false, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -570,8 +543,7 @@ mod tests { false, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Err(_))); @@ -603,8 +575,7 @@ mod tests { false, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -618,8 +589,7 @@ mod tests { false, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -651,8 +621,7 @@ mod tests { false, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -666,8 +635,7 @@ mod tests { &image_to_build, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); @@ -675,8 +643,7 @@ mod tests { &image_to_build, &mut |msg| println!("{}", msg), &mut |msg| eprintln!("{}", msg), - Duration::max_value(), - &|| false, + &CommandKiller::never(), ); assert!(matches!(ret, Ok(_))); } diff --git a/src/cmd/helm.rs b/src/cmd/helm.rs index b79b8c8b..5e056975 100644 --- a/src/cmd/helm.rs +++ b/src/cmd/helm.rs @@ -10,7 +10,6 @@ use crate::cmd::helm::HelmError::{CannotRollback, CmdError, InvalidKubeConfig, R use crate::cmd::structs::{HelmChart, HelmListItem}; use crate::errors::{CommandError, EngineError}; use crate::events::EventDetails; -use chrono::Duration; use semver::Version; use serde_derive::Deserialize; use std::fs::File; @@ -548,7 +547,7 @@ where // Helm returns an error each time a command does not succeed as they want. Which leads to handling error with status code 1 // It means that the command successfully ran, but it didn't terminate as expected let mut cmd = QoveryCommand::new("helm", args, envs); - match cmd.exec_with_timeout(Duration::max_value(), stdout_output, stderr_output) { + match cmd.exec_with_output(stdout_output, stderr_output) { Err(err) => Err(CommandError::new(format!("{:?}", err), None)), _ => Ok(()), } diff --git a/src/cmd/kubectl.rs b/src/cmd/kubectl.rs index 08015214..ae9f0303 100644 --- a/src/cmd/kubectl.rs +++ b/src/cmd/kubectl.rs @@ -1,6 +1,5 @@ use std::path::Path; -use chrono::Duration; use retry::delay::Fibonacci; use retry::OperationResult; use serde::de::DeserializeOwned; @@ -41,7 +40,7 @@ where { let mut cmd = QoveryCommand::new("kubectl", &args, &envs); - if let Err(err) = cmd.exec_with_timeout(Duration::max_value(), stdout_output, stderr_output) { + if let Err(err) = cmd.exec_with_output(stdout_output, stderr_output) { let args_string = args.join(" "); let msg = format!("Error on command: kubectl {}. {:?}", args_string, &err); error!("{}", &msg);