【Flink-Kafka-To-Hive】使用 Flink 实现 Kafka 数据写入 Hive

忿忿的泥巴坨  金牌会员 | 2024-8-2 11:30:12 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 881|帖子 881|积分 2643

需求形貌:
1、数据从 Kafka 写入 Hive。
2、相关配置存放于 Mysql 中,通过 Mysql 举办法态读取。
3、此案例中的 Kafka 是举行了 Kerberos 安全认证的,如果不需要自行修改。
4、Flink 集成 Kafka 写入 Hive 需要举行 checkpoint 才能落盘至 HDFS。
5、先在 Hive 中创建表然后动态获取 Hive 的表结构。
6、Kafka 数据为 Json 格式,通过 FlatMap 扁平化处理惩罚后,根据表结构封装到 Row 中后完成写入。
7、写入时转换成暂时视图模式,利用 Flink-Sql 实现数据写入。
8、当地测试时 Hive 相关文件要放置到 resources 目次下。
9、当地测试时可以编辑 resources.flink_backup_local.yml 通过 ConfigTools.initConf 方法获取配置。
1)导入相关依靠

这里的依靠比力冗余,各人可以根据各自需求做删除或保留。
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>example.cn.test</groupId>
  7.     <artifactId>kafka2hive</artifactId>
  8.     <version>1.0.0</version>
  9.     <properties>
  10.         <hbase.version>2.3.3</hbase.version>
  11.         <hadoop.version>3.1.1</hadoop.version>
  12.         <spark.version>3.0.2</spark.version>
  13.         <scala.version>2.12.10</scala.version>
  14.         <maven.compiler.source>8</maven.compiler.source>
  15.         <maven.compiler.target>8</maven.compiler.target>
  16.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17.         <flink.version>1.14.0</flink.version>
  18.         <scala.binary.version>2.12</scala.binary.version>
  19.         <target.java.version>1.8</target.java.version>
  20.         <maven.compiler.source>${target.java.version}</maven.compiler.source>
  21.         <maven.compiler.target>${target.java.version}</maven.compiler.target>
  22.         <log4j.version>2.17.2</log4j.version>
  23.         <hadoop.version>3.1.2</hadoop.version>
  24.         <hive.version>3.1.2</hive.version>
  25.     </properties>
  26.     <dependencies>
  27.         <dependency>
  28.             <groupId>gaei.cn.x5l.bigdata.common</groupId>
  29.             <artifactId>x5l-bigdata-common</artifactId>
  30.             <version>1.1-SNAPSHOT</version>
  31.             <exclusions>
  32.                 <exclusion>
  33.                     <groupId>org.apache.logging.log4j</groupId>
  34.                     <artifactId>log4j-core</artifactId>
  35.                 </exclusion>
  36.                 <exclusion>
  37.                     <groupId>org.apache.logging.log4j</groupId>
  38.                     <artifactId>log4j-slf4j-impl</artifactId>
  39.                 </exclusion>
  40.                 <exclusion>
  41.                     <groupId>org.apache.logging.log4j</groupId>
  42.                     <artifactId>log4j-api</artifactId>
  43.                 </exclusion>
  44.             </exclusions>
  45.         </dependency>
  46.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-dist -->
  47.         <dependency>
  48.             <groupId>org.apache.flink</groupId>
  49.             <artifactId>flink-dist_2.12</artifactId>
  50.             <version>1.14.0-csa1.7.0.0</version>
  51.             <scope>provided</scope>
  52.             <exclusions>
  53.                 <exclusion>
  54.                     <groupId>org.slf4j</groupId>
  55.                     <artifactId>slf4j-log4j12</artifactId>
  56.                 </exclusion>
  57.             </exclusions>
  58.         </dependency>
  59.         <dependency>
  60.             <groupId>org.apache.flink</groupId>
  61.             <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  62.             <version>${flink.version}</version>
  63.         </dependency>
  64.         <dependency>
  65.             <groupId>org.jyaml</groupId>
  66.             <artifactId>jyaml</artifactId>
  67.             <version>1.3</version>
  68.         </dependency>
  69.         <dependency>
  70.             <groupId>gaei.cn.x5l</groupId>
  71.             <artifactId>tsp-gb-decode</artifactId>
  72.             <version>1.0.0</version>
  73.             <exclusions>
  74.                 <exclusion>
  75.                     <groupId>org.apache.logging.log4j</groupId>
  76.                     <artifactId>log4j-core</artifactId>
  77.                 </exclusion>
  78.                 <exclusion>
  79.                     <groupId>org.apache.logging.log4j</groupId>
  80.                     <artifactId>log4j-api</artifactId>
  81.                 </exclusion>
  82.                 <exclusion>
  83.                     <groupId>org.apache.logging.log4j</groupId>
  84.                     <artifactId>log4j-slf4j-impl</artifactId>
  85.                 </exclusion>
  86.             </exclusions>
  87.         </dependency>
  88.         <dependency>
  89.             <groupId>mysql</groupId>
  90.             <artifactId>mysql-connector-java</artifactId>
  91.             <version>5.1.44</version>
  92.             <scope>runtime</scope>
  93.         </dependency>
  94.         <dependency>
  95.             <groupId>gaei.cn.x5l.flink.common</groupId>
  96.             <artifactId>x5l-flink-common</artifactId>
  97.             <version>1.2-SNAPSHOT</version>
  98.             <scope>compile</scope>
  99.             <exclusions>
  100.                 <exclusion>
  101.                     <artifactId>slf4j-api</artifactId>
  102.                     <groupId>org.slf4j</groupId>
  103.                 </exclusion>
  104.                 <exclusion>
  105.                     <groupId>org.apache.logging.log4j</groupId>
  106.                     <artifactId>log4j-core</artifactId>
  107.                 </exclusion>
  108.                 <exclusion>
  109.                     <groupId>org.apache.logging.log4j</groupId>
  110.                     <artifactId>log4j-api</artifactId>
  111.                 </exclusion>
  112.                 <exclusion>
  113.                     <groupId>org.apache.logging.log4j</groupId>
  114.                     <artifactId>log4j-slf4j-impl</artifactId>
  115.                 </exclusion>
  116.                 <exclusion>
  117.                     <groupId>org.apache.logging.log4j</groupId>
  118.                     <artifactId>log4j-1.2-api</artifactId>
  119.                 </exclusion>
  120.             </exclusions>
  121.         </dependency>
  122.         <!-- Flink Dependency -->
  123.         <dependency>
  124.             <groupId>org.apache.flink</groupId>
  125.             <artifactId>flink-connector-hive_2.12</artifactId>
  126.             <version>1.14.0</version>
  127.         </dependency>
  128.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3 -->
  129.         <dependency>
  130.             <groupId>org.apache.flink</groupId>
  131.             <artifactId>flink-shaded-hadoop-3</artifactId>
  132.             <version>3.1.1.7.2.8.0-224-9.0</version>
  133.             <scope>provided</scope>
  134.             <exclusions>
  135.                 <exclusion>
  136.                     <artifactId>slf4j-log4j12</artifactId>
  137.                     <groupId>org.slf4j</groupId>
  138.                 </exclusion>
  139.                 <exclusion>
  140.                     <groupId>org.apache.logging.log4j</groupId>
  141.                     <artifactId>log4j-core</artifactId>
  142.                 </exclusion>
  143.                 <exclusion>
  144.                     <groupId>org.apache.logging.log4j</groupId>
  145.                     <artifactId>log4j-api</artifactId>
  146.                 </exclusion>
  147.                 <exclusion>
  148.                     <groupId>org.apache.logging.log4j</groupId>
  149.                     <artifactId>log4j-slf4j-impl</artifactId>
  150.                 </exclusion>
  151.                 <exclusion>
  152.                     <groupId>log4j</groupId>
  153.                     <artifactId>log4j</artifactId>
  154.                 </exclusion>
  155.             </exclusions>
  156.         </dependency>
  157.         <dependency>
  158.             <groupId>cn.hutool</groupId>
  159.             <artifactId>hutool-all</artifactId>
  160.             <version>5.8.10</version>
  161.         </dependency>
  162.         <dependency>
  163.             <groupId>com.alibaba.ververica</groupId>
  164.             <artifactId>flink-connector-mysql-cdc</artifactId>
  165.             <version>1.4.0</version>
  166.         </dependency>
  167.         <dependency>
  168.             <groupId>org.apache.flink</groupId>
  169.             <artifactId>flink-connector-jdbc_2.11</artifactId>
  170.             <version>1.11.6</version>
  171.         </dependency>
  172.         <dependency>
  173.             <groupId>org.apache.flink</groupId>
  174.             <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
  175.             <version>${flink.version}</version>
  176.         </dependency>
  177.         <!-- 基础依赖  开始-->
  178.         <dependency>
  179.             <groupId>org.apache.flink</groupId>
  180.             <artifactId>flink-java</artifactId>
  181.             <version>${flink.version}</version>
  182.             <scope>provided</scope>
  183.         </dependency>
  184.         <dependency>
  185.             <groupId>org.apache.flink</groupId>
  186.             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  187.             <version>${flink.version}</version>
  188.             <scope>provided</scope>
  189.         </dependency>
  190.         <dependency>
  191.             <groupId>org.apache.flink</groupId>
  192.             <artifactId>flink-clients_${scala.binary.version}</artifactId>
  193.             <version>${flink.version}</version>
  194.             <scope>provided</scope>
  195.         </dependency>
  196.         <!-- 基础依赖  结束-->
  197.         <!-- TABLE  开始-->
  198.         <dependency>
  199.             <groupId>org.apache.flink</groupId>
  200.             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  201.             <version>1.14.0</version>
  202.             <scope>provided</scope>
  203.         </dependency>
  204.         <!-- 使用 hive sql时注销,其他时候可以放开 -->
  205.         <dependency>
  206.             <groupId>org.apache.flink</groupId>
  207.             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  208.             <version>${flink.version}</version>
  209.             <scope>provided</scope>
  210.         </dependency>
  211.         <dependency>
  212.             <groupId>org.apache.flink</groupId>
  213.             <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  214.             <version>${flink.version}</version>
  215.             <scope>provided</scope>
  216.         </dependency>
  217.         <dependency>
  218.             <groupId>org.apache.flink</groupId>
  219.             <artifactId>flink-table-common</artifactId>
  220.             <version>${flink.version}</version>
  221.             <scope>provided</scope>
  222.         </dependency>
  223.         <dependency>
  224.             <groupId>org.apache.flink</groupId>
  225.             <artifactId>flink-cep_${scala.binary.version}</artifactId>
  226.             <version>${flink.version}</version>
  227.         </dependency>
  228.         <!-- TABLE  结束-->
  229.         <!-- sql  开始-->
  230.         <!-- sql解析 开始 -->
  231.         <dependency>
  232.             <groupId>org.apache.flink</groupId>
  233.             <artifactId>flink-json</artifactId>
  234.             <version>${flink.version}</version>
  235.             <scope>provided</scope>
  236.         </dependency>
  237.         <dependency>
  238.             <groupId>org.apache.flink</groupId>
  239.             <artifactId>flink-csv</artifactId>
  240.             <version>${flink.version}</version>
  241.             <scope>provided</scope>
  242.         </dependency>
  243.         <!-- sql解析 结束 -->
  244.         <!-- sql连接 kafka -->
  245.         <dependency>
  246.             <groupId>org.apache.flink</groupId>
  247.             <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
  248.             <version>${flink.version}</version>
  249.         </dependency>
  250.         <!-- sql  结束-->
  251.         <!-- 检查点 -->
  252.         <dependency>
  253.             <groupId>org.apache.flink</groupId>
  254.             <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
  255.             <version>${flink.version}</version>
  256.             <scope>provided</scope>
  257.         </dependency>
  258.         <dependency>
  259.             <groupId>org.apache.flink</groupId>
  260.             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  261.             <version>${flink.version}</version>
  262.         </dependency>
  263.         <dependency>
  264.             <groupId>commons-lang</groupId>
  265.             <artifactId>commons-lang</artifactId>
  266.             <version>2.5</version>
  267.             <scope>compile</scope>
  268.         </dependency>
  269.         <dependency>
  270.             <groupId>org.apache.flink</groupId>
  271.             <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
  272.             <version>${flink.version}</version>
  273.             <scope>provided</scope>
  274.         </dependency>
  275.         <!-- 本地监控任务 结束 -->
  276.         <!-- DataStream 开始 -->
  277.         <dependency>
  278.             <groupId>org.apache.logging.log4j</groupId>
  279.             <artifactId>log4j-slf4j-impl</artifactId>
  280.             <version>${log4j.version}</version>
  281.             <scope>runtime</scope>
  282.         </dependency>
  283.         <dependency>
  284.             <groupId>org.apache.logging.log4j</groupId>
  285.             <artifactId>log4j-api</artifactId>
  286.             <version>${log4j.version}</version>
  287.             <scope>runtime</scope>
  288.         </dependency>
  289.         <dependency>
  290.             <groupId>org.apache.logging.log4j</groupId>
  291.             <artifactId>log4j-core</artifactId>
  292.             <version>${log4j.version}</version>
  293.             <scope>runtime</scope>
  294.         </dependency>
  295.         <!-- hdfs -->
  296.         <dependency>
  297.             <groupId>org.apache.hadoop</groupId>
  298.             <artifactId>hadoop-client</artifactId>
  299.             <version>3.3.1</version>
  300.         </dependency>
  301.         <!-- 重点,容易被忽略的jar -->
  302.         <dependency>
  303.             <groupId>org.apache.hadoop</groupId>
  304.             <artifactId>hadoop-auth</artifactId>
  305.             <version>${hadoop.version}</version>
  306.         </dependency>
  307.         <!-- rocksdb_2 -->
  308.         <dependency>
  309.             <groupId>org.apache.flink</groupId>
  310.             <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
  311.             <version>${flink.version}</version>
  312.             <scope>provided</scope>
  313.         </dependency>
  314.         <!-- 其他 -->
  315.         <dependency>
  316.             <groupId>com.alibaba</groupId>
  317.             <artifactId>fastjson</artifactId>
  318.             <version>1.1.23</version>
  319.         </dependency>
  320.         <dependency>
  321.             <groupId>org.projectlombok</groupId>
  322.             <artifactId>lombok</artifactId>
  323.             <version>1.16.18</version>
  324.             <scope>provided</scope>
  325.         </dependency>
  326.             <!-- kafka2mongo 离线任务 -->
  327.             <dependency>
  328.                 <groupId>org.mongodb</groupId>
  329.                 <artifactId>mongodb-driver</artifactId>
  330.                 <version>3.12.6</version>
  331.             </dependency>
  332.     </dependencies>
  333.     <build>
  334.         <plugins>
  335.             <plugin>
  336.                 <groupId>org.apache.maven.plugins</groupId>
  337.                 <artifactId>maven-shade-plugin</artifactId>
  338.                 <version>3.0.0</version>
  339.                 <executions>
  340.                     <execution>
  341.                         <phase>package</phase>
  342.                         <goals>
  343.                             <goal>shade</goal>
  344.                         </goals>
  345.                         <configuration>
  346.                             <createDependencyReducedPom>false</createDependencyReducedPom>
  347.                             <artifactSet>
  348.                                 <excludes>
  349.                                     <exclude>org.apache.flink:force-shading</exclude>
  350.                                     <exclude>com.google.code.findbugs:jsr305</exclude>
  351.                                     <exclude>org.slf4j:*</exclude>
  352.                                     <exclude>org.apache.logging.log4j:*</exclude>
  353.                                     <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
  354.                                 </excludes>
  355.                             </artifactSet>
  356.                             <filters>
  357.                                 <filter>
  358.                                     <artifact>*:*</artifact>
  359.                                     <excludes>
  360.                                         <exclude>META-INF/*.SF</exclude>
  361.                                         <exclude>META-INF/*.DSA</exclude>
  362.                                         <exclude>META-INF/*.RSA</exclude>
  363.                                     </excludes>
  364.                                 </filter>
  365.                             </filters>
  366.                             <transformers>
  367.                                 <transformer
  368.                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  369.                                     <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
  370.                                 </transformer>
  371.                                 <!-- flink sql 需要  -->
  372.                                 <!-- The service transformer is needed to merge META-INF/services files -->
  373.                                 <transformer
  374.                                         implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
  375.                                 <!-- ... -->
  376.                             </transformers>
  377.                         </configuration>
  378.                     </execution>
  379.                 </executions>
  380.             </plugin>
  381.         </plugins>
  382.         <pluginManagement>
  383.             <plugins>
  384.                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
  385.                 <plugin>
  386.                     <groupId>org.eclipse.m2e</groupId>
  387.                     <artifactId>lifecycle-mapping</artifactId>
  388.                     <version>1.0.0</version>
  389.                     <configuration>
  390.                         <lifecycleMappingMetadata>
  391.                             <pluginExecutions>
  392.                                 <pluginExecution>
  393.                                     <pluginExecutionFilter>
  394.                                         <groupId>org.apache.maven.plugins</groupId>
  395.                                         <artifactId>maven-shade-plugin</artifactId>
  396.                                         <versionRange>[3.0.0,)</versionRange>
  397.                                         <goals>
  398.                                             <goal>shade</goal>
  399.                                         </goals>
  400.                                     </pluginExecutionFilter>
  401.                                     <action>
  402.                                         <ignore/>
  403.                                     </action>
  404.                                 </pluginExecution>
  405.                                 <pluginExecution>
  406.                                     <pluginExecutionFilter>
  407.                                         <groupId>org.apache.maven.plugins</groupId>
  408.                                         <artifactId>maven-compiler-plugin</artifactId>
  409.                                         <versionRange>[3.1,)</versionRange>
  410.                                         <goals>
  411.                                             <goal>testCompile</goal>
  412.                                             <goal>compile</goal>
  413.                                         </goals>
  414.                                     </pluginExecutionFilter>
  415.                                     <action>
  416.                                         <ignore/>
  417.                                     </action>
  418.                                 </pluginExecution>
  419.                             </pluginExecutions>
  420.                         </lifecycleMappingMetadata>
  421.                     </configuration>
  422.                 </plugin>
  423.             </plugins>
  424.         </pluginManagement>
  425.     </build>
  426.     <repositories>
  427.         <repository>
  428.             <id>cdh.releases.repo</id>
  429.             <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
  430.             <name>Releases Repository</name>
  431.         </repository>
  432.     </repositories>
  433. </project>
复制代码
2)代码实现

2.1.resources

2.1.1.appconfig.yml

  1. mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
  2. mysql.username: "test"
  3. mysql.password: "123456"
  4. mysql.driver: "com.mysql.jdbc.Driver"
复制代码
2.1.2.log4j.properties

  1. log4j.rootLogger=info, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
复制代码
2.1.3.log4j2.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration monitorInterval="5">
  3.     <Properties>
  4.         <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
  5.         <property name="LOG_LEVEL" value="ERROR" />
  6.     </Properties>
  7.     <appenders>
  8.         <console name="console" target="SYSTEM_OUT">
  9.             <PatternLayout pattern="${LOG_PATTERN}"/>
  10.             <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
  11.         </console>
  12.         <File name="log" fileName="tmp/log/job.log" append="false">
  13.             <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
  14.         </File>
  15.     </appenders>
  16.     <loggers>
  17.         <root level="${LOG_LEVEL}">
  18.             <appender-ref ref="console"/>
  19.             <appender-ref ref="log"/>
  20.         </root>
  21.     </loggers>
  22. </configuration>
复制代码
2.1.4.flink_backup_local.yml

  1. hdfs:
  2.   checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
  3.   checkpointTimeout: 360000
  4.   checkpointing: 300000
  5.   maxConcurrentCheckpoints: 1
  6.   minPauseBetweenCheckpoints: 10000
  7.   restartInterval: 60
  8.   restartStrategy: 3
  9. hive:
  10.   defaultDatabase: 'ods'
  11.   hiveConfDir: 'D:/WorkSpace/bigdata-flink-backup/kafka2hive/src/main/resources/'
  12.   sourceTopic: 'topicA,topicB'
  13.   tableName: 'table_name'
  14. kafka-consumer:
  15.   prop:
  16.     auto.offset.reset: 'earliest'
  17.     bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
  18.     enable.auto.commit: 'false'
  19.     fetch.max.bytes: '52428700'
  20.     group.id: 'test'
  21.     isKerberized: '1'
  22.     keytab: 'D:/keytab/test.keytab'
  23.     krb5Conf: 'D:/keytab/krb5.conf'
  24.     max.poll.interval.ms: '300000'
  25.     max.poll.records: '1000'
  26.     principal: 'test@PRE.TEST.COM'
  27.     security_protocol: 'SASL_PLAINTEXT'
  28.     serviceName: 'kafka'
  29.     session.timeout.ms: '600000'
  30.     useTicketCache: 'false'
  31.   topics: 'topicA,topicB'
  32. kafka-producer:
  33.   defaultTopic: 'kafka2hive_error'
  34.   prop:
  35.     acks: 'all'
  36.     batch.size: '1048576'
  37.     bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
  38.     compression.type: 'lz4'
  39.     key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
  40.     retries: '3'
  41.     value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
复制代码
2.2.utils

2.2.1.DBConn

  1. import java.sql.*;
  2. public class DBConn {
  3.     private static final String driver = "com.mysql.jdbc.Driver";                //mysql驱动
  4.     private static Connection conn = null;
  5.     private static PreparedStatement ps = null;
  6.     private static ResultSet rs = null;
  7.     private static final CallableStatement cs = null;
  8.     /**
  9.      * 连接数据库
  10.      * @return
  11.      */
  12.     public static Connection conn(String url,String username,String password) {
  13.         Connection conn = null;
  14.         try {
  15.             Class.forName(driver);  //加载数据库驱动
  16.             try {
  17.                 conn = DriverManager.getConnection(url, username, password);  //连接数据库
  18.             } catch (SQLException e) {
  19.                 e.printStackTrace();
  20.             }
  21.         } catch (ClassNotFoundException e) {
  22.             e.printStackTrace();
  23.         }
  24.         return conn;
  25.     }
  26.     /**
  27.      * 关闭数据库链接
  28.      * @return
  29.      */
  30.     public static void close() {
  31.         if(conn != null) {
  32.             try {
  33.                 conn.close();  //关闭数据库链接
  34.             } catch (SQLException e) {
  35.                 e.printStackTrace();
  36.             }
  37.         }
  38.     }
  39. }
