Apache atlas 元数据管理治理平台利用和架构

打印 上一主题 下一主题

主题 653|帖子 653|积分 1959

1、媒介

Apache Atlas 是托管于 Apache 旗下的一款元数据管理和治理的产物,如今在大数据领域应用颇为广泛,可以很好的帮助企业管理数据资产,并对这些资产进行分类和治理,为数据分析,数据治理提供高质量的元数据信息。
随着企业业务量的渐渐膨胀,数据日益增多,不同业务线的数据大概在多种类型的数据库中存储,最终汇集到企业的数据仓库中进行整合分析,这个时候如果想要追踪数据来源,理清数据之间的关系将会是一件异常头疼的事情,倘若某个环节出了题目,追溯的成本将是巨大的,于是 Atlas 在这种背景下应运而生了,通过它,我们可以非常方便的管理元数据,并且可以追溯表级别,列级别之间的关系(血缘关系),为企业的数据资产提供强有力的支撑和保障。Atlas 支持从 HBase 、Hive、Sqoop、Storm、Kafka 中提取和管理元数据,同时也可以通过 Rest Api 的方式自行界说元数据模型,生成元数据。
本文我们着重先容一下 Atlas 的相关概念,帮助大家更好的明白 Atlas,同时详细解说怎样通过 Rest Api 的方式自界说数据模型,生成血缘关系,以便开发自己的个性化需求。
2、Atlas 原理及相关概念

元数据

元数据着实就是描述数据的数据,好比表,字段,视图等,每个业务系统大概都会自己界说表,字段,视图,这些数据从哪来到往哪去,数据之间是否存在关联,和其他系统的数据是否存在重复和抵牾字段,这些就是元数据管理要办理的题目,也是 Atlas 要办理的题目。
运行原理

Atlas 的原理着实并不难明白,主要是通过内部提供的脚本读取数仓中的数据库结构,生成数据模型,存储到 Atlas的 Hbase 中,同时通过 hook 的方式监听数仓中的数据变化,分析执行的 sql 语句,从而生成表与表,列与列的血缘关系依赖,在前台展示给用户查看。
数仓支持

Atlas 对 Hive 支持最好,我们都知道,Hive 是依赖于 Hadoop 的,数据存储在 HDFS 中,Atlas 有专门的 shell 脚本可以直接运行读取 Hive 的表结构等元数据信息同步到 Atlas 的存储库中,主动生成元数据模型,同时 Atlas 提供的 HiveHook 可以监听 Hive 的数据变化,根据 Hive 执行的 sql 推断出数据与数据之间的关系,生成血缘关系图,如果我们想要分析其他数据存储介质的元数据和血缘关系,Atlas 的支持并不是很理想。但通常情况下,我们会定时把业务库如 mysql,oracle 中的数据定时同步到数仓中整合分析,而数仓我们一样平常都会采用 Hadoop 的生态体系,所以这一点并不是题目。
架构图解

以下是 Atlas 的架构图解,可以看出,Atlas 所依赖的生态体系是异常庞大的,这也直接导致了它摆设起来十分繁琐,本文不再解说 Atlas 的摆设,网上相关的教程有很多,感爱好的朋友可以自己搜刮尝试。

核心组件概念

Atlas 中主要有以下核心组件,这些需要我们着重相识,接下来我们通过 Rest Api 自界说建模着实就是对以下组件的增删查改操纵。
1. Type
元数据类型界说,这里可以是数据库,表,列等,还可以细分 mysql 表( mysql_table ),oracle 表( oracle_table )等,atlas自带了很多类型,如 DataSet,Process 等,一样平常情况下,数据相关的类型在界说类型的时候都会继续 DataSet,而流程相关的类型则会继续 Process,便于生成血缘关系。我们也可以通过调用 api 自界说类型。这是一切的起点,界说完类型之后,才能生成不同类型的元数据实体,生成血缘关系,我个人更喜欢把元数据类型称之为建模。
2. Classification
分类,通俗点就是给元数据打标签,分类是可以传递的,好比 A 视图是基于 A 表生成的,那么如果 A 表打上了 a 这个标签,A 视图也会主动打上 a 标签,这样的好处就是便于数据的追踪。
3. Entity
实体,表现具体的元数据,Atlas 管理的对象就是各种 Type 的 Entity。
4. Lineage
数据血缘,表现数据之间的传递关系,通过 Lineage 我们可以清楚的知道数据的从何而来又流向何处,中间履历了哪些操纵,这样一旦数据出现题目,可以迅速追溯,定位是哪个环节出现错误。
3、Altas安装

