Spring Boot 集成 Kettle

打印 上一主题 下一主题

主题 883|帖子 883|积分 2649

Kettle 简介

Kettle 最初由 Matt Casters 开发,是 Pentaho 数据集成平台的一部分。它提供了一个用户友好的界面和丰富的功能集,使用户能够轻松地设计、执行和监控 ETL 任务。Kettle 通过其强盛的功能和灵活性,资助企业高效地处置惩罚大规模数据集成任务。
重要构成部分


  • Spoon

    • 用途:Spoon 是 Kettle 的图形化设计工具。用户可以使用 Spoon 设计和调试 ETL 转换和作业。
    • 功能:拖放式界面、预览数据、测试 ETL 流程、管理连接、编写脚本等。

  • Pan

    • 用途:Pan 是一个下令行工具,用于执行由 Spoon 设计的 ETL 转换。
    • 功能:通过下令行执行转换、调度作业、集成到其他自动化流程中。

  • Kitchen

    • 用途:Kitchen 是一个下令行工具,用于执行由 Spoon 设计的 ETL 作业。
    • 功能:通过下令行执行作业、调度作业、集成到其他自动化流程中。

  • Carte

    • 用途:Carte 是一个轻量级的 Web 服务器,提供远程执行和监控功能。
    • 功能:远程执行和监控 ETL 转换和作业、查察日志、管理集群等。

  • Repositories

    • 用途:存储和管理 ETL 转换和作业的地方。
    • 功能:可以使用数据库或文件系统作为存储库,支持版本控制和共享。

重要功能和特点


  • 数据提取

    • 支持多种数据源,如关系数据库、文件(CSV、Excel、XML 等)、大数据平台(Hadoop、Hive 等)、云存储(Amazon S3、Google Drive 等)、Web 服务和 API 等。

  • 数据转换

    • 丰富的转换步骤,包括数据清洗、数据聚合、数据过滤、数据排序、数据连接、数据拆分、数据类型转换等。

  • 数据加载

    • 支持将数据加载到多种目的系统中,如关系数据库、大数据平台、文件系统、云存储等。

  • 调度和自动化

    • 支持通过下令行工具(Pan 和 Kitchen)和调度器(如 cron 或 Windows 任务计划)举行调度和自动化执行。

  • 扩展性

    • 提供了插件机制,用户可以编写自界说插件,扩展 Kettle 的功能。
    • 支持 JavaScript 和 Java 举行脚本编写,增强转换和作业的灵活性。

  • 集群和并行处置惩罚

    • 支持集群模式,能够在分布式情况中并行处置惩罚大规模数据。
    • 提供了分布式 ETL 执行和负载均衡功能。

  • 数据质量和数据治理

    • 提供了数据验证、数据一致性检查和数据校验功能,资助确保数据的质量和一致性。

  • 实时数据处置惩罚

    • 支持实时数据流处置惩罚,通过集成 Kafka、MQTT 等流处置惩罚平台,实现实时数据的提取、转换和加载。

集成 Kettle

将 Kettle(Pentaho Data Integration, PDI)集成到 Spring Boot 项目中,可以实现 ETL 流程的自动化和集成化处置惩罚。以下是详细的集成过程:
准备工作


  • 下载 Kettle:从 Pentaho 官网下载 Kettle(PDI)的最新版本,并解压到本地目录。
  • Spring Boot 项目:确保已有一个 Spring Boot 项目,或新建一个 Spring Boot 项目。
引入 Kettle 依赖

在 Spring Boot 项目的 pom.xml 文件中添加 Kettle 所需的依赖。你可以将 Kettle 的 JAR 文件添加到本地 Maven 仓库,或直接在项目中引入这些 JAR 文件。
  1. <dependencies>
  2.     <!-- Spring Boot 依赖 -->
  3.     <!-- Kettle 依赖 -->
  4.     <dependency>
  5.         <groupId>pentaho-kettle</groupId>
  6.         <artifactId>kettle-core</artifactId>
  7.         <version>9.4.0.0-343</version>
  8.     </dependency>
  9.     <dependency>
  10.         <groupId>pentaho-kettle</groupId>
  11.         <artifactId>kettle-engine</artifactId>
  12.         <version>9.4.0.0-343</version>
  13.     </dependency>
  14.     <dependency>
  15.         <groupId>pentaho-kettle</groupId>
  16.         <artifactId>kettle-dbdialog</artifactId>
  17.         <version>9.4.0.0-343</version>
  18.     </dependency>
  19.     <dependency>
  20.         <groupId>org.apache.commons</groupId>
  21.         <artifactId>commons-vfs2</artifactId>
  22.         <version>2.7.0</version>
  23.     </dependency>
  24.     <!-- 根据需要添加其他 Kettle 依赖 -->
  25.    
  26.     <!-- 操作数据库数据时添加相应的数据库依赖 -->
  27.    
  28. </dependencies>
复制代码
处置惩罚密码加密

在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件,用于设置密码加密插件:
  1. <password-encoder-plugins>
  2.     <password-encoder-plugin id="Kettle">
  3.         <description>Kettle Password Encoder</description>
  4.         <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
  5.     </password-encoder-plugin>
  6. </password-encoder-plugins>
复制代码
kettle-core依赖中org.pentaho.support.encryption.KettleTwoWayPasswordEncoder类实现了TwoWayPasswordEncoderInterface接口,用于处置惩罚密码的加密息争密利用。
添加 Spoon 的任务文件

在 Kettle(Pentaho Data Integration,PDI)中,作业(Job)和转换(Transformation)是两种核心的 ETL 组件,它们在设计和功能上有着本质的区别。
转换(Transformation)


  • 数据处置惩罚流程:转换是一个数据处置惩罚流程,专注于数据的提取(Extract)、转换(Transform)和加载(Load)。
  • 行级处置惩罚:转换以行级处置惩罚数据,每次处置惩罚一行数据,并将其传递给下一步骤。
  • 任务文件为.ktr文件。
