mirror of
https://github.com/jlengrand/engine.git
synced 2026-03-10 08:11:21 +00:00
use async way to download kubeconfig but add block_on future
This commit is contained in:
@@ -20,6 +20,7 @@ dns-lookup = "1.0.3"
|
||||
rand = "0.7.3"
|
||||
gethostname = "0.2.1"
|
||||
reqwest = { version = "0.10.8", features = ["blocking"] }
|
||||
futures = "0.3"
|
||||
# FIXME use https://crates.io/crates/blocking instead of runtime.rs
|
||||
|
||||
# tar gz
|
||||
|
||||
@@ -12,6 +12,8 @@ use retry::OperationResult;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
extern crate serde_json;
|
||||
use futures::executor::block_on;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
pub fn kubernetes_config_path(
|
||||
workspace_directory: &str,
|
||||
@@ -28,42 +30,18 @@ pub fn kubernetes_config_path(
|
||||
workspace_directory, kubernetes_cluster_id
|
||||
);
|
||||
|
||||
let result = retry::retry(Fixed::from_millis(3000).take(5), || {
|
||||
let try_kubeconfig = download_space_object(
|
||||
spaces_access_id,
|
||||
spaces_secret_key,
|
||||
kubernetes_config_bucket_name.as_str(),
|
||||
kubernetes_config_object_key.as_str(),
|
||||
region,
|
||||
);
|
||||
match try_kubeconfig {
|
||||
Ok(kubeconfig) => OperationResult::Ok(kubeconfig),
|
||||
Err(err) => {
|
||||
warn!("Failed to download the kubeconfig file, retrying...");
|
||||
return OperationResult::Err(format!(
|
||||
"Unable to download the kubeconfig file from space: {:?}",
|
||||
err
|
||||
));
|
||||
}
|
||||
}
|
||||
});
|
||||
// Ok if the kubeconfig is downloaded, put it as file !
|
||||
match result {
|
||||
Ok(downloaded) => {
|
||||
let mut file = File::create(kubernetes_config_file_path.clone())
|
||||
.expect("Unable to create the Kubeconfig file on disk");
|
||||
file.write_all(downloaded.as_bytes())
|
||||
.expect("Unable to write anything on hte kubeconfig file");
|
||||
Ok(kubernetes_config_file_path)
|
||||
}
|
||||
Err(e) => Err(SimpleError::new(
|
||||
SimpleErrorKind::Other,
|
||||
Some(format!(
|
||||
"Unable to download the kubeconfig file after many retries {:?}",
|
||||
e
|
||||
)),
|
||||
)),
|
||||
}
|
||||
let future_kubeconfig = download_space_object(
|
||||
spaces_access_id,
|
||||
spaces_secret_key,
|
||||
kubernetes_config_bucket_name.as_str(),
|
||||
kubernetes_config_object_key.as_str(),
|
||||
region,
|
||||
kubernetes_config_file_path.as_str().clone(),
|
||||
);
|
||||
Runtime::new()
|
||||
.expect("Failed to create Tokio runtime to download kubeconfig")
|
||||
.block_on(future_kubeconfig);
|
||||
Ok(kubernetes_config_file_path.clone())
|
||||
}
|
||||
|
||||
pub const do_cluster_api_path: &str = "https://api.digitalocean.com/v2/kubernetes/clusters";
|
||||
|
||||
@@ -3,75 +3,53 @@ use crate::s3::get_object;
|
||||
use rusoto_core::{Client, HttpClient, Region, RusotoError};
|
||||
use rusoto_credential::StaticProvider;
|
||||
use rusoto_s3::{GetObjectError, GetObjectOutput, GetObjectRequest, S3Client, S3};
|
||||
use std::io::Read;
|
||||
use std::io::{Cursor, Error};
|
||||
use tokio::macros::support::Future;
|
||||
use tokio::runtime::{Builder, Runtime};
|
||||
struct Sync_do_space {
|
||||
client: S3Client,
|
||||
runtime: Runtime,
|
||||
}
|
||||
use tokio::{fs::File, io};
|
||||
|
||||
// implement synchronous way to download s3 objects... yeah !
|
||||
impl Sync_do_space {
|
||||
fn new(access_key_id: &str, secret_access_key: &str, region: &str) -> Result<Self, Error> {
|
||||
let credentials = StaticProvider::new(
|
||||
access_key_id.to_string(),
|
||||
secret_access_key.to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let client = Client::new_with(credentials, HttpClient::new().unwrap());
|
||||
let endpoint_region = Region::Custom {
|
||||
name: region.to_string(),
|
||||
endpoint: format!("https://{}.digitaloceanspaces.com",region),
|
||||
};
|
||||
Ok(Sync_do_space {
|
||||
client: S3Client::new_with_client(client, endpoint_region),
|
||||
runtime: Builder::new().basic_scheduler().enable_all().build()?,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_object(&mut self, request: GetObjectRequest) -> Result<String, SimpleError> {
|
||||
let response = self.runtime.block_on(self.client.get_object(request));
|
||||
match response {
|
||||
Ok(res) => {
|
||||
let mut body = String::new();
|
||||
res.body
|
||||
.unwrap()
|
||||
.into_blocking_read()
|
||||
.read_to_string(&mut body);
|
||||
Ok(body)
|
||||
}
|
||||
Err(e) => Err(SimpleError::new(
|
||||
SimpleErrorKind::Other,
|
||||
Some(e.to_string()),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn download_space_object(
|
||||
pub(crate) async fn download_space_object(
|
||||
access_key_id: &str,
|
||||
secret_access_key: &str,
|
||||
bucket_name: &str,
|
||||
object_key: &str,
|
||||
region: &str
|
||||
) -> Result<String, SimpleError> {
|
||||
let sync_do_space = Sync_do_space::new(access_key_id, secret_access_key,region);
|
||||
match sync_do_space {
|
||||
Ok(mut client) => {
|
||||
let mut or = GetObjectRequest::default();
|
||||
or.bucket = bucket_name.to_string();
|
||||
or.key = object_key.to_string();
|
||||
let res_body = client.get_object(or);
|
||||
match res_body {
|
||||
Ok(body) => Ok(body),
|
||||
Err(e) => Err(e),
|
||||
region: &str,
|
||||
path_to_download: &str,
|
||||
) {
|
||||
//Digital ocean doesn't implement any space download, it use the generic AWS SDK
|
||||
let region = Region::Custom {
|
||||
name: region.to_string(),
|
||||
endpoint: format!("https://{}.digitaloceanspaces.com", region),
|
||||
};
|
||||
let credentials = StaticProvider::new(
|
||||
access_key_id.to_string(),
|
||||
secret_access_key.to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let client = Client::new_with(credentials, HttpClient::new().unwrap());
|
||||
let s3_client = S3Client::new_with_client(client, region.clone());
|
||||
let mut object = s3_client
|
||||
.get_object(GetObjectRequest {
|
||||
bucket: bucket_name.to_string(),
|
||||
key: object_key.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
match object {
|
||||
Ok(mut obj_bod) => {
|
||||
let body = obj_bod.body.take();
|
||||
let mut body = body.unwrap().into_async_read();
|
||||
let mut file = File::create(path_to_download.clone()).await;
|
||||
match file {
|
||||
Ok(mut created_file) => match io::copy(&mut body, &mut created_file).await {
|
||||
Ok(_) => info!("File {} is well downloaded", path_to_download),
|
||||
Err(e) => error!("{:?}", e),
|
||||
},
|
||||
Err(e) => error!("Unable to create file"),
|
||||
}
|
||||
}
|
||||
Err(e) => Err(SimpleError::new(
|
||||
SimpleErrorKind::Other,
|
||||
Some(e.to_string()),
|
||||
)),
|
||||
}
|
||||
Err(e) => error!("{:?}", e),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use test_utilities::digitalocean::digital_ocean_token;
|
||||
use test_utilities::utilities::{context, init};
|
||||
use tracing::{debug, error, info, span, warn, Level};
|
||||
|
||||
#[test]
|
||||
fn deploy_a_working_environment_with_no_router_on_do() {
|
||||
init();
|
||||
let span = span!(
|
||||
@@ -50,7 +51,7 @@ fn deploy_a_working_environment_with_no_router_on_do() {
|
||||
};*/
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn deploy_a_working_environment_router_and_app_on_do() {
|
||||
init();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user