Preface
This article takes a look at PowerJob's AliOssService.
DFsService
tech/powerjob/server/extension/dfs/DFsService.java
- public interface DFsService {
- /**
- * Store a file
- * @param storeRequest the store request
- * @throws IOException on failure
- */
- void store(StoreRequest storeRequest) throws IOException;
- /**
- * Download a file
- * @param downloadRequest the file download request
- * @throws IOException on failure
- */
- void download(DownloadRequest downloadRequest) throws IOException;
- /**
- * Fetch file metadata
- * @param fileLocation the file location
- * @return the file metadata, if the file exists
- * @throws IOException on failure
- */
- Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException;
- /**
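- * Clean up files that powerjob considers "expired"
- * Some storage systems ship with lifecycle management (e.g. Alibaba Cloud OSS), in which case this method does not need a separate implementation
- * @param bucket bucket
- * @param days number of days; files older than X days should be cleaned up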
- */
- default void cleanExpiredFiles(String bucket, int days) {
- }
- }
The DFsService interface defines the store, download, fetchFileMeta, and cleanExpiredFiles methods; cleanExpiredFiles has an empty default implementation.
AbstractDFsService
tech/powerjob/server/persistence/storage/AbstractDFsService.java
- @Slf4j
- public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean {
- protected ApplicationContext applicationContext;
- public AbstractDFsService() {
- log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName());
- }
- abstract protected void init(ApplicationContext applicationContext);
- protected static final String PROPERTY_KEY = "oms.storage.dfs";
- protected static String fetchProperty(Environment environment, String dfsType, String key) {
- String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
- return environment.getProperty(pKey);
- }
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName());
- init(applicationContext);
- }
- }
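AbstractDFsService declares that it implements the DFsService, ApplicationContextAware, and DisposableBean interfaces. Its setApplicationContext method stores the context and then calls init, which subclasses must implement.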
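The init-on-context-injection flow described above is a template-method hook. Here is a minimal sketch of that shape in plain Java, with no Spring dependency. All names in it (ContextAwareBase, DemoStorage, setContext) are hypothetical stand-ins, and a plain String plays the role of the ApplicationContext.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a container-aware base class; this is not PowerJob code.
abstract class ContextAwareBase {
    protected final List<String> events = new ArrayList<>();

    // Subclasses put their one-time setup here.
    protected abstract void init(String context);

    // Plays the role of setApplicationContext: record the context, then run init.
    public final void setContext(String context) {
        events.add("context set: " + context);
        init(context);
    }
}

// Hypothetical subclass; a real one would build its storage client inside init.
class DemoStorage extends ContextAwareBase {
    @Override
    protected void init(String context) {
        events.add("init called with: " + context);
    }
}

class InitHookDemo {
    // Simulates the container injecting the context and returns the recorded events.
    static List<String> run() {
        DemoStorage storage = new DemoStorage();
        storage.setContext("demo-context");
        return storage.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the base class drives the hook, a subclass only has to supply init and does not have to remember to call it.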
AliOssService
tech/powerjob/server/persistence/storage/impl/AliOssService.java
- @Slf4j
- @Priority(value = Integer.MAX_VALUE - 1)
- @Conditional(AliOssService.AliOssCondition.class)
- public class AliOssService extends AbstractDFsService {
- private static final String TYPE_ALI_OSS = "alioss";
- private static final String KEY_ENDPOINT = "endpoint";
- private static final String KEY_BUCKET = "bucket";
- private static final String KEY_CREDENTIAL_TYPE = "credential_type";
- private static final String KEY_AK = "ak";
- private static final String KEY_SK = "sk";
- private static final String KEY_TOKEN = "token";
- private OSS oss;
- private String bucket;
- private static final int DOWNLOAD_PART_SIZE = 10240;
- private static final String NO_SUCH_KEY = "NoSuchKey";
- //......
- void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception {
- log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token);
- if (StringUtils.isEmpty(bucket)) {
- throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob");
- }
- this.bucket = bucket;
- CredentialsProvider credentialsProvider;
- CredentialType credentialType = CredentialType.parse(mode);
- switch (credentialType) {
- case PWD:
- credentialsProvider = new DefaultCredentialProvider(ak, sk, token);
- break;
- case SYSTEM_PROPERTY:
- credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider();
- break;
- default:
- credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
- }
- this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider);
- log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER.");
- }
- //......
- }
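AliOssService extends AbstractDFsService. Its initOssClient method requires a non-empty bucket, picks a CredentialsProvider according to the credential type, and builds the OSS client with OSSClientBuilder.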
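The article does not show the CredentialType enum, so the following is only a rough sketch of how a mode string could be mapped onto the three branches of the switch above. The ENV constant, the parsing rule, and the fallback for unknown input are all assumptions made for illustration. Only PWD, SYSTEM_PROPERTY, and the environment-variable default branch come from the source.

```java
import java.util.Locale;

class CredentialModeDemo {
    // Hypothetical modes: PWD and SYSTEM_PROPERTY appear in the article, ENV is assumed.
    enum Mode { PWD, SYSTEM_PROPERTY, ENV }

    // Assumed parsing rule: case-insensitive constant name, falling back to ENV.
    static Mode parse(String text) {
        if (text == null) {
            return Mode.ENV;
        }
        try {
            return Mode.valueOf(text.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return Mode.ENV;
        }
    }

    // Mirrors the shape of the switch in initOssClient: two explicit cases and a default.
    static String describe(Mode mode) {
        switch (mode) {
            case PWD:
                return "explicit ak/sk/token from configuration";
            case SYSTEM_PROPERTY:
                return "JVM system properties";
            default:
                return "environment variables";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(parse("pwd")));
        System.out.println(describe(parse(null)));
    }
}
```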
store
- @Override
- public void store(StoreRequest storeRequest) throws IOException {
- ObjectMetadata objectMetadata = new ObjectMetadata();
- PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata);
- oss.putObject(putObjectRequest);
- }
The store method builds a PutObjectRequest and uploads the local file with oss.putObject.
download
- @Override
- public void download(DownloadRequest downloadRequest) throws IOException {
- FileLocation dfl = downloadRequest.getFileLocation();
- DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE);
- try {
- FileUtils.forceMkdirParent(downloadRequest.getTarget());
- oss.downloadFile(downloadFileRequest);
- } catch (Throwable t) {
- ExceptionUtils.rethrow(t);
- }
- }
The download method builds a DownloadFileRequest from the FileLocation in the DownloadRequest, makes sure the target file's parent directory exists, and then downloads via oss.downloadFile(downloadFileRequest).
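The "create the parent directory, then write the target" step can be tried without OSS. The sketch below uses only java.nio and copies a local file in place of the remote object. LocalDownloadDemo and its methods are hypothetical names. Unlike the original, which rethrows through ExceptionUtils.rethrow, this version wraps failures in UncheckedIOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class LocalDownloadDemo {
    // Copies source to target, creating the target's parent directories first.
    static void download(Path source, Path target) {
        try {
            Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a small "remote" file, downloads it into a nested path, and returns what was read back.
    static String demo() {
        try {
            Path root = Files.createTempDirectory("dfs-demo");
            Path source = root.resolve("remote.txt");
            Files.write(source, "hello".getBytes(StandardCharsets.UTF_8));
            Path target = root.resolve("a/b/c/local.txt");
            download(source, target);
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```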
fetchFileMeta
- @Override
- public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException {
- try {
- ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation));
- return Optional.ofNullable(objectMetadata).map(ossM -> {
- Map<String, Object> metaInfo = Maps.newHashMap();
- metaInfo.putAll(ossM.getRawMetadata());
- if (ossM.getUserMetadata() != null) {
- metaInfo.putAll(ossM.getUserMetadata());
- }
- return new FileMeta()
- .setLastModifiedTime(ossM.getLastModified())
- .setLength(ossM.getContentLength())
- .setMetaInfo(metaInfo);
- });
- } catch (OSSException oe) {
- String errorCode = oe.getErrorCode();
- if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) {
- return Optional.empty();
- }
- ExceptionUtils.rethrow(oe);
- }
- return Optional.empty();
- }
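The fetchFileMeta method calls oss.getObjectMetadata to obtain the ObjectMetadata and maps it to a FileMeta (last-modified time, length, and the merged raw and user metadata). If OSS reports the NoSuchKey error code, it returns Optional.empty(); any other OSSException is rethrown.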
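The pattern of turning a "not found" error into an empty Optional while still propagating other failures can be shown in isolation. In the sketch below, StoreException, remoteLength, and the in-memory map are hypothetical stand-ins for OSSException and the remote call. Only the NoSuchKey comparison mirrors the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class MetaLookupDemo {
    // Hypothetical error type carrying a service error code, standing in for OSSException.
    static class StoreException extends RuntimeException {
        final String errorCode;

        StoreException(String errorCode) {
            super(errorCode);
            this.errorCode = errorCode;
        }
    }

    static final String NO_SUCH_KEY = "NoSuchKey";

    private final Map<String, Long> lengths = new HashMap<>();

    void put(String key, long length) {
        lengths.put(key, length);
    }

    // Simulated remote call: throws for missing keys and for the key "broken".
    long remoteLength(String key) {
        if ("broken".equals(key)) {
            throw new StoreException("AccessDenied");
        }
        Long length = lengths.get(key);
        if (length == null) {
            throw new StoreException(NO_SUCH_KEY);
        }
        return length;
    }

    // A missing key becomes Optional.empty(); every other error is rethrown unchanged.
    Optional<Long> fetchLength(String key) {
        try {
            return Optional.of(remoteLength(key));
        } catch (StoreException e) {
            if (NO_SUCH_KEY.equalsIgnoreCase(e.errorCode)) {
                return Optional.empty();
            }
            throw e;
        }
    }
}
```

Callers can then treat "file does not exist" as a normal outcome and reserve exception handling for real failures.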
cleanExpiredFiles
- @Override
- public void cleanExpiredFiles(String bucket, int days) {
- /*
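- Alibaba Cloud OSS has built-in lifecycle management; configure it by following the documentation. It is not implemented in code (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54
- Alibaba Cloud OSS has built-in lifecycle management; configure it by following the documentation. It is not implemented in code (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54
- Alibaba Cloud OSS has built-in lifecycle management; configure it by following the documentation. It is not implemented in code (that would waste server resources): https://help.aliyun.com/zh/oss/user-guide/overview-54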
- */
- }
cleanExpiredFiles is a no-op here, because expiry is delegated to OSS lifecycle rules.
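For contrast, a storage backend without lifecycle management would have to do the cleanup itself. The following is a minimal sketch of what "delete files older than X days" could look like for a plain local directory. LocalCleanupDemo is a hypothetical class, not part of PowerJob, and it only looks at regular files directly under the given directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LocalCleanupDemo {
    // Deletes regular files directly under dir that were last modified more than `days` days ago.
    // Returns the number of files deleted.
    static int cleanExpiredFiles(Path dir, int days) {
        Instant cutoff = Instant.now().minus(Duration.ofDays(days));
        int deleted = 0;
        try (Stream<Path> files = Files.list(dir)) {
            List<Path> candidates = files.filter(Files::isRegularFile).collect(Collectors.toList());
            for (Path file : candidates) {
                if (Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    Files.delete(file);
                    deleted++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return deleted;
    }

    // Creates one file backdated by 10 days and one fresh file, then cleans with a 7-day limit.
    static int demo() {
        try {
            Path dir = Files.createTempDirectory("dfs-clean");
            Path oldFile = Files.createFile(dir.resolve("old.log"));
            Files.createFile(dir.resolve("new.log"));
            Files.setLastModifiedTime(oldFile, FileTime.from(Instant.now().minus(Duration.ofDays(10))));
            return cleanExpiredFiles(dir, 7);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("deleted: " + demo());
    }
}
```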
init
- protected void init(ApplicationContext applicationContext) {
- Environment environment = applicationContext.getEnvironment();
- String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT);
- String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET);
- String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE);
- String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK);
- String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK);
- String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN);
- try {
- initOssClient(endpoint, bkt, ct, ak, sk, token);
- } catch (Exception e) {
- ExceptionUtils.rethrow(e);
- }
- }
The init method reads the endpoint, bucket, credential_type, ak, sk, and token properties from the environment and then calls initOssClient.
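Combining PROPERTY_KEY, the alioss type, and the six key constants gives the full property names that init looks up. The sketch below rebuilds those names with the same String.format pattern as fetchProperty. It uses java.util.Properties as a stand-in for Spring's Environment, and the bucket value is a placeholder. AliOssKeysDemo is a hypothetical helper class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

class AliOssKeysDemo {
    static final String PROPERTY_KEY = "oms.storage.dfs";
    static final String TYPE_ALI_OSS = "alioss";
    static final String[] KEYS = {"endpoint", "bucket", "credential_type", "ak", "sk", "token"};

    // Same format pattern as fetchProperty: prefix, storage type, key.
    static String fullKey(String dfsType, String key) {
        return String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key);
    }

    // Reads the six settings; Properties stands in for Spring's Environment here.
    // Keys that are not configured map to null.
    static Map<String, String> read(Properties props) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String key : KEYS) {
            values.put(key, props.getProperty(fullKey(TYPE_ALI_OSS, key)));
        }
        return values;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("oms.storage.dfs.alioss.bucket", "example-bucket"); // placeholder value
        read(props).forEach((key, value) ->
                System.out.println(fullKey(TYPE_ALI_OSS, key) + " = " + value));
    }
}
```

The bucket key produced this way matches the name quoted in the IllegalArgumentException message of initOssClient.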
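Summary
The DFsService interface defines the store, download, fetchFileMeta, and cleanExpiredFiles methods. AbstractDFsService declares that it implements the DFsService, ApplicationContextAware, and DisposableBean interfaces, and calls init from its setApplicationContext method. AliOssService extends AbstractDFsService and implements store, download, and fetchFileMeta through the OSS client, while leaving cleanExpiredFiles empty and relying on OSS lifecycle management.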