| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Download blob data from Google Cloud Storage (GCS). |
| |
| use { |
| crate::token_store::TokenStore, |
| anyhow::{bail, Context, Result}, |
| fuchsia_hyper::{new_https_client, HttpsClient}, |
| hyper::{body::HttpBody as _, Body, Response, StatusCode}, |
| std::{ |
| fs::{create_dir_all, File}, |
| io::Write, |
| path::Path, |
| sync::Arc, |
| }, |
| }; |
| |
| /// Create clients with credentials for use with GCS. |
| /// |
| /// Avoid more than one GCS ClientFactory with the *same auth* at a time. One way |
| /// to accomplish this is with a static once cell. |
| /// ``` |
| /// use once_cell::sync::OnceCell; |
| /// static GCS_CLIENT_FACTORY: OnceCell<ClientFactory> = OnceCell::new(); |
| /// let client_factory = GCS_CLIENT_FACTORY.get_or_init(|| { |
| /// let auth = [...]; |
| /// ClientFactory::new_with_auth(auth).expect(...) |
| /// }); |
| /// ``` |
| /// If more than one GCS ClientFactory with auth is active at the same time, the |
| /// creation of access tokens may create unnecessary network traffic (spamming) |
| /// or contention. |
| /// |
| /// Note: A ClientFactory using `TokenStore::new_without_auth()` doesn't have |
| /// issues with more than one instance since there are no tokens to update. |
| /// |
| /// The ClientFactory is thread/async safe to encourage creating a single, |
| /// shared instance. |
| pub struct ClientFactory { |
| token_store: Arc<TokenStore>, |
| } |
| |
| impl ClientFactory { |
| /// Create a ClientFactory. Avoid creating more than one (see above). |
| pub fn new(token_store: TokenStore) -> Self { |
| let token_store = Arc::new(token_store); |
| Self { token_store } |
| } |
| |
| /// Create a new https client with shared access to the GCS credentials. |
| /// |
| /// Multiple clients may be created to perform downloads in parallel. |
| pub fn create_client(&self) -> Client { |
| Client::from_token_store(self.token_store.clone()) |
| } |
| } |
| |
/// An https client capable of fetching objects from GCS.
///
/// Instances are created through `ClientFactory::create_client()`. Cloning a
/// `Client` clones the https client and adds a reference to the same token
/// store.
#[derive(Clone, Debug)]
pub struct Client {
    /// Base client used for HTTP/S IO.
    https: HttpsClient,
    /// Reference-counted token store; every list and download request is
    /// issued through it together with `https`.
    token_store: Arc<TokenStore>,
}
| |
| impl Client { |
| /// An https client that used a `token_store` to authenticate with GCS. |
| /// |
| /// The `token_store` may be used by multiple clients, even in separate |
| /// threads. It's preferable to share a single token_store to share the |
| /// access token stored therein. |
| /// |
| /// This is sufficient for downloading publicly accessible data blobs from |
| /// GCS. |
| /// |
| /// Intentionally not public. Use ClientFactory::new_client() instead. |
| fn from_token_store(token_store: Arc<TokenStore>) -> Self { |
| Self { https: new_https_client(), token_store } |
| } |
| |
| /// Save content of matching objects (blob) from GCS to local location |
| /// `output_dir`. |
| pub async fn fetch_all<P, W>( |
| &self, |
| bucket: &str, |
| prefix: &str, |
| output_dir: P, |
| verbose: bool, |
| writer: &mut W, |
| ) -> Result<()> |
| where |
| P: AsRef<Path>, |
| W: Write + Sync, |
| { |
| let objects = self |
| .token_store |
| .list(&self.https, bucket, prefix) |
| .await |
| .context("listing with token store")?; |
| let output_dir = output_dir.as_ref(); |
| for object in objects { |
| if let Some(relative_path) = object.strip_prefix(prefix) { |
| let start = std::time::Instant::now(); |
| // Strip leading slash, if present. |
| let relative_path = if relative_path.starts_with("/") { |
| &relative_path[1..] |
| } else { |
| relative_path |
| }; |
| let output_path = if relative_path.is_empty() { |
| // The `relative_path` is empty with then specified prefix |
| // is a file. |
| output_dir.join(Path::new(prefix).file_name().expect("Prefix file name.")) |
| } else { |
| output_dir.join(relative_path) |
| }; |
| |
| if let Some(parent) = output_path.parent() { |
| create_dir_all(&parent) |
| .with_context(|| format!("creating dir all for {:?}", parent))?; |
| } |
| let mut file = File::create(&output_path).context("creating file")?; |
| if verbose { |
| writeln!(writer, "GCS fetch: gs://{}/{}", bucket, object)?; |
| } else { |
| write!(writer, ".")?; |
| writer.flush()?; |
| } |
| self.write(bucket, &object, &mut file).await.context("writing object")?; |
| |
| use std::io::{Seek, SeekFrom}; |
| let file_size = file.seek(SeekFrom::End(0)).context("getting file size")?; |
| log::debug!( |
| "Wrote gs://{}/{} to {:?}, {} bytes in {} seconds.", |
| bucket, |
| object, |
| output_path, |
| file_size, |
| start.elapsed().as_secs_f32() |
| ); |
| } |
| } |
| if !verbose { |
| writeln!(writer, "")?; |
| } |
| Ok(()) |
| } |
| |
| /// Save content of a stored object (blob) from GCS at location `output`. |
| /// |
| /// Wraps call to `self.write` which wraps `self.stream()`. |
| pub async fn fetch<P: AsRef<Path>>(&self, bucket: &str, object: &str, output: P) -> Result<()> { |
| let mut file = File::create(output.as_ref())?; |
| self.write(bucket, object, &mut file).await |
| } |
| |
| /// Reads content of a stored object (blob) from GCS. |
| pub async fn stream(&self, bucket: &str, object: &str) -> Result<Response<Body>> { |
| self.token_store.download(&self.https, bucket, object).await |
| } |
| |
| /// Write content of a stored object (blob) from GCS to writer. |
| /// |
| /// Wraps call to `self.stream`. |
| pub async fn write<W: Write>(&self, bucket: &str, object: &str, writer: &mut W) -> Result<()> { |
| let mut res = self.stream(bucket, object).await?; |
| if res.status() == StatusCode::OK { |
| while let Some(next) = res.data().await { |
| let chunk = next?; |
| writer.write_all(&chunk)?; |
| } |
| return Ok(()); |
| } |
| bail!("Failed to fetch file, result status: {:?}", res.status()); |
| } |
| |
| /// List objects in `bucket` with matching `prefix`. |
| pub async fn list(&self, bucket: &str, prefix: &str) -> Result<Vec<String>> { |
| self.token_store.list(&self.https, bucket, prefix).await |
| } |
| } |
| |
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::token_store::read_boto_refresh_token,
        std::{fs::read_to_string, path::Path},
    };

    /// Bucket/object pair that the tests expect to be absent from GCS.
    const MISSING_BUCKET: &str = "for_testing_does_not_exist";
    const MISSING_OBJECT: &str = "face_test_object";

    /// Bucket/object pair that the authorized test expects to exist.
    const SDK_BUCKET: &str = "fuchsia-sdk";
    const SDK_OBJECT: &str = "development/LATEST_LINUX";

    /// A client without credentials gets a 404 response for a missing object.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_client_factory_no_auth() {
        let factory = ClientFactory::new(TokenStore::new_without_auth());
        let response = factory
            .create_client()
            .stream(MISSING_BUCKET, MISSING_OBJECT)
            .await
            .expect("stream");
        assert_eq!(response.status(), 404);
    }

    /// This test relies on a local file which is not present on test bots, so
    /// it is marked "ignore".
    /// This can be run with `fx test gcs_lib_test -- --ignored`.
    #[ignore]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn test_client_factory_with_auth() {
        // Set up authorized client.
        use home::home_dir;
        let home = home_dir().expect("home dir");
        let boto_path = Path::new(&home).join(".boto");
        let refresh_token =
            read_boto_refresh_token(&boto_path).expect("boto file").expect("refresh token");
        let token_store = TokenStore::new_with_auth(refresh_token, /*access_token=*/ None)
            .expect("new with auth");
        let client = ClientFactory::new(token_store).create_client();

        // Try downloading something that doesn't exist.
        let missing = client.stream(MISSING_BUCKET, MISSING_OBJECT).await.expect("stream");
        assert_eq!(missing.status(), 404);

        // Fetch something that does exist.
        let temp_dir = tempfile::tempdir().unwrap();
        let download_path = temp_dir.path().join("downloaded");
        client.fetch(SDK_BUCKET, SDK_OBJECT, &download_path).await.expect("fetch");
        assert!(download_path.exists());
        let fetched = read_to_string(download_path).expect("read out_file");
        assert!(!fetched.is_empty());

        // Write the same data.
        // The data is expected to be small (less than a KiB). For a non-test
        // keeping the whole file in memory may be impractical.
        let mut buffer = Vec::new();
        client.write(SDK_BUCKET, SDK_OBJECT, &mut buffer).await.expect("write");
        let written = String::from_utf8(buffer).expect("streamed string");
        assert!(!written.is_empty());

        // Compare the fetched and written data.
        assert_eq!(fetched, written);

        // Stream the same data.
        let response = client.stream(SDK_BUCKET, SDK_OBJECT).await.expect("stream");
        assert_eq!(response.status(), 200);
        // The data is expected to be small (less than a KiB). For a non-test
        // keeping the whole file in memory may be impractical.
        let body_bytes =
            hyper::body::to_bytes(response.into_body()).await.expect("streamed bytes");
        let streamed = String::from_utf8(body_bytes.to_vec()).expect("streamed string");

        // Compare the fetched and streamed data.
        assert_eq!(fetched, streamed);
    }
}