复制代码
2.2.2.CommonUtils

  1. @Slf4j
  2. public class CommonUtils {
  3.     public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
  4. //        ConfigTools.initConf("local");
  5.         Map hdfsMap = (Map) ConfigTools.mapConf.get("hdfs");
  6.         env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟
  7.         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
  8.         env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
  9.         env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
  10.         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  11.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  12.                 (Integer) hdfsMap.get("restartStrategy"), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次
  13.                 Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS) // 延时
  14.         ));
  15.         //设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败
  16.         env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  17.         //设置状态后端存储方式
  18.         env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
  19. //        env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
  20. //        env.setStateBackend(new HashMapStateBackend(());
  21.         return env;
  22.     }
  23.     public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
  24.         String[] topics = ((String) kafkaConf.get("topics")).split(",");
  25.         log.info("监听的topic: {}", topics);
  26.         Properties properties = new Properties();
  27.         Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
  28.         for (String key : kafkaProp.keySet()) {
  29.             properties.setProperty(key, kafkaProp.get(key).toString());
  30.         }
  31.         if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
  32.             System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
  33.             properties.put("security.protocol", kafkaProp.get("security_protocol"));
  34.             properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
  35.                     + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
  36.                     + "serviceName="" + kafkaProp.get("serviceName") + "" "
  37.                     + "useKeyTab=true "
  38.                     + "keyTab="" + kafkaProp.get("keytab").toString() + "" "
  39.                     + "principal="" + kafkaProp.get("principal").toString() + "";");
  40.         }
  41.         properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
  42.         properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
  43.         FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(Arrays.asList(topics), new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
  44.             @Override
  45.             public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
  46.                 return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
  47.                 });
  48.             }
  49.             @Override
  50.             public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
  51.                 return false;
  52.             }
  53.             @Override
  54.             public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
  55.                 return new ConsumerRecord<String, String>(
  56.                         record.topic(),
  57.                         record.partition(),
  58.                         record.offset(),
  59.                         record.timestamp(),
  60.                         record.timestampType(),
  61.                         record.checksum(),
  62.                         record.serializedKeySize(),
  63.                         record.serializedValueSize(),
  64.                         new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
  65.                         new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
  66.             }
  67.         }, properties);
  68.         return consumerRecordFlinkKafkaConsumer;
  69.     }
  70. }