作业(Job)


  • 任务管理和控制流程:作业是一个任务管理和控制流程,负责调度和控制一系列任务的执行顺序。
  • 步骤级处置惩罚:作业以步骤为单元处置惩罚任务,每次执行一个步骤,然后根据条件决定执行下一个步骤。
  • 任务文件为.kjb文件。
区别


  • 转换处置惩罚数据行,作业处置惩罚任务步骤。
  • 转换中的步骤是并行执行的,而作业中的步骤是顺序执行的。
  • 转换偏重于数据的处置惩罚和转换,作业偏重于任务的调度和管理。
  • 转换重要通过数据流控制,作业提供了丰富的逻辑控制(条件判断、循环、错误处置惩罚等)。
  • 转换实用于复杂的数据处置惩罚流程,作业实用于任务调度和控制。
在 Spring Boot 项目的 resources 目录下,创建一个 kettle 目录,并将 Kettle 的任务文件(如 转换1.ktr)复制到该目录中。
编写 Kettle 服务类

创建一个服务类,用于执行 Kettle 转换或作业。
  1. package com.example.kettletest.service.impl;
  2. import com.example.kettletest.service.KettleJobService;
  3. import org.pentaho.di.core.KettleEnvironment;
  4. import org.pentaho.di.core.exception.KettleException;
  5. import org.pentaho.di.core.exception.KettleXMLException;
  6. import org.pentaho.di.core.util.EnvUtil;
  7. import org.pentaho.di.job.Job;
  8. import org.pentaho.di.job.JobMeta;
  9. import org.pentaho.di.trans.Trans;
  10. import org.pentaho.di.trans.TransMeta;
  11. import org.springframework.core.io.ClassPathResource;
  12. import org.springframework.stereotype.Service;
  13. import java.io.File;
  14. import java.io.IOException;
  15. /**
  16. * @author 罗森
  17. * @date 2024/6/6 13:21
  18. */
  19. @Service
  20. public class KettleJobServiceImpl implements KettleJobService {
  21.     @Override
  22.     public void runTaskFile(String taskFileName) {
  23.         // 初始化 Kettle 环境
  24.         try {
  25.             KettleEnvironment.init();
  26.             EnvUtil.environmentInit();
  27.         } catch (KettleException e) {
  28.             throw new RuntimeException(e);
  29.         }
  30.         // 执行任务文件
  31.         if (taskFileName.endsWith(".ktr")) {
  32.             taskFileKTR(taskFileName);
  33.         } else if (taskFileName.endsWith(".kjb")) {
  34.             taskFileKJB(taskFileName);
  35.         } else {
  36.             throw new IllegalArgumentException("Unsupported file type: " + taskFileName);
  37.         }
  38.     }
  39.     /**
  40.      * 针对kjb文件的操作
  41.      * @param taskFileName
  42.      */
  43.     public void taskFileKJB(String taskFileName) {
  44.         try {
  45.             // 获取资源文件路径
  46.             ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
  47.             File jobFile = resource.getFile();
  48.             // 加载 KJB 文件
  49.             JobMeta jobMeta = new JobMeta(jobFile.getAbsolutePath(), null);
  50.             // 创建作业对象
  51.             Job job = new Job(null, jobMeta);
  52.             // 启动作业
  53.             job.start();
  54.             // 等待作业完成
  55.             job.waitUntilFinished();
  56.             if (job.getErrors() > 0) {
  57.                 System.out.println("There were errors during job execution.");
  58.             } else {
  59.                 System.out.println("Job executed successfully.");
  60.             }
  61.         } catch (IOException | KettleXMLException e) {
  62.             e.printStackTrace();
  63.         }
  64.     }
  65.     /**
  66.      * 针对ktr文件的操作
  67.      * @param taskFileName
  68.      */
  69.     public void taskFileKTR(String taskFileName) {
  70.         try {
  71.             // 获取资源文件路径
  72.             ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
  73.             File transFile = resource.getFile();
  74.             // 加载 KTR 文件
  75.             TransMeta transMeta = new TransMeta(transFile.getAbsolutePath());
  76.             // 创建转换对象
  77.             Trans trans = new Trans(transMeta);
  78.             // 启动作业
  79.             trans.execute(null);
  80.             // 等待作业完成
  81.             trans.waitUntilFinished();
  82.             if (trans.getErrors() > 0) {
  83.                 System.err.println("There were errors during Transformation execution.");
  84.             } else {
  85.                 System.out.println("Transformation executed successfully!");
  86.             }
  87.         } catch (IOException | KettleException e) {
  88.             e.printStackTrace();
  89.         }
  90.     }
  91. }
复制代码
常见问题办理办法


  • 运行后报错信息为:Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath.
    **办理办法:**在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件。
  • 运行后报错信息为:ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : A serious error occurred during job execution: 无法找到作业的开始点.
    **办理办法:**为Spoon制作的作业任务增长开始节点。
  • 运行后报错信息为:Can't run transformation due to plugin missing.
    **办理办法:**此问题通常出现在涉及类似于导出excel文件、json文件时。在初始化 Kettle 情况之前指明相关插件的绝对路径(相关插件通常在Kettle本地解压文件夹中的plugins目录下),新增以下代码:
    1. StepPluginType.getInstance().getPluginFolders().add(new PluginFolder("E:\Kettle\pdi-ce-9.4.0.0-343\data-integration\plugins", false, true));
    复制代码
    将代码中的地址换成您本地的绝对地址。

(END)
by luosen.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊雷无声

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表