文盘Rust —— rust连接oss | 京东云技术团队

瑞星  金牌会员 | 2023-5-10 10:06:03 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 533|帖子 533|积分 1599

作者:京东科技 贾世闻
对象存储是云的基础组件之一,各大云厂商都有相关产品。这里跟大家介绍一下rust与对象存储交到的基本套路和其中的一些技巧。
基本连接

我们以 [S3 sdk](
https://github.com/awslabs/aws-sdk-rust)为例来说说基本的连接与操作,作者验证过aws、京东云、阿里云。主要的增删改查功能没有什么差别。

  • 建立客户端
  1. let shared_config = SdkConfig::builder()
  2.          .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
  3.             "LTAI5t7NPuPKsXm6UeSa1",
  4.             "DGHuK03ESXQYqQ83buKMHs9NAwz",
  5.              None,
  6.              None,
  7.              "Static",
  8.          )))
  9.          .endpoint_url("http://oss-cn-beijing.aliyuncs.com")
  10.          .region(Region::new("oss-cn-beijing"))
  11.          .build();
  12.      let s3_config_builder = aws_sdk_s3::config::Builder::from(&shared_config);
  13.      let client = aws_sdk_s3::Client::from_conf(s3_config_builder.build());
复制代码
建立Client所需要的参数主要有你需要访问的oss的AK、SK,endpoint url 以及服务所在的区域。以上信息都可以在服务商的帮助文档查询到。

  • 对象列表
  1. let mut obj_list = client
  2.      .list_objects_v2()
  3.      .bucket(bucket)
  4.      .max_keys(max_keys)
  5.      .prefix(prefix_str)
  6.      .continuation_token(token_str);
  7. let list = obj_list.send().await.unwrap();
  8. println!("{:?}",list.contents());
  9. println!("{:?}",list.next_continuation_token());
复制代码
使用list_objects_v2函数返回对象列表,相比list_objects函数,list_objects_v2可以通过continuation_token和max_keys控制返回列表的长度。list.contents()返回对象列表数组,
list.next_continuation_token()返回继续查询的token。

  • 上传文件
  1. let content = ByteStream::from("content in file".as_bytes());
  2. let exp = aws_smithy_types::DateTime::from_secs(100);
  3. let upload = client
  4.     .put_object()
  5.     .bucket("bucket")
  6.     .key("/test/key")
  7.     .expires(exp)
  8.     .body(content);
  9. upload.send().await.unwrap();
复制代码
指定bucket及对象路径,body接受ByteStream类型作为文件内容,最后设置过期时间expires,无过期时间时不指定该配置即可。

  • 下载文件
  1. let key = "/tmp/test/key".to_string();
  2. let resp = client
  3.     .get_object()
  4.     .bucket("bucket")
  5.     .key(&key)
  6.     .send()
  7.     .await.unwrap();
  8. let data = resp.body.collect().await.unwrap();
  9. let bytes = data.into_bytes();
  10. let path = std::path::Path::new("/tmp/key")
  11. if let Some(p) = path.parent() {
  12.     std::fs::create_dir_all(p).unwrap();
  13. }
  14. let mut file = OpenOptions::new()
  15.     .write(true)
  16.     .truncate(true)
  17.     .create(true)
  18.     .open(path).unwrap();
  19. let _ = file.write(&*bytes);
  20. file.flush().unwrap();
复制代码
通过get_object()函数获取GetObjectOutput。返回值的body 就是文件内容,将 body 转换为 bytes,最后打开文件写入即可。

  • 删除文件
  1. let mut keys = vec![];
  2. let key1 = ObjectIdentifier::builder()
  3.     .set_key(Some("/tmp/key1".to_string()))
  4.     .build();
  5. let key2 = ObjectIdentifier::builder()
  6.     .set_key(Some("/tmp/key2".to_string()))
  7.     .build()
  8. keys.push(key1);
  9. keys.push(key2)
  10. client
  11.     .delete_objects()
  12.     .bucket(bucket)
  13.     .delete(Delete::builder().set_objects(Some(keys)).build())
  14.     .send()
  15.     .await
  16.     .unwrap();