复制代码
2.3.conf

2.3.1.ConfigTools

  1. @Slf4j
  2. public class ConfigTools {
  3.     public static Map<String, Object> mapConf;
  4.     /**
  5.      * 获取对应的配置文件
  6.      *
  7.      * @param option
  8.      */
  9.     public static void initConf(String option) {
  10.         String confFile = "/flink_backup_" + option + ".yml";
  11.         try {
  12.             InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
  13.             mapConf = Yaml.loadType(dumpFile, HashMap.class);
  14.         } catch (Exception e) {
  15.             e.printStackTrace();
  16.         }
  17.     }
  18.     /**
  19.      * 获取对应的配置文件
  20.      *
  21.      * @param option
  22.      */
  23.     public static void initMySqlConf(String option, Class clazz) {
  24.         String className = clazz.getName();
  25.         String confFile = "/appconfig.yml";
  26.         Map<String, String> mysqlConf;
  27.         try {
  28.             InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
  29.             mysqlConf = Yaml.loadType(dumpFile, HashMap.class);
  30.             String username = mysqlConf.get("mysql.username");
  31.             String password = mysqlConf.get("mysql.password");
  32.             String url = mysqlConf.get("mysql.url");
  33.             Connection conn = DBConn.conn(url, username, password);
  34.             Map<String, Object> config = getConfig(conn, className, option);
  35.             if (config == null || config.size() == 0) {
  36.                 log.error("获取配置文件失败");
  37.                 return;
  38.             }
  39.             mapConf = config;
  40.         } catch (Exception e) {
  41.             e.printStackTrace();
  42.         }
  43.     }
  44.     private static Map<String, Object> getConfig(Connection conn, String className, String option) throws SQLException {
  45.         PreparedStatement preparedStatement = null;
  46.         try {
  47.             String sql = "select config_context from app_config where app_name = '%s' and config_name = '%s'";
  48.             preparedStatement = conn.prepareStatement(String.format(sql, className, option));
  49.             ResultSet rs = preparedStatement.executeQuery();
  50.             Map<String, String> map = new LinkedHashMap<>();
  51.             String config_context = "";
  52.             while (rs.next()) {
  53.                 config_context = rs.getString("config_context");
  54.             }
  55.             System.out.println("配置信息config_context:"+config_context);
  56. //            if(StringUtils.isNotBlank(config_context)){
  57. //                System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), SerializerFeature.PrettyFormat));
  58. //            }
  59.             Map<String, Object> mysqlConfMap = JSON.parseObject(config_context, Map.class);
  60.             return mysqlConfMap;
  61.         }finally {
  62.             if (preparedStatement != null) {
  63.                 preparedStatement.close();
  64.             }
  65.             if (conn != null) {
  66.                 conn.close();
  67.             }
  68.         }
  69.     }
  70.     public static void main(String[] args) {
  71. //        initMySqlConf("local", TboxPeriodBackoutA3K.class);
  72.         initConf("local");
  73.         String s = JSON.toJSONString(mapConf);
  74.         System.out.println(s);
  75.     }
  76. }
