Fix command with timeout

- reading on stdout/stderr may be blocking if the app does
    not output anything, thus we never reach the code that abort/timeout
    the process

  - Fix: Properly timeout reading stdout/stderr

  - Try to interleave stdout and stderr to have output in the correct
  order. For a better alternative timestamp should be added and remerged
  after it
This commit is contained in:
Romain GERARD
2021-03-22 12:44:24 +01:00
committed by Pierre Mavro
parent 4d79d21118
commit b6e7a529d1
3 changed files with 103 additions and 18 deletions

29
Cargo.lock generated
View File

@@ -1589,6 +1589,19 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "nix"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363"
dependencies = [
"bitflags",
"cc",
"cfg-if 0.1.10",
"libc",
"void",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@@ -1914,6 +1927,7 @@ dependencies = [
"tar",
"tera",
"test-utilities",
"timeout-readwrite",
"tokio 0.2.22",
"tracing",
"tracing-subscriber",
@@ -2995,6 +3009,15 @@ dependencies = [
"syn 1.0.48",
]
[[package]]
name = "timeout-readwrite"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f9e83663e1c312cbc7980611fa82ed60d622a947de3546350b549e5d69f9bc"
dependencies = [
"nix",
]
[[package]]
name = "tinyvec"
version = "0.3.4"
@@ -3501,6 +3524,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "walkdir"
version = "2.3.1"

View File

@@ -21,6 +21,7 @@ rand = "0.7.3"
gethostname = "0.2.1"
reqwest = { version = "0.10.8", features = ["blocking"] }
futures = "0.3"
timeout-readwrite = "0.3.1"
# FIXME use https://crates.io/crates/blocking instead of runtime.rs

View File

@@ -1,13 +1,21 @@
use std::ffi::OsStr;
use std::io::Error;
use std::io::{BufRead, BufReader};
use std::io::{Error, ErrorKind};
use std::path::Path;
use std::process::{Child, Command, Stdio};
use crate::cmd::utilities::CommandOutputType::{STDERR, STDOUT};
use crate::error::SimpleErrorKind::Other;
use crate::error::{SimpleError, SimpleErrorKind};
use chrono::Duration;
use itertools::Itertools;
use std::time::Instant;
use timeout_readwrite::TimeoutReader;
enum CommandOutputType {
STDOUT(Result<String, std::io::Error>),
STDERR(Result<String, std::io::Error>),
}
fn command<P>(binary: P, args: Vec<&str>, envs: Option<Vec<(&str, &str)>>, use_output: bool) -> Command
where
@@ -151,8 +159,8 @@ pub fn exec_with_envs_and_output<P, F, X>(
binary: P,
args: Vec<&str>,
envs: Vec<(&str, &str)>,
stdout_output: F,
stderr_output: X,
mut stdout_output: F,
mut stderr_output: X,
timeout: Duration,
) -> Result<(), SimpleError>
where
@@ -160,36 +168,66 @@ where
F: FnMut(Result<String, Error>),
X: FnMut(Result<String, Error>),
{
let command_string = command_with_envs_to_string(binary.as_ref(), &args, &envs);
info!("command: {}", command_string.as_str());
assert!(timeout.num_seconds() > 0, "Timeout cannot be a 0 or negative duration");
let mut child = _with_output(
command(binary, args, Some(envs), true).spawn().unwrap(),
stdout_output,
stderr_output,
let command_string = command_with_envs_to_string(binary.as_ref(), &args, &envs);
info!(
"command with {}m timeout: {}",
timeout.num_minutes(),
command_string.as_str()
);
// Start the process
let mut child_process = command(binary, args, Some(envs), true).spawn().unwrap();
let process_start_time = Instant::now();
// Read stdout/stderr until timeout is reached
let reader_timeout = std::time::Duration::from_secs(1);
let stdout_reader = BufReader::new(TimeoutReader::new(child_process.stdout.take().unwrap(), reader_timeout))
.lines()
.map(STDOUT);
let stderr_reader = BufReader::new(TimeoutReader::new(child_process.stderr.take().unwrap(), reader_timeout))
.lines()
.map(STDERR);
for line in stdout_reader.interleave(stderr_reader) {
match line {
STDOUT(Err(ref err)) | STDERR(Err(ref err)) if err.kind() == ErrorKind::TimedOut => {
if (process_start_time.elapsed().as_secs() as i64) >= timeout.num_seconds() {
break;
}
}
STDOUT(line) => stdout_output(line),
STDERR(line) => stderr_output(line),
}
}
// Wait for the process to exit before reaching the timeout
// If not, we just kill it
let start = Instant::now();
let exit_status;
loop {
match child.try_wait() {
match child_process.try_wait() {
Ok(Some(status)) => {
exit_status = status;
break;
}
Ok(None) => {
if (start.elapsed().as_secs() as i64) < timeout.num_seconds() {
if (process_start_time.elapsed().as_secs() as i64) < timeout.num_seconds() {
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
// Timeout !
let _ = child
warn!(
"Killing process {} due to timeout {}m reached",
command_string,
timeout.num_minutes()
);
let _ = child_process
.kill() //Fire
.map(|_| child.wait())
.map_err(|err| error!("Cannot kill process {:?} {}", child, err));
.map(|_| child_process.wait())
.map_err(|err| error!("Cannot kill process {:?} {}", child_process, err));
return Err(SimpleError::new(
Other,
@@ -218,7 +256,7 @@ pub fn run_version_command_for(binary_name: &str) -> String {
binary_name,
vec!["--version"],
|r_out| match r_out {
Ok(s) => output_from_cmd.push_str(&s.to_owned()),
Ok(s) => output_from_cmd.push_str(&s),
Err(e) => error!("Error while getting stdout from {} {}", binary_name, e),
},
|r_err| match r_err {
@@ -243,14 +281,14 @@ where
.is_ok()
}
pub fn command_to_string<P>(binary: P, args: &Vec<&str>) -> String
pub fn command_to_string<P>(binary: P, args: &[&str]) -> String
where
P: AsRef<Path>,
{
format!("{} {}", binary.as_ref().to_str().unwrap(), args.join(" "))
}
pub fn command_with_envs_to_string<P>(binary: P, args: &Vec<&str>, envs: &Vec<(&str, &str)>) -> String
pub fn command_with_envs_to_string<P>(binary: P, args: &[&str], envs: &[(&str, &str)]) -> String
where
P: AsRef<Path>,
{
@@ -263,3 +301,20 @@ where
args.join(" ")
)
}
#[cfg(test)]
mod tests {
use crate::cmd::utilities::exec_with_envs_and_output;
use chrono::Duration;
#[test]
fn test_command_with_timeout() {
let ret = exec_with_envs_and_output("sleep", vec!["120"], vec![], |_| {}, |_| {}, Duration::seconds(2));
assert_eq!(ret.is_err(), true);
assert_eq!(ret.err().unwrap().message.unwrap().contains("timeout"), true);
let ret2 = exec_with_envs_and_output("sleep", vec!["1"], vec![], |_| {}, |_| {}, Duration::seconds(5));
assert_eq!(ret2.is_ok(), true);
}
}