neo4j实现表字段级血缘关系

打印 上一主题 下一主题

主题 889|帖子 889|积分 2667

需求背景

需要在前端页面展示当前表字段的所有上下游血缘关系,以进一步做数据诊断治理。大致效果图如下:

首先这里解释什么是表字段血缘关系,SQL 示例:
  1. CREATE TABLE IF NOT EXISTS table_b
  2. AS SELECT order_id, order_status FROM table_a;
复制代码
如上 DDL 语句中,创建的 table_b 的 order_id 和 order_status 字段来源于 table_a,代表table_a 就是 table_b 的来源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游字段,它们之间就存在血缘关系。
  1. INSERT INTO table_c
  2. SELECT a.order_id, b.order_status
  3. FROM table_a a JOIN table_b b ON a.order_id = b.order_id;
复制代码
如上 DML 语句中,table_c 的 order_id 字段来源于 table_a,而 order_status 来源于 table_b,表示 table_c 和 table_a、table_b 之间也存在血缘关系。
由上也可看出想要存储血缘关系,还需要先解析 sql,这块儿主要使用了开源项目 calcite 的解析器,这篇文章不再展开,本篇主要讲如何存储和如何展示
环境配置

参考另一篇:springboot 配置内嵌式 neo4j
Node 数据结构定义

因为要展示表的字段之间的血缘关系,所以直接将表字段作为图节点存储,表字段之间的血缘关系就用图节点之间的关系表示,具体 node 定义如下:
  1. public class ColumnVertex {
  2.   // 唯一键
  3.   private String name;
  4.   public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
  5.     this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  6.   }
  7.   public String getCatalogName() {
  8.     return Long.parseLong(name.split("\\.")[0]);
  9.   }
  10.   public String getDatabaseName() {
  11.     return name.split("\\.")[1];
  12.   }
  13.   public String getTableName() {
  14.     return name.split("\\.")[2];
  15.   }
  16.   public String getColumnName() {
  17.     return name.split("\\.")[3];
  18.   }
  19. }
复制代码
通用 Service 定义
  1. public interface EmbeddedGraphService {
  2.     // 添加图节点以及与上游节点之间的关系
  3.     void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
  4.     // 寻找上游节点
  5.     List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
  6.     // 寻找下游节点
  7.     List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
  8. }
复制代码
Service 实现
  1. import javax.annotation.Resource;
  2. import org.neo4j.graphdb.GraphDatabaseService;
  3. import org.neo4j.graphdb.Result;
  4. import org.neo4j.graphdb.Transaction;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {
  8.   @Resource private GraphDatabaseService graphDb;
  9.   @Override
  10.   public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
  11.     try (Transaction tx = graphDb.beginTx()) {
  12.       tx.execute(
  13.           "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
  14.               + " MERGE (u)-[:UPSTREAM]->(c)",
  15.           Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
  16.       tx.commit();
  17.     }
  18.   }
  19.   @Override
  20.   public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
  21.     List<ColumnVertex> result = new ArrayList<>();
  22.     try (Transaction tx = graphDb.beginTx()) {
  23.       Result queryResult =
  24.           tx.execute(
  25.               "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
  26.                   + " u.name AS name",
  27.               Map.of("name", currentVertex.getName()));
  28.       while (queryResult.hasNext()) {
  29.         Map<String, Object> row = queryResult.next();
  30.         result.add(new ColumnVertex().setName((String) row.get("name")));
  31.       }
  32.       tx.commit();
  33.     }
  34.     return result;
  35.   }
  36.   @Override
  37.   public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
  38.     List<ColumnVertex> result = new ArrayList<>();
  39.     try (Transaction tx = graphDb.beginTx()) {
  40.       Result queryResult =
  41.           tx.execute(
  42.               "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
  43.                   + " d.name AS name",
  44.               Map.of("name", currentVertex.getName()));
  45.       while (queryResult.hasNext()) {
  46.         Map<String, Object> row = queryResult.next();
  47.         result.add(new ColumnVertex().setName((String) row.get("name")));
  48.       }
  49.       tx.commit();
  50.     }
  51.     return result;
  52.   }
  53. }
复制代码
遍历图节点

实现逻辑:

  • restful 接口入参:当前表(catalogName, databaseName, tableName)
  • 定义返回给前端的数据结构,采用 nodes 和 edges 方式返回,然后前端再根据节点与边关系渲染出完整的血缘关系图
  1. public class ColumnLineageVO {
  2.   List<ColumnLineageNode> nodes;
  3.   List<ColumnLineageEdge> edges;
  4. }
  5. public class ColumnLineageNode {
  6.   private String databaseName;
  7.   private String tableName;
  8.   private List<String> columnNames;
  9. }
  10. public class ColumnLineageEdge {
  11.   private ColumnLineageEdgePoint source;
  12.   private ColumnLineageEdgePoint target;
  13. }
  14. public class ColumnLineageEdgePoint {
  15.   private String databaseName;
  16.   private String tableName;
  17.   private String columnName;
  18. }