复制代码
delete_objects 批量删除对象。首先构建keys vector,定义要删除的对象,然后通过Delete::builder(),构建 Delete model。
大文件上传
  1. let mut file = fs::File::open("/tmp/file_name").unwrap();
  2. let chunk_size = 1024*1024;
  3. let mut part_number = 0;
  4. let mut upload_parts: Vec = Vec::new();
  5. //获取上传id
  6. let multipart_upload_res: CreateMultipartUploadOutput = self
  7.     .client
  8.     .create_multipart_upload()
  9.     .bucket("bucket")
  10.     .key("/tmp/key")
  11.     .send()
  12.     .await.unwrap();
  13. let upload_id = match multipart_upload_res.upload_id() {
  14.     Some(id) => id,
  15.     None => {
  16.         return Err(anyhow!("upload id is None"));
  17.     }
  18. };
  19. //分段上传文件并记录completer_part
  20. loop {
  21.     let mut buf = vec![0; chuck_size];
  22.     let read_count = file.read(&mut buf)?;
  23.     part_number += 1;
  24.     if read_count == 0 {
  25.         break;
  26.     }
  27.     let body = &buf[..read_count];
  28.     let stream = ByteStream::from(body.to_vec());
  29.     let upload_part_res = self
  30.         .client
  31.         .upload_part()
  32.         .key(key)
  33.         .bucket(bucket)
  34.         .upload_id(upload_id)
  35.         .body(stream)
  36.         .part_number(part_number)
  37.         .send()
  38.         .await.unwrap();
  39.     let completer_part = CompletedPart::builder()
  40.         .e_tag(upload_part_res.e_tag.unwrap_or_default())
  41.         .part_number(part_number)
  42.         .build();
  43.     upload_parts.push(completer_part);
  44.     if read_count != chuck_size {
  45.         break;
  46.     }
  47. }
  48. // 完成上传文件合并
  49. let completed_multipart_upload: CompletedMultipartUpload =
  50.     CompletedMultipartUpload::builder()
  51.         .set_parts(Some(upload_parts))
  52.         .build();
  53. let _complete_multipart_upload_res = self
  54.     .client
  55.     .complete_multipart_upload()
  56.     .bucket("bucket")
  57.     .key(key)
  58.     .multipart_upload(completed_multipart_upload)
  59.     .upload_id(upload_id)
  60.     .send()
  61.     .await.unwrap();
复制代码
有时候面对大文件,比如几百兆甚至几个G的文件,为了节约带宽和内存,我才采取分段上传的方案,然后在对象存储的服务端做合并。基本流程是:指定bucket和key,获取一个上传id;按流读取文件,分段上传字节流,并记录CompletedPart;通知服务器按照CompletedPart 集合来合并文件。具体过程代码已加注释,这里不再累述。
大文件下载
  1. let mut file = match OpenOptions::new()
  2.             .truncate(true)
  3.             .create(true)
  4.             .write(true)
  5.             .open("/tmp/target_file");
  6. let key = "/tmp/test/key".to_string();
  7. let resp = client
  8.     .get_object()
  9.     .bucket("bucket")
  10.     .key(&key)
  11.     .send()
  12.     .await.unwrap();
  13. let content_len = resp.content_length();
  14. let mut byte_stream_async_reader = resp.body.into_async_read();
  15. let mut content_len_usize: usize = content_len.try_into().unwrap();
  16. loop {
  17.     if content_len_usize > chunk_size {
  18.         let mut buffer = vec![0; chunk_size];
  19.         let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
  20.         file.write_all(&buffer).unwrap();
  21.         content_len_usize -= chunk_size;
  22.         continue;
  23.     } else {
  24.         let mut buffer = vec![0; content_len_usize];
  25.         let _ = byte_stream_async_reader.read_exact(&mut buffer).await.unwrap();
  26.         file.write_all(&buffer).unwrap();
  27.         break;
  28.     }
  29. }
  30. file.flush().unwrap();
复制代码
在从对象存储服务端下载文件的过程中也会遇到大文件问题。为了节约带宽和内存,我们采取读取字节流的方式分段写入文件。首先get_object()函数获取ByteStream,通过async_reader流式读取对象字节,分段写入文件。
对象存储的相关话题今天先聊到这儿,下期见。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

瑞星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表