(参考链接,请结合现原形况利用)
1、https://blog.csdn.net/hshudoudou/article/details/123899947
2、https://blog.csdn.net/javaThanksgiving/article/details/130505251
4、Altas利用

Altas 成功摆设之后,利用还是很简单的,这是登录界面,用户名暗码默认是 admin,admin:
进入主页,点击右上角 switch to new ,利用新版界面,更直观:


页面左侧便是 Atlas 的类型树,点击树节点的某个类型,可以查看下面的实体,这里我们点击 mysql_table:
可以看到下面有很多表,这些都是我之前自己利用 Rest Api 上传界说的。
下面我们来解说一下怎样通过 Rest Api 的方式自界说类型,生成实体,创建血缘关系。
5、Atlas Rest Api 详解及示例

我们点击主页上方的 Help-》API Documentation,便可以查看 Atlas 全部的开放接口:

有一点我们需要留意,Atlas 的接口在利用的时候是需要鉴权的,所以我们构建 http 请求的时候需要带上用户名和暗码认证信息,本次示例中我们利用 atlas-client-v2 开源组件来进行 Atlas 的 api 调用。
本次示例我们界说一个 my_db 类型,my_table 类型,并且让 my_db 一对多 my_table,然后创建 test_db 实体到 my_db 下,创建 test_table_source 和 test_table_target 实体到 my_table 下,并且界说 test_table_target 的数据来自 test_table_source,生成两个实体的血缘关系依赖。
自界说 my_db 和 my_table 类型

我们对 my_db 和 my_table 类型进行界说,在 Atlas 的Rest Api 中,允许一个请求界说多种类型,在这里我们先构建 json 请求体,然后再通过编码方式实现,二者对比,更容易明白,json 请求体如下(关键地方有注释):
  1. {
  2.   "enumDefs": [],
  3.   "structDefs": [],
  4.   "classificationDefs": [],
  5.   //类型定义
  6.   "entityDefs": [
  7.     {
  8.       "name": "my_db",
  9.       //数据类型的定义,约定俗成,继承Atlas自带的DataSet
  10.       "superTypes": [
  11.         "DataSet"
  12.       ],
  13.       //服务类型(便于在界面分组显示类型)
  14.       "serviceType": "my_type",
  15.       "typeVersion": "1.1",
  16.       "attributeDefs": []
  17.     },
  18.     {
  19.       "name": "my_table",
  20.       "superTypes": [
  21.         "DataSet"
  22.       ],
  23.       "serviceType": "my_type",
  24.       "typeVersion": "1.1",
  25.       "attributeDefs": []
  26.     }
  27.   ],
  28.   //定义类型之间的关系
  29.   "relationshipDefs": [
  30.     {
  31.       "name": "my_table_db",
  32.       "serviceType": "my_type",
  33.       "typeVersion": "1.1",
  34.       //关系类型:ASSOCIATION:关联关系,没有容器存在,1对1
  35.       //AGGREGATION:容器关系,1对多,而且彼此可以相互独立存在
  36.       //COMPOSITION:容器关系,1对多,但是容器中的实例不能脱离容器存在
  37.       "relationshipCategory": "AGGREGATION",
  38.       //节点一
  39.       "endDef1": {
  40.         "type": "my_table",
  41.         //表中关联的属性名称,对应下面的 my_db
  42.         "name": "db",
  43.         //代表这头是不是容器
  44.         "isContainer": false,
  45.         //cardinality: 三种类型SINGLE, LIST, SET
  46.         "cardinality": "SINGLE"
  47.       },
  48.       // 节点2
  49.       "endDef2": {
  50.         "type": "my_db",
  51.         "name": "tables",
  52.         "isContainer": true,
  53.         // db 包含 table,table不能重复,所以类型设置为 SET
  54.         "cardinality": "SET"
  55.       },
  56.       // 推导tag NONE 不推导
  57.       "propagateTags": "NONE"
  58.     }
  59.   ]
  60. }
