Allow to customize how to abort a command (#650)

This commit is contained in:
Erèbe - Romain Gerard
2022-03-18 09:18:42 +01:00
committed by GitHub
parent f7c5ff09e8
commit 32c998b1b9
5 changed files with 144 additions and 133 deletions

View File

@@ -1,15 +1,15 @@
use std::io::{Error, ErrorKind};
use std::path::Path;
use std::time::Duration;
use std::{env, fs};
use chrono::Duration;
use git2::{Cred, CredentialType};
use sysinfo::{Disk, DiskExt, SystemExt};
use crate::build_platform::{docker, Build, BuildPlatform, BuildResult, Credentials, Kind};
use crate::cmd::command;
use crate::cmd::command::CommandError::Killed;
use crate::cmd::command::QoveryCommand;
use crate::cmd::command::{CommandKiller, QoveryCommand};
use crate::cmd::docker::{ContainerImage, Docker, DockerError};
use crate::errors::{CommandError, EngineError, Tag};
use crate::events::{EngineEvent, EventDetails, EventMessage, ToTransmitter, Transmitter};
@@ -20,7 +20,7 @@ use crate::models::{
Context, Listen, Listener, Listeners, ListenersHelper, ProgressInfo, ProgressLevel, ProgressScope,
};
const BUILD_DURATION_TIMEOUT_MIN: i64 = 30;
const BUILD_DURATION_TIMEOUT_SEC: u64 = 30 * 60;
/// https://buildpacks.io/
const BUILDPACKS_BUILDERS: [&str; 1] = [
@@ -164,8 +164,7 @@ impl LocalDocker {
self.context.execution_id(),
));
},
Duration::minutes(BUILD_DURATION_TIMEOUT_MIN),
is_task_canceled,
&CommandKiller::from(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC), is_task_canceled),
);
match exit_status {
@@ -276,8 +275,8 @@ impl LocalDocker {
// buildpacks build
let mut cmd = QoveryCommand::new("pack", &buildpacks_args, &self.get_docker_host_envs());
let cmd_killer = CommandKiller::from(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC), is_task_canceled);
exit_status = cmd.exec_with_abort(
Duration::minutes(BUILD_DURATION_TIMEOUT_MIN),
&mut |line| {
self.logger.log(
LogLevel::Info,
@@ -308,7 +307,7 @@ impl LocalDocker {
self.context.execution_id(),
));
},
is_task_canceled,
&cmd_killer,
);
if exit_status.is_ok() {
@@ -682,7 +681,8 @@ fn docker_prune_images(envs: Vec<(&str, &str)>) -> Result<(), CommandError> {
let mut errored_commands = vec![];
for prune in all_prunes_commands {
let mut cmd = QoveryCommand::new("docker", &prune, &envs);
if let Err(e) = cmd.exec_with_timeout(Duration::minutes(BUILD_DURATION_TIMEOUT_MIN), &mut |_| {}, &mut |_| {}) {
let cmd_killer = CommandKiller::from_timeout(Duration::from_secs(BUILD_DURATION_TIMEOUT_SEC));
if let Err(e) = cmd.exec_with_abort(&mut |_| {}, &mut |_| {}, &cmd_killer) {
errored_commands.push(format!("{} {:?}", prune[0], e));
}
}

View File

@@ -7,9 +7,8 @@ use std::process::{Child, Command, ExitStatus, Stdio};
use crate::cmd::command::CommandError::{ExecutionError, ExitStatusError, Killed, TimeoutError};
use crate::cmd::command::CommandOutputType::{STDERR, STDOUT};
use chrono::Duration;
use itertools::Itertools;
use std::time::Instant;
use std::time::{Duration, Instant};
use timeout_readwrite::TimeoutReader;
enum CommandOutputType {
@@ -45,6 +44,63 @@ impl CommandError {
}
}
#[derive(Debug, Clone)]
pub enum AbortReason {
Timeout(Duration),
Canceled(String),
}
pub struct CommandKiller<'a> {
should_abort: Box<dyn Fn() -> Option<AbortReason> + 'a>,
}
impl<'a> CommandKiller<'a> {
pub fn never() -> CommandKiller<'a> {
CommandKiller {
should_abort: Box::new(|| None),
}
}
pub fn from_timeout(timeout: Duration) -> CommandKiller<'a> {
let now = Instant::now();
CommandKiller {
should_abort: Box::new(move || {
if now.elapsed() >= timeout {
return Some(AbortReason::Timeout(timeout));
}
None
}),
}
}
pub fn from_cancelable(is_canceled: &'a dyn Fn() -> bool) -> CommandKiller<'a> {
CommandKiller {
should_abort: Box::new(move || {
if is_canceled() {
return Some(AbortReason::Canceled("Task canceled".to_string()));
}
None
}),
}
}
pub fn from(timeout: Duration, is_canceled: &'a dyn Fn() -> bool) -> CommandKiller<'a> {
let has_timeout = Self::from_timeout(timeout);
let is_canceled = Self::from_cancelable(is_canceled);
CommandKiller {
should_abort: Box::new(move || {
(is_canceled.should_abort)()?;
(has_timeout.should_abort)()?;
None
}),
}
}
pub fn should_abort(&self) -> Option<AbortReason> {
(self.should_abort)()
}
}
pub struct QoveryCommand {
command: Command,
}
@@ -74,10 +130,9 @@ impl QoveryCommand {
pub fn exec(&mut self) -> Result<(), CommandError> {
self.exec_with_abort(
Duration::max_value(),
&mut |line| info!("{}", line),
&mut |line| warn!("{}", line),
|| false,
&CommandKiller::never(),
)
}
@@ -90,36 +145,19 @@ impl QoveryCommand {
STDOUT: FnMut(String),
STDERR: FnMut(String),
{
self.exec_with_abort(Duration::max_value(), stdout_output, stderr_output, || false)
self.exec_with_abort(stdout_output, stderr_output, &CommandKiller::never())
}
pub fn exec_with_timeout<STDOUT, STDERR>(
pub fn exec_with_abort<STDOUT, STDERR>(
&mut self,
timeout: Duration,
stdout_output: &mut STDOUT,
stderr_output: &mut STDERR,
abort_notifier: &CommandKiller,
) -> Result<(), CommandError>
where
STDOUT: FnMut(String),
STDERR: FnMut(String),
{
self.exec_with_abort(timeout, stdout_output, stderr_output, || false)
}
pub fn exec_with_abort<STDOUT, STDERR, F>(
&mut self,
timeout: Duration,
stdout_output: &mut STDOUT,
stderr_output: &mut STDERR,
should_be_killed: F,
) -> Result<(), CommandError>
where
STDOUT: FnMut(String),
STDERR: FnMut(String),
F: Fn() -> bool,
{
assert!(timeout.num_seconds() > 0, "Timeout cannot be a 0 or negative duration");
info!("command: {:?}", self.command);
let mut cmd_handle = self
.command
@@ -128,10 +166,8 @@ impl QoveryCommand {
.spawn()
.map_err(ExecutionError)?;
let process_start_time = Instant::now();
// Read stdout/stderr until timeout is reached
let reader_timeout = std::time::Duration::from_secs(10.min(timeout.num_seconds() as u64));
let reader_timeout = std::time::Duration::from_secs(5);
let stdout = cmd_handle.stdout.take().ok_or(ExecutionError(Error::new(
ErrorKind::BrokenPipe,
"Cannot get stdout for command",
@@ -160,11 +196,7 @@ impl QoveryCommand {
STDERR(Err(err)) => error!("Error on stderr of cmd {:?}: {:?}", self.command, err),
}
if should_be_killed() {
break;
}
if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() {
if abort_notifier.should_abort().is_some() {
break;
}
}
@@ -180,23 +212,24 @@ impl QoveryCommand {
}
Ok(None) => {
// Does the process should be killed ?
if should_be_killed() {
let msg = format!("Killing process {:?}", self.command);
warn!("{}", msg);
Self::kill(&mut cmd_handle);
return Err(Killed(msg));
}
// Does the timeout has been reached ?
if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() {
let msg = format!(
"Killing process {:?} due to timeout {}m reached",
self.command,
timeout.num_minutes()
);
warn!("{}", msg);
Self::kill(&mut cmd_handle);
return Err(TimeoutError(msg));
match abort_notifier.should_abort() {
None => {}
Some(AbortReason::Timeout(timeout)) => {
let msg = format!(
"Killing process {:?} due to timeout {}s reached",
self.command,
timeout.as_secs()
);
warn!("{}", msg);
Self::kill(&mut cmd_handle);
return Err(TimeoutError(msg));
}
Some(AbortReason::Canceled(_)) => {
let msg = format!("Killing process {:?}", self.command);
warn!("{}", msg);
Self::kill(&mut cmd_handle);
return Err(Killed(msg));
}
}
}
Err(err) => return Err(ExecutionError(err)),
@@ -252,10 +285,10 @@ where
#[cfg(test)]
mod tests {
use crate::cmd::command::{does_binary_exist, run_version_command_for, CommandError, QoveryCommand};
use chrono::Duration;
use crate::cmd::command::{does_binary_exist, run_version_command_for, CommandError, CommandKiller, QoveryCommand};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::time::Duration;
use std::{thread, time};
#[test]
@@ -282,17 +315,29 @@ mod tests {
#[test]
fn test_command_with_timeout() {
let mut cmd = QoveryCommand::new("sleep", &vec!["120"], &vec![]);
let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {});
let ret = cmd.exec_with_abort(
&mut |_| {},
&mut |_| {},
&CommandKiller::from_timeout(Duration::from_secs(2)),
);
assert!(matches!(ret, Err(CommandError::TimeoutError(_))));
let mut cmd = QoveryCommand::new("sh", &vec!["-c", "cat /dev/urandom | grep -a --null-data ."], &vec![]);
let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {});
let ret = cmd.exec_with_abort(
&mut |_| {},
&mut |_| {},
&CommandKiller::from_timeout(Duration::from_secs(2)),
);
assert!(matches!(ret, Err(CommandError::TimeoutError(_))));
let mut cmd = QoveryCommand::new("sleep", &vec!["1"], &vec![]);
let ret = cmd.exec_with_timeout(Duration::seconds(2), &mut |_| {}, &mut |_| {});
let ret = cmd.exec_with_abort(
&mut |_| {},
&mut |_| {},
&CommandKiller::from_timeout(Duration::from_secs(2)),
);
assert_eq!(ret.is_ok(), true);
}
@@ -313,8 +358,9 @@ mod tests {
});
let cmd_killer = move || should_kill2.load(Ordering::Acquire);
let cmd_killer = CommandKiller::from_cancelable(&cmd_killer);
barrier.wait();
let ret = cmd.exec_with_abort(Duration::max_value(), &mut |_| {}, &mut |_| {}, cmd_killer);
let ret = cmd.exec_with_abort(&mut |_| {}, &mut |_| {}, &cmd_killer);
assert!(matches!(ret, Err(CommandError::Killed(_))));
}

View File

@@ -1,7 +1,6 @@
use crate::cmd::command::{CommandError, QoveryCommand};
use crate::cmd::command::{CommandError, CommandKiller, QoveryCommand};
use crate::errors::EngineError;
use crate::events::EventDetails;
use chrono::Duration;
use std::path::Path;
use std::process::ExitStatus;
use url::Url;
@@ -86,10 +85,9 @@ impl Docker {
let buildx_cmd_exist = docker_exec(
&args,
&docker.get_all_envs(&vec![]),
Some(Duration::max_value()),
&|| false,
&mut |_| {},
&mut |_| {},
&CommandKiller::never(),
);
if let Err(_) = buildx_cmd_exist {
return Err(DockerError::InvalidConfig(format!(
@@ -112,10 +110,9 @@ impl Docker {
let _ = docker_exec(
&args,
&docker.get_all_envs(&vec![]),
Some(Duration::max_value()),
&|| false,
&mut |_| {},
&mut |_| {},
&CommandKiller::never(),
);
Ok(docker)
@@ -149,10 +146,9 @@ impl Docker {
docker_exec(
&args,
&self.get_all_envs(&vec![]),
None,
&|| false,
&mut |line| info!("{}", line),
&mut |line| warn!("{}", line),
&CommandKiller::never(),
)?;
Ok(())
@@ -164,10 +160,9 @@ impl Docker {
let ret = docker_exec(
&vec!["image", "inspect", &image.image_name()],
&self.get_all_envs(&vec![]),
None,
&|| false,
&mut |line| info!("{}", line),
&mut |line| warn!("{}", line),
&CommandKiller::never(),
);
Ok(matches!(ret, Ok(_)))
@@ -180,10 +175,9 @@ impl Docker {
let ret = docker_exec(
&vec!["manifest", "inspect", &image.image_name()],
&self.get_all_envs(&vec![]),
None,
&|| false,
&mut |line| info!("{}", line),
&mut |line| warn!("{}", line),
&CommandKiller::never(),
);
match ret {
@@ -198,22 +192,20 @@ impl Docker {
image: &ContainerImage,
stdout_output: &mut Stdout,
stderr_output: &mut Stderr,
timeout: Duration,
should_abort: &dyn Fn() -> bool,
should_abort: &CommandKiller,
) -> Result<(), DockerError>
where
Stdout: FnMut(String),
Stderr: FnMut(String),
{
info!("Docker pull {:?}, timeout: {:?}", image, timeout);
info!("Docker pull {:?}", image);
docker_exec(
&vec!["pull", &image.image_name()],
&self.get_all_envs(&vec![]),
Some(timeout),
should_abort,
stdout_output,
stderr_output,
should_abort,
)
}
@@ -227,8 +219,7 @@ impl Docker {
push_after_build: bool,
stdout_output: &mut Stdout,
stderr_output: &mut Stderr,
timeout: Duration,
should_abort: &dyn Fn() -> bool,
should_abort: &CommandKiller,
) -> Result<(), DockerError>
where
Stdout: FnMut(String),
@@ -239,11 +230,6 @@ impl Docker {
return Ok(());
}
// if it is already aborted, nothing to do
if (should_abort)() {
return Err(DockerError::Aborted("build".to_string()));
}
// Do some checks
if !dockerfile.is_file() {
return Err(DockerError::InvalidConfig(format!(
@@ -269,7 +255,6 @@ impl Docker {
push_after_build,
stdout_output,
stderr_output,
timeout,
should_abort,
)
} else {
@@ -282,7 +267,6 @@ impl Docker {
push_after_build,
stdout_output,
stderr_output,
timeout,
should_abort,
)
}
@@ -298,8 +282,7 @@ impl Docker {
push_after_build: bool,
stdout_output: &mut Stdout,
stderr_output: &mut Stderr,
timeout: Duration,
should_abort: &dyn Fn() -> bool,
should_abort: &CommandKiller,
) -> Result<(), DockerError>
where
Stdout: FnMut(String),
@@ -308,7 +291,7 @@ impl Docker {
info!("Docker build {:?}", image_to_build.image_name());
// Best effort to pull the cache, if it does not exist that's ok too
let _ = self.pull(cache, stdout_output, stderr_output, timeout, should_abort);
let _ = self.pull(cache, stdout_output, stderr_output, should_abort);
let mut args_string: Vec<String> = vec![
"build".to_string(),
@@ -338,14 +321,13 @@ impl Docker {
let _ = docker_exec(
&args_string.iter().map(|x| x.as_str()).collect::<Vec<&str>>(),
&self.get_all_envs(&vec![]),
Some(timeout),
should_abort,
stdout_output,
stderr_output,
should_abort,
)?;
if push_after_build {
let _ = self.push(image_to_build, stdout_output, stderr_output, timeout, should_abort)?;
let _ = self.push(image_to_build, stdout_output, stderr_output, should_abort)?;
}
Ok(())
@@ -361,8 +343,7 @@ impl Docker {
push_after_build: bool,
stdout_output: &mut Stdout,
stderr_output: &mut Stderr,
timeout: Duration,
should_abort: &dyn Fn() -> bool,
should_abort: &CommandKiller,
) -> Result<(), DockerError>
where
Stdout: FnMut(String),
@@ -405,10 +386,9 @@ impl Docker {
docker_exec(
&args_string.iter().map(|x| x.as_str()).collect::<Vec<&str>>(),
&self.get_all_envs(&vec![]),
Some(timeout),
should_abort,
stdout_output,
stderr_output,
should_abort,
)
}
@@ -417,14 +397,13 @@ impl Docker {
image: &ContainerImage,
stdout_output: &mut Stdout,
stderr_output: &mut Stderr,
timeout: Duration,
should_abort: &dyn Fn() -> bool,
should_abort: &CommandKiller,
) -> Result<(), DockerError>
where
Stdout: FnMut(String),
Stderr: FnMut(String),
{
info!("Docker push {:?}, timeout: {:?}", image, timeout);
info!("Docker push {:?}", image);
let image_names = image.image_names();
let mut args = vec!["push"];
args.extend(image_names.iter().map(|x| x.as_str()));
@@ -432,10 +411,9 @@ impl Docker {
docker_exec(
&args,
&self.get_all_envs(&vec![]),
Some(timeout),
should_abort,
stdout_output,
stderr_output,
should_abort,
)
}
}
@@ -443,18 +421,16 @@ impl Docker {
fn docker_exec<F, X>(
args: &[&str],
envs: &[(&str, &str)],
timeout: Option<Duration>,
should_abort: &dyn Fn() -> bool,
stdout_output: &mut F,
stderr_output: &mut X,
cmd_killer: &CommandKiller,
) -> Result<(), DockerError>
where
F: FnMut(String),
X: FnMut(String),
{
let timeout = timeout.unwrap_or_else(|| Duration::max_value());
let mut cmd = QoveryCommand::new("docker", args, envs);
let ret = cmd.exec_with_abort(timeout, stdout_output, stderr_output, should_abort);
let ret = cmd.exec_with_abort(stdout_output, stderr_output, &cmd_killer);
match ret {
Ok(_) => Ok(()),
@@ -474,9 +450,10 @@ pub fn to_engine_error(event_details: &EventDetails, error: DockerError) -> Engi
#[cfg(feature = "test-with-docker")]
#[cfg(test)]
mod tests {
use crate::cmd::command::CommandKiller;
use crate::cmd::docker::{ContainerImage, Docker, DockerError};
use chrono::Duration;
use std::path::Path;
use std::time::Duration;
use url::Url;
fn private_registry_url() -> Url {
@@ -497,8 +474,7 @@ mod tests {
&image,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Err(_)));
@@ -513,8 +489,7 @@ mod tests {
&image,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -523,8 +498,7 @@ mod tests {
&image,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::seconds(1),
&|| false,
&CommandKiller::from_timeout(Duration::from_secs(1)),
);
assert!(matches!(ret, Err(DockerError::Timeout(_))));
}
@@ -554,8 +528,7 @@ mod tests {
false,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -570,8 +543,7 @@ mod tests {
false,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Err(_)));
@@ -603,8 +575,7 @@ mod tests {
false,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -618,8 +589,7 @@ mod tests {
false,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -651,8 +621,7 @@ mod tests {
false,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -666,8 +635,7 @@ mod tests {
&image_to_build,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
@@ -675,8 +643,7 @@ mod tests {
&image_to_build,
&mut |msg| println!("{}", msg),
&mut |msg| eprintln!("{}", msg),
Duration::max_value(),
&|| false,
&CommandKiller::never(),
);
assert!(matches!(ret, Ok(_)));
}

View File

@@ -10,7 +10,6 @@ use crate::cmd::helm::HelmError::{CannotRollback, CmdError, InvalidKubeConfig, R
use crate::cmd::structs::{HelmChart, HelmListItem};
use crate::errors::{CommandError, EngineError};
use crate::events::EventDetails;
use chrono::Duration;
use semver::Version;
use serde_derive::Deserialize;
use std::fs::File;
@@ -548,7 +547,7 @@ where
// Helm returns an error each time a command does not succeed as they want. Which leads to handling error with status code 1
// It means that the command successfully ran, but it didn't terminate as expected
let mut cmd = QoveryCommand::new("helm", args, envs);
match cmd.exec_with_timeout(Duration::max_value(), stdout_output, stderr_output) {
match cmd.exec_with_output(stdout_output, stderr_output) {
Err(err) => Err(CommandError::new(format!("{:?}", err), None)),
_ => Ok(()),
}

View File

@@ -1,6 +1,5 @@
use std::path::Path;
use chrono::Duration;
use retry::delay::Fibonacci;
use retry::OperationResult;
use serde::de::DeserializeOwned;
@@ -41,7 +40,7 @@ where
{
let mut cmd = QoveryCommand::new("kubectl", &args, &envs);
if let Err(err) = cmd.exec_with_timeout(Duration::max_value(), stdout_output, stderr_output) {
if let Err(err) = cmd.exec_with_output(stdout_output, stderr_output) {
let args_string = args.join(" ");
let msg = format!("Error on command: kubectl {}. {:?}", args_string, &err);
error!("{}", &msg);