From b6e7a529d115f272f35c69020a9ad8b1c5a49ce5 Mon Sep 17 00:00:00 2001 From: Romain GERARD Date: Mon, 22 Mar 2021 12:44:24 +0100 Subject: [PATCH] Fix command with timeout - reading on stdout/stderr may be blocking if the app does not output anything, thus we never reach the code that abort/timeout the process - Fix: Properly timeout reading stdout/stderr - Try to interleave stdout and stderr to have output in the correct order. For a better alternative timestamp should be added and remerged after it --- Cargo.lock | 29 ++++++++++++++ Cargo.toml | 1 + src/cmd/utilities.rs | 91 +++++++++++++++++++++++++++++++++++--------- 3 files changed, 103 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69c37e85..1a8e176a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1589,6 +1589,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nix" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363" +dependencies = [ + "bitflags", + "cc", + "cfg-if 0.1.10", + "libc", + "void", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1914,6 +1927,7 @@ dependencies = [ "tar", "tera", "test-utilities", + "timeout-readwrite", "tokio 0.2.22", "tracing", "tracing-subscriber", @@ -2995,6 +3009,15 @@ dependencies = [ "syn 1.0.48", ] +[[package]] +name = "timeout-readwrite" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f9e83663e1c312cbc7980611fa82ed60d622a947de3546350b549e5d69f9bc" +dependencies = [ + "nix", +] + [[package]] name = "tinyvec" version = "0.3.4" @@ -3501,6 +3524,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "walkdir" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index 3311cc35..b9f8d8d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.7.3" gethostname = "0.2.1" reqwest = { version = "0.10.8", features = ["blocking"] } futures = "0.3" +timeout-readwrite = "0.3.1" # FIXME use https://crates.io/crates/blocking instead of runtime.rs diff --git a/src/cmd/utilities.rs b/src/cmd/utilities.rs index ec8dac40..788c288c 100644 --- a/src/cmd/utilities.rs +++ b/src/cmd/utilities.rs @@ -1,13 +1,21 @@ use std::ffi::OsStr; -use std::io::Error; use std::io::{BufRead, BufReader}; +use std::io::{Error, ErrorKind}; use std::path::Path; use std::process::{Child, Command, Stdio}; +use crate::cmd::utilities::CommandOutputType::{STDERR, STDOUT}; use crate::error::SimpleErrorKind::Other; use crate::error::{SimpleError, SimpleErrorKind}; use chrono::Duration; +use itertools::Itertools; use std::time::Instant; +use timeout_readwrite::TimeoutReader; + +enum CommandOutputType { + STDOUT(Result), + STDERR(Result), +} fn command

(binary: P, args: Vec<&str>, envs: Option>, use_output: bool) -> Command where @@ -151,8 +159,8 @@ pub fn exec_with_envs_and_output( binary: P, args: Vec<&str>, envs: Vec<(&str, &str)>, - stdout_output: F, - stderr_output: X, + mut stdout_output: F, + mut stderr_output: X, timeout: Duration, ) -> Result<(), SimpleError> where @@ -160,36 +168,66 @@ where F: FnMut(Result), X: FnMut(Result), { - let command_string = command_with_envs_to_string(binary.as_ref(), &args, &envs); - info!("command: {}", command_string.as_str()); + assert!(timeout.num_seconds() > 0, "Timeout cannot be a 0 or negative duration"); - let mut child = _with_output( - command(binary, args, Some(envs), true).spawn().unwrap(), - stdout_output, - stderr_output, + let command_string = command_with_envs_to_string(binary.as_ref(), &args, &envs); + info!( + "command with {}m timeout: {}", + timeout.num_minutes(), + command_string.as_str() ); + // Start the process + let mut child_process = command(binary, args, Some(envs), true).spawn().unwrap(); + let process_start_time = Instant::now(); + + // Read stdout/stderr until timeout is reached + let reader_timeout = std::time::Duration::from_secs(1); + let stdout_reader = BufReader::new(TimeoutReader::new(child_process.stdout.take().unwrap(), reader_timeout)) + .lines() + .map(STDOUT); + + let stderr_reader = BufReader::new(TimeoutReader::new(child_process.stderr.take().unwrap(), reader_timeout)) + .lines() + .map(STDERR); + + for line in stdout_reader.interleave(stderr_reader) { + match line { + STDOUT(Err(ref err)) | STDERR(Err(ref err)) if err.kind() == ErrorKind::TimedOut => { + if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() { + break; + } + } + STDOUT(line) => stdout_output(line), + STDERR(line) => stderr_output(line), + } + } + // Wait for the process to exit before reaching the timeout // If not, we just kill it - let start = Instant::now(); let exit_status; loop { - match child.try_wait() { + match child_process.try_wait() { Ok(Some(status)) => { exit_status = status; break; } Ok(None) => { - if (start.elapsed().as_secs() as i64) < timeout.num_seconds() { + if (process_start_time.elapsed().as_secs() as i64) < timeout.num_seconds() { std::thread::sleep(std::time::Duration::from_secs(1)); continue; } // Timeout ! - let _ = child + warn!( + "Killing process {} due to timeout {}m reached", + command_string, + timeout.num_minutes() + ); + let _ = child_process .kill() //Fire - .map(|_| child.wait()) - .map_err(|err| error!("Cannot kill process {:?} {}", child, err)); + .map(|_| child_process.wait()) + .map_err(|err| error!("Cannot kill process {:?} {}", child_process, err)); return Err(SimpleError::new( Other, @@ -218,7 +256,7 @@ pub fn run_version_command_for(binary_name: &str) -> String { binary_name, vec!["--version"], |r_out| match r_out { - Ok(s) => output_from_cmd.push_str(&s.to_owned()), + Ok(s) => output_from_cmd.push_str(&s), Err(e) => error!("Error while getting stdout from {} {}", binary_name, e), }, |r_err| match r_err { @@ -243,14 +281,14 @@ where .is_ok() } -pub fn command_to_string

(binary: P, args: &Vec<&str>) -> String +pub fn command_to_string

(binary: P, args: &[&str]) -> String where P: AsRef, { format!("{} {}", binary.as_ref().to_str().unwrap(), args.join(" ")) } -pub fn command_with_envs_to_string

(binary: P, args: &Vec<&str>, envs: &Vec<(&str, &str)>) -> String +pub fn command_with_envs_to_string

(binary: P, args: &[&str], envs: &[(&str, &str)]) -> String where P: AsRef, { @@ -263,3 +301,20 @@ where args.join(" ") ) } + +#[cfg(test)] +mod tests { + use crate::cmd::utilities::exec_with_envs_and_output; + use chrono::Duration; + + #[test] + fn test_command_with_timeout() { + let ret = exec_with_envs_and_output("sleep", vec!["120"], vec![], |_| {}, |_| {}, Duration::seconds(2)); + assert_eq!(ret.is_err(), true); + assert_eq!(ret.err().unwrap().message.unwrap().contains("timeout"), true); + + let ret2 = exec_with_envs_and_output("sleep", vec!["1"], vec![], |_| {}, |_| {}, Duration::seconds(5)); + + assert_eq!(ret2.is_ok(), true); + } +}