复制代码
编码实现:
引入 pom 依赖,留意,如果要集成到自己的业务系统之中,业务系统如果利用了其他的日记框架,需要去除 slf4j-log4j12 依赖,否则日记框架会起辩论,导致启动失败,另外 atlas-client-common 中依赖了 commons-configuration 1.10,如果业务系统中有低版本依赖,记得清除,不然二者会辩论,导致 client 初始化失败。
  1. <dependencies>
  2.         <!-- Apache Atlas -->
  3.         <dependency>
  4.             <groupId>org.apache.atlas</groupId>
  5.             <artifactId>atlas-client-common</artifactId>
  6.             <version>2.1.0</version>
  7.             <exclusions>
  8.                 <exclusion>
  9.                     <artifactId>slf4j-log4j12</artifactId>
  10.                     <groupId>org.slf4j</groupId>
  11.                 </exclusion>
  12.             </exclusions>
  13.         </dependency>
  14.         <!-- Apache Atlas Client  Version2 -->
  15.         <dependency>
  16.             <groupId>org.apache.atlas</groupId>
  17.             <artifactId>atlas-client-v2</artifactId>
  18.             <version>2.1.0</version>
  19.             <exclusions>
  20.                 <exclusion>
  21.                     <artifactId>slf4j-log4j12</artifactId>
  22.                     <groupId>org.slf4j</groupId>
  23.                 </exclusion>
  24.                 <exclusion>
  25.                     <artifactId>log4j</artifactId>
  26.                     <groupId>log4j</groupId>
  27.                 </exclusion>
  28.             </exclusions>
  29.         </dependency>
  30.         <dependency>
  31.             <groupId>com.alibaba</groupId>
  32.             <artifactId>fastjson</artifactId>
  33.             <version>${fastjson.version}</version>
  34.         </dependency>
  35.     </dependencies>
复制代码
引入 atlas-application.properties(必须得有,否则会初始化失败):
atlas.rest.address=http://127.0.0.1:21000
代码实现如下(对照json非常容易明白):
  1.                 AtlasClientV2 atlasClientV2 = new AtlasClientV2(new String[]{"http://127.0.0.1:21000"}, new String[]{"admin", "admin"});
  2.                 //父类集合
  3.                 Set<String> superTypes = new HashSet<>();
  4.                 superTypes.add(AtlasBaseTypeDef.ATLAS_TYPE_DATASET);
  5.                 //定义myType
  6.                 AtlasTypesDef myType = new AtlasTypesDef();
  7.                 //定义myDb
  8.                 AtlasEntityDef myDb = new AtlasEntityDef();
  9.                 myDb.setName("my_db");
  10.                 myDb.setServiceType("my_type");
  11.                 myDb.setSuperTypes(superTypes);
  12.                 myDb.setTypeVersion("1.1");
  13.                 //定义mytable
  14.                 AtlasEntityDef myTable = new AtlasEntityDef();
  15.                 myTable.setName("my_table");
  16.                 myTable.setServiceType("my_type");
  17.                 myTable.setSuperTypes(superTypes);
  18.                 myTable.setTypeVersion("1.1");
  19.                 //定义relationshipDef
  20.                 AtlasRelationshipDef relationshipDef = new AtlasRelationshipDef();
  21.                 relationshipDef.setName("my_table_db");
  22.                 relationshipDef.setServiceType("my_type");
  23.                 relationshipDef.setTypeVersion("1.1");
  24.                 relationshipDef.setRelationshipCategory(AtlasRelationshipDef.RelationshipCategory.AGGREGATION);
  25.                 relationshipDef.setPropagateTags(AtlasRelationshipDef.PropagateTags.NONE);
  26.                 //定义endDef1
  27.                 AtlasRelationshipEndDef endDef1 = new AtlasRelationshipEndDef();
  28.                 endDef1.setType("my_table");
  29.                 endDef1.setName("db");
  30.                 endDef1.setIsContainer(false);
  31.                 endDef1.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SINGLE);
  32.                 relationshipDef.setEndDef1(endDef1);
  33.                 //定义endDef2
  34.                 AtlasRelationshipEndDef endDef2 = new AtlasRelationshipEndDef();
  35.                 endDef2.setType("my_db");
  36.                 endDef2.setName("tables");
  37.                 endDef2.setIsContainer(true);
  38.                 endDef2.setCardinality(AtlasStructDef.AtlasAttributeDef.Cardinality.SET);
  39.                 relationshipDef.setEndDef2(endDef2);
  40.                 //entityDefs
  41.                 List<AtlasEntityDef> entityDefs = new ArrayList<>(2);
  42.                 entityDefs.add(myDb);
  43.                 entityDefs.add(myTable);
  44.                 myType.setEntityDefs(entityDefs);
  45.                 //relationshipDefs
  46.                 List<AtlasRelationshipDef> relationshipDefs = new ArrayList<>(1);
  47.                 relationshipDefs.add(relationshipDef);
  48.                 myType.setRelationshipDefs(relationshipDefs);
  49.                 //查询是否已有my_db类型,没有则创建
  50.                 SearchFilter filter = new SearchFilter();
  51.                 filter.setParam("name", "my_db");
  52.                 AtlasTypesDef allTypeDefs = atlasClientV2.getAllTypeDefs(filter);
  53.                 if (allTypeDefs.getEntityDefs().isEmpty()) {
  54.                         //请求 rest api
  55.                         atlasClientV2.createAtlasTypeDefs(myType);
  56.                 }
