IT评测·应用市场-qidao123.com技术社区

标题: 踩坑完毕:手把手带你使用Flink尝鲜Paimon入门案例(强烈建议收藏) [打印本页]

作者: 用户国营    时间: 2025-3-26 10:26
标题: 踩坑完毕:手把手带你使用Flink尝鲜Paimon入门案例(强烈建议收藏)
本文为大家讲授如何使用Flink完成Paimon官方的入门案例,建议大家收藏(对英文文档有恐惧感)。
本文会用到Flink情况,还不清晰如何摆设Flink的同砚可以检察:
《基于Flink CDC实现Mysql实时同步到Doris系列教程一:Flink情况的摆设》
本文演示使用的Ubuntu情况。
下载并解压Flink

下载最新版本的Flink
  1. wget https://dlcdn.apache.org/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz
复制代码

解压后会得到flink-1.19.1文件夹,并cd进入该目录。后续全部的操纵都是在该目录下执行。
  1. tar -xzf flink-1.19.1-bin-scala_2.12.tgz
  2. cd flink-1.19.1
复制代码

下载Paimon的jar包

使用Paimon必要用到paimon-flink.jar和flink-shaded-hadoop-2-uber-2.8.3-10.0.jarjar包,下载与Flink相对应的版本,并复制到flink-1.19.1/lib目录下。
  1. wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.19/0.9-SNAPSHOT/paimon-flink-1.19-0.9-20240628.002224-23.jar
  2. mv paimon-flink-1.19-0.9-20240628.002224-23.jar lib/
  3. wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
  4. mv flink-shaded-hadoop-2-uber-2.8.3-10.0.jar lib/
复制代码


启动Flink

在启动Flink之前,一定要注意,必要修改一个设置,否则后续会报错(非常打击士气)。
在flink-1.19.1/conf/conf.yaml中找到taskmanager,修改numberOfTaskSlots设置为2,numberOfTaskSlots表示可以同时运行的使命数,也可以修改的更大一点。


启动Flink情况
  1. ./bin/start-cluster.sh
复制代码

访问Flink Web UIhttp://localhost:8081。
启动Flink Sql客户端
  1. ./bin/sql-client.sh
复制代码


出现上图则代表成功了。
创建Catalog和表

  1. create catalog my_catalog with (
  2. 'type'='paimon',
  3. 'warehouse'='file:/home/liuyq/paimon'
  4. );
复制代码
注意,此处file:/home/liuyq/paimon为本地的一个绝对路径,会主动创建paimon文件夹。

可以看到,paimon文件夹下会有default.db,代表我们创建的my_catalog下有一个默认的default数据库。
切换到我们要使用的my_catalog下
  1. use catalog my_catalog;
复制代码

创建word_count表
  1. create table word_count (
  2. word string primary key not enforced,
  3. cnt bigint
  4. );
复制代码

此时,我们观察一下文件夹,发现多出来word_count文件夹,对应的正好就是我们刚刚创建好的表。
我们再看一下文件夹内的文件

写入数据

创建一个暂时表,用于模仿天生单词
  1. create temporary table word_table (
  2. word string
  3. ) with (
  4. 'connector' = 'datagen',
  5. 'fields.word.length' = '1'
  6. );
复制代码
datagen连接器用于模仿天生数据,该表中只有一个字符串字段word,而且使用fields.word.length指定天生的数据长度为1,注意此处1要带引号,否则会报错。天生的值如:a、b、c、1、2等。
设置checkpoint时长并写入数据
  1. set 'execution.checkpointing.interval' = '10 s';
  2. insert into word_count select word, count(*) from word_table group by word;
复制代码

insert into操纵相称于给flink中提交了一个实时计算的使命。该使命会把10s内word_table新产生的word数据聚合计算个数后把效果存入到word_count表中。

OLAP查询

设置查询效果的样式为tableau
  1. set 'sql-client.execution.result-mode' = 'tableau';
复制代码

从流处理模式切换到批处理模式
  1. reset 'execution.checkpointing.interval';
  2. set 'execution.runtime-mode' = 'batch';
复制代码

查询word_count表中的数据
  1. select * from word_count;
复制代码

如何你卡在这里没有任何反应,说明你忘记修改numberOfTaskSlots设置了,只必要退出后修改,再重复上面的操纵即可。

批处理模式下的一次查询,也相称于给Flink中提交了一个使命,只不外该使命执行一次完成后就竣事了。
流式查询

切换到流模式下
  1. set 'execution.runtime-mode' = 'streaming';
复制代码

实时查询
  1. select `interval`, count(*) as interval_cnt from (select cnt / 10000 as `interval` from word_count) group by `interval`;
复制代码


拉到最下面,仔细观察,你会发现数据每个约莫10s会有变更。
此时,也相称于在Flink中提交了一个使命

在界面上取消使命,就会看到刚才的实时查询也中断了。

退出

退出SQL客户端
  1. exit;
复制代码

停止Flink情况
  1. ./bin/stop-cluster.sh
复制代码

至此,一个我们完整体验了在Flink中使用Paimon。
进阶

现在,如果问你Paimon是什么?你心中会有怎样的答案呢。我们再看一下/paimon/default.db/word_count下面有什么

这时我们是不是可以说Paimon是具有严格布局的一组文件夹和文件的组合,这些文件会描述出数据库、数据表、数据等,用这种方式表达数据库也正是数据湖。
有一点点绕,希望大家可以体会到。想要学习更多Paimon干系的内容,可以关注:遇码,复兴Paimon获取官方文档。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4