IT评测·应用市场-qidao123.com技术社区

标题: 聊聊PowerJob的AliOssService [打印本页]

作者: 自由的羽毛    时间: 2024-6-11 19:15
标题: 聊聊PowerJob的AliOssService


本文主要研究一下PowerJob的AliOssService
DFsService

tech/powerjob/server/extension/dfs/DFsService.java
  1. public interface DFsService {
  2.     /**
  3.      * Stores a file.
  4.      * @param storeRequest the store request
  5.      * @throws IOException if an I/O error occurs
  6.      */
  7.     void store(StoreRequest storeRequest) throws IOException;
  8.     /**
  9.      * Downloads a file.
  10.      * @param downloadRequest the file download request
  11.      * @throws IOException if an I/O error occurs
  12.      */
  13.     void download(DownloadRequest downloadRequest) throws IOException;
  14.     /**
  15.      * Fetches the metadata of a file.
  16.      * @param fileLocation the file location
  17.      * @return the file metadata if the file exists
  18.      * @throws IOException if an I/O error occurs
  19.      */
  20.     Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
  21.     /**
  22.      * Cleans up files that powerjob considers "expired".
  23.      * Some storage systems ship with their own lifecycle management (e.g. Alibaba Cloud OSS); for those this method does not need a separate implementation.
  24.      * @param bucket bucket
  25.      * @param days number of days; files older than X days need to be cleaned up
  26.      */
  27.     default void cleanExpiredFiles(String bucket, int days) {
  28.     }
  29. }
复制代码
  DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法
  AbstractDFsService

tech/powerjob/server/persistence/storage/AbstractDFsService.java
  1. @Slf4j
  2. public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
  3.     protected ApplicationContext applicationContext;
  4.     public AbstractDFsService() {
  5.         log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
  6.     }
  7.     abstract protected void init(ApplicationContext applicationContext);
  8.     protected static final String PROPERTY_KEY = "oms.storage.dfs";
  9.     protected static String fetchProperty(Environment environment, String dfsType, String key) {
  10.         String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
  11.         return environment.getProperty(pKey);
  12.     }
  13.     @Override
  14.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  15.         this.applicationContext = applicationContext;
  16.         log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
  17.         init(applicationContext);
  18.     }
  19. }
复制代码
  AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init
  AliOssService

tech/powerjob/server/persistence/storage/impl/AliOssService.java
  1. @Slf4j
  2. @Priority(value = Integer.MAX_VALUE - 1)
  3. @Conditional(AliOssService.AliOssCondition.class)
  4. public class AliOssService extends AbstractDFsService {
  5.     private static final String TYPE_ALI_OSS = "alioss";
  6.     private static final String KEY_ENDPOINT = "endpoint";
  7.     private static final String KEY_BUCKET = "bucket";
  8.     private static final String KEY_CREDENTIAL_TYPE = "credential_type";
  9.     private static final String KEY_AK = "ak";
  10.     private static final String KEY_SK = "sk";
  11.     private static final String KEY_TOKEN = "token";
  12.     private OSS oss;
  13.     private String bucket;
  14.     private static final int DOWNLOAD_PART_SIZE = 10240;
  15.     private static final String NO_SUCH_KEY = "NoSuchKey";
  16.     //......
  17.     void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
  18.         log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
  19.         if (StringUtils.isEmpty(bucket)) {
  20.             throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");
  21.         }
  22.         this.bucket = bucket;
  23.         CredentialsProvider credentialsProvider;
  24.         CredentialType credentialType = CredentialType.parse(mode);
  25.         switch (credentialType) {
  26.             case PWD:
  27.                 credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
  28.                 break;
  29.             case SYSTEM_PROPERTY:
  30.                 credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
  31.                 break;
  32.             default:
  33.                 credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
  34.         }
  35.         this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
  36.         log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");
  37.     }
  38.     //......   
  39. }   