复制代码
执行以上代码,执行完毕后,前去 Atlas 主页查看,类型已成功创建:
查看类型模型图:
类型创建完毕,接下来我们进行实体的创建。
创建实体 test_db,test_table_source 和 test_table_target
json 如下:
  1. //my_db 实体
  2. {
  3.         "typeName": "my_db",
  4.         "attributes": {
  5.                 "qualifiedName": "test_db",
  6.                 "name": "test_db",
  7.                 "description": "测试创建db"
  8.         }
  9. }
  10. //test_table_source 实体
  11. {
  12.         "typeName": "my_table",
  13.         "attributes": {
  14.                 "qualifiedName": "test_table_source",
  15.                 "name": "test_table_source",
  16.                 "description": "测试创建test_table_source"
  17.         },
  18.         "relationshipAttributes": {
  19.                 "db": {
  20.                         "typeName": "my_db",
  21.                         //my_db的guid(创建完my_db后会返回)
  22.                         "guid": "xxxx"
  23.                 }
  24.         }
  25. }
  26. //test_table_target 实体
  27. {
  28.         "typeName": "my_table",
  29.         "attributes": {
  30.                 "qualifiedName": "test_table_target",
  31.                 "name": "test_table_target",
  32.                 "description": "测试创建test_table_target"
  33.         },
  34.         "relationshipAttributes": {
  35.                 "db": {
  36.                         "typeName": "my_db",
  37.                         "guid": "xxx"
  38.                 }
  39.         }
  40. }
复制代码
代码实现如下:
  1.                 //创建实体 test_db
  2.                 AtlasEntity testDb = new AtlasEntity();
  3.                 testDb.setTypeName("my_db");
  4.                 Map<String, Object> attributes = new HashMap<>();
  5.                 attributes.put("qualifiedName", "test_db");
  6.                 attributes.put("name", "test_db");
  7.                 attributes.put("description", "测试创建db");
  8.                 testDb.setAttributes(attributes);
  9.                 Map<String, String> queryAttributes = new HashMap<>();
  10.                 queryAttributes.put("qualifiedName", "test_db");
  11.                 String myDbGuid = null;
  12.                 try {
  13.                         //查询不到会报错
  14.                         AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute("my_db", queryAttributes);
  15.                         myDbGuid = extInfo.getEntity().getGuid();
  16.                 } catch (AtlasServiceException e) {
  17.                         if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
  18.                                 AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testDb);
  19.                                 //请求
  20.                                 EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
  21.                                 myDbGuid = response.getGuidAssignments().values().toArray(new String[]{})[0];
  22.                         }
  23.                 }
  24.                 //创建与db的关系
  25.                 Map<String, Object> relationShipAttr = new HashMap<>();
  26.                 Map<String, String> dbMap = new HashMap<>();
  27.                 dbMap.put("guid", myDbGuid);
  28.                 dbMap.put("typeName", "my_db");
  29.                 relationShipAttr.put("db", dbMap);
  30.                 //创建实体 test_table_source
  31.                 AtlasEntity testTableSource = new AtlasEntity();
  32.                 testTableSource.setTypeName("my_table");
  33.                 attributes.put("qualifiedName", "test_table_source");
  34.                 attributes.put("name", "test_table_source");
  35.                 attributes.put("description", "测试创建test_table_source");
  36.                 testTableSource.setAttributes(attributes);
  37.                 testTableSource.setRelationshipAttributes(relationShipAttr);
  38.                 queryAttributes.put("qualifiedName", "test_table_source");
  39.                 try {
  40.                         //atlasClientV2.updateEntity(new AtlasEntity.AtlasEntityWithExtInfo(testTableSource));
  41.                         AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute("my_table", queryAttributes);
  42.                         testTableSource = extInfo.getEntity();
  43.                 } catch (AtlasServiceException e) {
  44.                         if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
  45.                                 AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testTableSource);
  46.                                 //请求
  47.                                 EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
  48.                                 testTableSource.setGuid(response.getGuidAssignments().values().toArray(new String[]{})[0]);
  49.                         }
  50.                 }
  51.                 //创建实体 test_table_target
  52.                 AtlasEntity testTableTarget = new AtlasEntity();
  53.                 testTableTarget.setTypeName("my_table");
  54.                 attributes.put("qualifiedName", "test_table_target");
  55.                 attributes.put("name", "test_table_target");
  56.                 attributes.put("description", "测试创建test_table_target");
  57.                 testTableTarget.setAttributes(attributes);
  58.                 testTableTarget.setRelationshipAttributes(relationShipAttr);
  59.                 queryAttributes.put("qualifiedName", "test_table_target");
  60.                 try {
  61.                         //atlasClientV2.updateEntity(new AtlasEntity.AtlasEntityWithExtInfo(testTableTarget));
  62.                         AtlasEntity.AtlasEntityWithExtInfo extInfo = atlasClientV2.getEntityByAttribute("my_table", queryAttributes);
  63.                         testTableTarget = extInfo.getEntity();
  64.                 } catch (AtlasServiceException e) {
  65.                         if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
  66.                                 AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(testTableTarget);
  67.                                 //请求
  68.                                 EntityMutationResponse response = atlasClientV2.createEntity(extInfo);
  69.                                 testTableTarget.setGuid(response.getGuidAssignments().values().toArray(new String[]{})[0]);
  70.                         }
  71.                 }