复制代码
2.4.po

2.4.1.SchemaPo

  1. /**
  2. * 字段属性对象
  3. */
  4. @Data
  5. @AllArgsConstructor
  6. @NoArgsConstructor
  7. public class SchemaPo implements Serializable {
  8.     private String signal;
  9.     private String type;
  10. }
复制代码
2.5.kafka2hive

2.5.1.Kafka2Hive-ODS

从 Kafka 中获取到的数据不做任那边置惩罚直接写入到 Hive 的 ODS 层
  1. public class Kafka2Hive_ODS {
  2.     public static Logger logger = Logger.getLogger(Kafka2Hive_ODS.class);
  3.    
  4.     public static void main(String[] args) throws Exception {
  5.         ConfigTools.initMySqlConf(args[0], AcpBackoutAll_X9E_ORIGINAL.class);
  6.         Map<String, Object> mapConf = ConfigTools.mapConf;
  7.         Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
  8.         Map<String, Object> hiveConf = (Map<String, Object>) mapConf.get("hive");
  9.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
  10.         CommonUtils.setCheckpoint(env);
  11.         EnvironmentSettings fsSettings = EnvironmentSettings
  12.                 .newInstance()
  13.                 .useBlinkPlanner()
  14.                 .inStreamingMode()
  15.                 .build();
  16.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
  17.         StreamStatementSet statementSet = tableEnv.createStatementSet();
  18.         // 使用Hive的sql方言
  19.         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  20.         //自定义一个名字(没有限制随意取)
  21.         String name = "myhive";
  22.         //这里需要配置hive表中的默认库(不是mysql的hive元数据库)
  23.         String defaultDatabase = "ods";
  24.         //hive-site.xml文件目录
  25.         String hiveConfDir = (String) hiveConf.get("hiveConfDir");
  26.         //创建HiveCatalog
  27.         HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, "3.1.2");
  28.         //注册HiveCatalog
  29.         tableEnv.registerCatalog("myhive", hive);
  30.         //使用HiveCatalog
  31.         tableEnv.useCatalog("myhive");
  32. //        FlinkKafkaConsumer<String> myConsumer = CommonUtils.getKafkaConsumer();
  33.         FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = CommonUtils.getKafkaConsumer(kafkaConsumerConf);
  34.         DataStream<ConsumerRecord<String, String>> stream = env.addSource(myConsumer);
  35.         String tableName = (String) hiveConf.get("tableName");
  36.         List<FieldSchema> schemas = hive.getHiveTable(new ObjectPath(defaultDatabase, tableName)).getSd().getCols();
  37.         List<FieldSchema> partitionKeys = hive.getHiveTable(new ObjectPath(defaultDatabase, tableName)).getPartitionKeys();
  38.         schemas.addAll(partitionKeys);
  39.         List<SchemaPo> schemaPos = new ArrayList<>();
  40.         List<String> fieldLists = new ArrayList<>();
  41.         List<TypeInformation> typeList = new ArrayList<>();
  42.         for (FieldSchema schema : schemas) {
  43.             SchemaPo schemaPo = new SchemaPo();
  44.             schemaPo.setSignal(schema.getName());
  45.             schemaPo.setType(schema.getType());
  46.             schemaPos.add(schemaPo);
  47.             fieldLists.add(schema.getName());
  48.             String type = schema.getType();
  49.             if (type.equalsIgnoreCase("bigint")) {
  50.                 typeList.add(Types.LONG);
  51.             } else {
  52.                 typeList.add(Types.STRING);
  53.             }
  54.         }
  55.         String[] fieldNames = fieldLists.toArray(new String[fieldLists.size()]);
  56.         TypeInformation[] types = typeList.toArray(new TypeInformation[typeList.size()]);
  57.         SingleOutputStreamOperator<Row> originalRow = stream.flatMap(new KafkaMsgFormatFunction(schemaPos), new RowTypeInfo(types, fieldNames)).uid("ORIGINAL");
  58.         tableEnv.createTemporaryView("originalRow", originalRow);
  59.         StringBuilder sql = new StringBuilder();
  60.         tableEnv.executeSql("alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore')");
  61.         sql.append("insert into `table_name` select ");
  62.         sql.append(" * ");
  63.         sql.append(" from `myhive`.`ods`.originalRow");
  64.         tableEnv.executeSql(sql.toString());
  65. //        env.execute();
  66.     }
  67.     static class KafkaMsgFormatFunction extends RichFlatMapFunction<ConsumerRecord<String,String>, Row> {
  68.         private List<SchemaPo> schemaPos;
  69.         public KafkaMsgFormatFunction(List<SchemaPo> schemaPos) {
  70.             this.schemaPos = schemaPos;
  71.         }
  72.         @Override
  73.         public void open(Configuration parameters) {
  74.         
  75.         }
  76.         @Override
  77.         public void flatMap(ConsumerRecord<String,String> record, Collector<Row> out) {
  78.             String key = null;
  79.             try {
  80.                 HashMap<String, Object> infoMap = JSON.parseObject((String) record.value(), HashMap.class);
  81.                
  82.                 for (String signalkey : infoMap.keySet()) {
  83.                     resultMap.put(signalkey.toLowerCase(), String.valueOf(infoMap.get(signalkey)));
  84.                 }
  85.                 Row row = new Row(schemaPos.size());
  86.                 for (int i = 0; i < schemaPos.size(); i++) {
  87.                     SchemaPo schemaPo = schemaPos.get(i);
  88.                     String v = resultMap.get(schemaPo.getSignal());
  89.                     if (StringUtils.isBlank(v)) {
  90.                         row.setField(i, null);
  91.                         continue;
  92.                     }
  93.                     if ("bigint".equalsIgnoreCase(schemaPo.getType())) {
  94.                         Long svalue = Long.valueOf(resultMap.get(schemaPo.getSignal()));
  95.                         row.setField(i, svalue);
  96.                     } else {
  97.                         String svalue = resultMap.get(schemaPo.getSignal());
  98.                         row.setField(i, svalue);
  99.                     }
  100.                 }
  101.                 out.collect(row);
  102.             } catch (Exception e) {
  103.                     e.printStackTrace();
  104.             }
  105.         }
  106.     }
  107. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表