复制代码
  AliOssService继承了AbstractDFsService
  store

  1.     /**
  2.      * Uploads the request's local file to the configured bucket, using the object key
  3.      * that {@code parseFileName} produces for the request's file location.
  4.      * @param storeRequest the store request carrying the file location and the local file
  5.      * @throws IOException declared by the {@code DFsService} contract
  6.      */
  7.     @Override
  8.     public void store(StoreRequest storeRequest) throws IOException {
  9.         String objectKey = parseFileName(storeRequest.getFileLocation());
  10.         PutObjectRequest uploadRequest = new PutObjectRequest(bucket, objectKey, storeRequest.getLocalFile(), new ObjectMetadata());
  11.         oss.putObject(uploadRequest);
  12.     }
复制代码
  store方法创建PutObjectRequest,使用oss.putObject进行上传
  download

  1.     @Override
  2.     public void download(DownloadRequest downloadRequest) throws IOException {
  3.         FileLocation dfl = downloadRequest.getFileLocation();
  4.         DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
  5.         try {
  6.             FileUtils.forceMkdirParent(downloadRequest.getTarget());
  7.             oss.downloadFile(downloadFileRequest);
  8.         } catch (Throwable t) {
  9.             ExceptionUtils.rethrow(t);
  10.         }
  11.     }
复制代码
  download方法则根据DownloadRequest指定的FileLocation创建DownloadFileRequest,然后通过oss.downloadFile(downloadFileRequest)进行下载
  fetchFileMeta

  1.     @Override
  2.     public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
  3.         try {
  4.             ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
  5.             return Optional.ofNullable(objectMetadata).map(ossM -> {
  6.                 Map<String, Object> metaInfo = Maps.newHashMap();
  7.                 metaInfo.putAll(ossM.getRawMetadata());
  8.                 if (ossM.getUserMetadata() != null) {
  9.                     metaInfo.putAll(ossM.getUserMetadata());
  10.                 }
  11.                 return new FileMeta()
  12.                         .setLastModifiedTime(ossM.getLastModified())
  13.                         .setLength(ossM.getContentLength())
  14.                         .setMetaInfo(metaInfo);
  15.             });
  16.         } catch (OSSException oe) {
  17.             String errorCode = oe.getErrorCode();
  18.             if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
  19.                 return Optional.empty();
  20.             }
  21.             ExceptionUtils.rethrow(oe);
  22.         }
  23.         return Optional.empty();
  24.     }
复制代码
  fetchFileMeta方法通过oss.getObjectMetadata获取ObjectMetadata
  cleanExpiredFiles

  1.     @Override
  2.     public void cleanExpiredFiles(String bucket, int days) {
  3.         /*
  4.         Alibaba Cloud OSS has built-in lifecycle management; please configure it by following the documentation. It is not implemented at the code level (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54
  5.         Alibaba Cloud OSS has built-in lifecycle management; please configure it by following the documentation. It is not implemented at the code level (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54
  6.         Alibaba Cloud OSS has built-in lifecycle management; please configure it by following the documentation. It is not implemented at the code level (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54
  7.          */
  8.     }
复制代码
  cleanExpiredFiles则是空操作
  init

  1.     /**
  2.      * Reads the {@code oms.storage.dfs.alioss.*} settings (endpoint, bucket, credential_type,
  3.      * ak, sk, token) from the context's environment and builds the OSS client from them.
  4.      * Any exception raised while building the client is rethrown unchanged.
  5.      * @param applicationContext the application context whose environment supplies the settings
  6.      */
  7.     @Override
  8.     protected void init(ApplicationContext applicationContext) {
  9.         Environment environment = applicationContext.getEnvironment();
  10.         String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
  11.         String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
  12.         String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
  13.         String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
  14.         String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
  15.         String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
  16.         try {
  17.             initOssClient(endpoint, bkt, ct, ak, sk, token);
  18.         } catch (Exception e) {
  19.             // init declares no checked exceptions, so the failure is rethrown through the helper
  20.             ExceptionUtils.rethrow(e);
  21.         }
  22.     }
复制代码
  init则是通过environment获取相关属性,然后执行initOssClient
  小结

DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init;AliOssService继承了AbstractDFsService,通过ossClient实现了store、download、fetchFileMeta方法。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4