复制代码
执行代码完毕后,查看类的树形图,发现已经产生了实体:
我们点击右侧的 test_db 实体,可以看到它的基本信息,也可以看到它的 relationship 信息,包含了 test_table_source 和 test_table_target 两个实体:
查看 relationship 信息,包含 test_table_source 和 test_table_target:
创建 test_table_source 和 test_table_target 的血缘关系依赖
前面我们提到,界说 test_table_target 的数据来自 test_table_source,血缘关系依赖在 Atlas 中着实也是作为实体 Entity 存在,只不过继续的父类是 Process,这样可以界说 inputs 和 outputs 属性,构建血缘关系,json 如下:
  1. {
  2.         "typeName": "Process",
  3.         "attributes": {
  4.                 "name": "test_process",
  5.                 "qualifiedName": "test_process",
  6.                 "description": "test_table_target 的数据来自 test_table_source",
  7.                 "inputs": [{
  8.                         "typeName": "my_table",
  9.                         //test_table_source的guid,创建实体从返回的信息中获取
  10.                         "guid": "xxx"
  11.                 }],
  12.                 "outputs": [{
  13.                         "typeName": "my_table",
  14.                         test_table_target的guid,创建实体从返回的信息中获取
  15.                         "guid": "xxx"
  16.                 }]
  17.         }
  18. }
复制代码
代码实现如下:
  1.                 AtlasEntity lineage = new AtlasEntity();
  2.                 //设置为process类型构建血缘
  3.                 lineage.setTypeName(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
  4.                 attributes.put("qualifiedName", "test_process");
  5.                 attributes.put("name", "test_process");
  6.                 attributes.put("description", "test_table_target 的数据来自 test_table_source");
  7.                 attributes.put("inputs", getLineAgeInfo(testTableSource));
  8.                 attributes.put("outputs", getLineAgeInfo(testTableTarget));
  9.                 lineage.setAttributes(attributes);
  10.                 queryAttributes.put("qualifiedName", "test_process");
  11.                 System.out.println(SingletonObject.OBJECT_MAPPER.writeValueAsString(lineage));
  12.                 try {
  13.                         //查询是否存在
  14.                         atlasClientV2.getEntityByAttribute(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS, queryAttributes);
  15.                 } catch (AtlasServiceException e)  {
  16.                         if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())) {
  17.                                 //创建
  18.                                 AtlasEntity.AtlasEntityWithExtInfo extInfo = new AtlasEntity.AtlasEntityWithExtInfo(lineage);
  19.                                 atlasClientV2.createEntity(extInfo);
  20.                         }
  21.                 }
  22.                
  23.         //构建inputs和outputs
  24.         private static List<Map<String, String>> getLineAgeInfo(AtlasEntity entity) {
  25.                 List<Map<String, String>> list = new ArrayList<>();
  26.                 Map<String, String> map = new HashMap<>();
  27.                 map.put("guid", entity.getGuid());
  28.                 map.put("typeName", entity.getTypeName());
  29.                 list.add(map);
  30.                 return list;
  31.         }
复制代码
执行以上代码,然后打开主页,点击 my_table 中的 test_table_source,查看 lineage 标签,血缘关系已成功构建:
至此,我们通过 Atlas Rest Api 的方式自行建模,创建实体,构建血缘关系就完成了。
元数据管理,数据治理,在当下仍然是一个热门的话题,同时,它也可以帮助我们更好的支撑企业的数据资产,更好的分析数据,为企业的发展决策提供有效的帮助。
引用链接:
1、https://blog.csdn.net/hshudoudou/article/details/123899947
2、https://blog.csdn.net/javaThanksgiving/article/details/130505251
3、原文 https://blog.csdn.net/m0_37719874/article/details/124245209

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

千千梦丶琪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表