复制代码

  • 查询表字段
  • 采用递归的方式,利用当前表字段遍历与当前表字段关联的所有上下游图节点
  • 将所有节点封装成 List ColumnLineageVO 返回给前端
  1. public ColumnLineageVO getColumnLineage(Table table) {
  2.     ColumnLineageVO columnLineageVO = new ColumnLineageVO();
  3.     List<ColumnLineageNode> nodes = new ArrayList<>();
  4.     List<ColumnLineageEdge> edges = new ArrayList<>();
  5.     // Deduplication
  6.     Set<String> visitedNodes = new HashSet<>();
  7.     Set<String> visitedEdges = new HashSet<>();
  8.     Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
  9.     Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();
  10.     ColumnLineageNode currentNode =
  11.         ColumnLineageNode.builder()
  12.             .databaseName(table.getDatabaseName())
  13.             .tableName(table.getTableName())
  14.             .type(TableType.EXTERNAL_TABLE.getDesc())
  15.             .build();
  16.     nodes.add(currentNode);
  17.     visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());
  18.     for (String columnName : table.getColumnNames()) {
  19.       ColumnVertex currentVertex =
  20.           new ColumnVertex(
  21.               table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);
  22.       traverseUpstreamColumnVertex(
  23.           currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
  24.       traverseDownstreamColumnVertex(
  25.           currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
  26.     }
  27.     columnLineageVO.setNodes(nodes);
  28.     columnLineageVO.setEdges(edges);
  29.     return columnLineageVO;
  30.   }
  31. private void traverseUpstreamColumnVertex(
  32.       ColumnVertex currentVertex,
  33.       List<ColumnLineageNode> nodes,
  34.       List<ColumnLineageEdge> edges,
  35.       Set<String> visitedNodes,
  36.       Set<String> visitedEdges,
  37.       Map<String, List<ColumnVertex>> cache) {
  38.     List<ColumnVertex> upstreamVertices;
  39.     if (cache.containsKey(currentVertex.getName())) {
  40.       upstreamVertices = cache.get(currentVertex.getName());
  41.     } else {
  42.       upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);
  43.       cache.put(currentVertex.getName(), upstreamVertices);
  44.     }
  45.     for (ColumnVertex upstreamVertex : upstreamVertices) {
  46.       String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
  47.       if (!visitedNodes.contains(nodeKey)) {
  48.         ColumnLineageNode upstreamNode =
  49.             ColumnLineageNode.builder()
  50.                 .databaseName(upstreamVertex.getDatabaseName())
  51.                 .tableName(upstreamVertex.getTableName())
  52.                 .type(TableType.EXTERNAL_TABLE.getDesc())
  53.                 .build();
  54.         nodes.add(upstreamNode);
  55.         visitedNodes.add(nodeKey);
  56.       }
  57.       String edgeKey =
  58.           upstreamVertex.getDatabaseName()
  59.               + upstreamVertex.getTableName()
  60.               + upstreamVertex.getColumnName()
  61.               + currentVertex.getDatabaseName()
  62.               + currentVertex.getTableName()
  63.               + currentVertex.getColumnName();
  64.       if (!visitedEdges.contains(edgeKey)) {
  65.         ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
  66.         edges.add(edge);
  67.         visitedEdges.add(edgeKey);
  68.       }
  69.       traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
  70.     }
  71.   }
  72.   
  73. private void traverseDownstreamColumnVertex(
  74.       ColumnVertex currentVertex,
  75.       List<ColumnLineageNode> nodes,
  76.       List<ColumnLineageEdge> edges,
  77.       Set<String> visitedNodes,
  78.       Set<String> visitedEdges,
  79.       Map<String, List<ColumnVertex>> cache) {
  80.     List<ColumnVertex> downstreamVertices;
  81.     if (cache.containsKey(currentVertex.getName())) {
  82.       downstreamVertices = cache.get(currentVertex.getName());
  83.     } else {
  84.       downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);
  85.       cache.put(currentVertex.getName(), downstreamVertices);
  86.     }
  87.     for (ColumnVertex downstreamVertex : downstreamVertices) {
  88.       String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
  89.       if (!visitedNodes.contains(nodeKey)) {
  90.         ColumnLineageNode downstreamNode =
  91.             ColumnLineageNode.builder()
  92.                 .databaseName(downstreamVertex.getDatabaseName())
  93.                 .tableName(downstreamVertex.getTableName())
  94.                 .type(TableType.EXTERNAL_TABLE.getDesc())
  95.                 .build();
  96.         nodes.add(downstreamNode);
  97.         visitedNodes.add(nodeKey);
  98.       }
  99.       String edgeKey =
  100.           currentVertex.getDatabaseName()
  101.               + currentVertex.getTableName()
  102.               + currentVertex.getColumnName()
  103.               + downstreamVertex.getDatabaseName()
  104.               + downstreamVertex.getTableName()
  105.               + downstreamVertex.getColumnName();
  106.       if (!visitedEdges.contains(edgeKey)) {
  107.         ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
  108.         edges.add(edge);
  109.         visitedEdges.add(edgeKey);
  110.       }
  111.       traverseDownstreamColumnVertex(
  112.           downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
  113.     }
  114.   }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表