Installing Flink with Docker


Environment Setup

Flink

Using docker-compose

version: "3"
services:
  jobmanager:
    image: flink:latest
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
The web UI is then available at http://192.168.56.112:8081/#/overview (substitute your own host IP).
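To bring the cluster up, save the file as docker-compose.yml and start it from that directory; a minimal sketch, assuming the host IP 192.168.56.112 used throughout this post:

docker-compose up -d                        # start jobmanager and taskmanager in the background
docker-compose ps                           # both services should show state "Up"
curl http://192.168.56.112:8081/overview    # REST API: cluster overview as JSON
docker-compose up -d --scale taskmanager=2  # optional: run a second taskmanager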
Binary deployment

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
tar -xzf flink-1.13.3-bin-scala_2.11.tgz && cd flink-1.13.3
vim conf/flink-conf.yaml
# set in conf/flink-conf.yaml: jobmanager.rpc.address: 192.168.56.112  (change to this machine's IP)
./bin/start-cluster.sh
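A quick way to confirm that the standalone cluster came up, assuming the default ports:

jps                                         # should list StandaloneSessionClusterEntrypoint and TaskManagerRunner
curl http://192.168.56.112:8081/overview    # same REST overview endpoint as the Docker setup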
Kafka

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper           ## image
    ports:
      - "2181:2181"                         ## exposed port
  kafka:
    image: wurstmeister/kafka               ## image
    volumes:
      - /etc/localtime:/etc/localtime       ## keep container time in sync with the host
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112      ## change to the host IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181    ## Kafka runs on top of ZooKeeper
  kafka-manager:
    image: sheepkiller/kafka-manager        ## open-source web UI for managing Kafka clusters
    environment:
      ZK_HOSTS: 192.168.56.112:2181         ## change to the host IP
    ports:
      - "9000:9000"
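As with the Flink cluster, start the stack with docker-compose (service names from the file above):

docker-compose up -d    # starts zookeeper, kafka and kafka-manager
docker-compose ps       # all three services should be Up
# kafka-manager web UI: http://192.168.56.112:9000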
MySQL

docker run -d -p 3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7
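The windowing demo below writes into a MySQL database named flink, which this image does not create by itself; a minimal sketch to create it (container name and password taken from the docker run above):

docker exec -i mysql57 mysql -uroot -p111111 -e 'CREATE DATABASE IF NOT EXISTS flink;'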

Running SQL Commands in Flink

Enter the SQL Client CLI

docker exec -it flink_jobmanager_1 /bin/bash   # container name may differ; check with docker ps
./bin/sql-client.sh

Run a SQL query

SELECT 'Hello World';

Table mode

Table mode materializes results in memory and displays them as a regular, paginated table. Enable it with:

SET sql-client.execution.result-mode = table;
The following query can be used to observe how each result mode renders its output:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

Changelog mode

Changelog mode does not materialize results. It visualizes the continuous query's result stream, made up of insertions (+) and retractions (-).

SET sql-client.execution.result-mode = changelog;

Tableau mode

Tableau mode is closer to a traditional database: it prints results as a table directly on the screen. What exactly is displayed depends on the job execution mode (execution.type):

SET sql-client.execution.result-mode = tableau;

Note: when this mode is used on a streaming query, Flink prints results to the console continuously. If the input of the streaming query is a bounded data set, the job stops automatically once all input has been processed, and the console output stops with it. To end the query early, press CTRL-C; this cancels the job and stops the console output.
Window computations

TUMBLE(time_attr, interval) defines a tumbling window. A tumbling window assigns rows to non-overlapping, contiguous windows of fixed duration (interval). For example, a 5-minute tumbling window groups rows into 5-minute intervals. Tumbling windows can be defined on event time (batch, streaming) or processing time (streaming).

Tumbling window demo

Using order events from Kafka as the source table and JDBC as the sink table, count each user's orders within 5-second windows, with the user id and window start time as the primary key, and write the results to JDBC in real time:

  • Create the table order_count in MySQL's flink database:
CREATE TABLE `flink`.`order_count` (
        `user_id` VARCHAR(32) NOT NULL,
        `window_start` TIMESTAMP NOT NULL,
        `window_end` TIMESTAMP NULL,
        `total_num` BIGINT UNSIGNED NULL,
        PRIMARY KEY (`user_id`, `window_start`)
) ENGINE = InnoDB
  DEFAULT CHARACTER SET = utf8mb4
  COLLATE = utf8mb4_general_ci;
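If the statement is saved to a file, it can be applied from the host without an interactive session; a sketch assuming the hypothetical file name order_count.sql:

docker exec -i mysql57 mysql -uroot -p111111 < order_count.sql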

  • Create the Flink SQL job and submit it to run:
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_topic',
  'properties.bootstrap.servers' = '192.168.56.112:9092',
  'properties.group.id' = 'order_group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE jdbcSink (
  user_id string,
  window_start timestamp(3),
  window_end timestamp(3),
  total_num BIGINT,
  primary key (user_id, window_start) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.56.112:3306/flink',
  'table-name' = 'order_count',
  'username' = 'root',
  'password' = '111111',
  'sink.buffer-flush.max-rows' = '1'
);

-- Ad-hoc OVER-window example (assumes a separate table `temp` with columns
-- group_key, record_num, create_time and a time attribute rowtime):
SELECT
    'WINDOW', -- window_start, window_end,
    group_key, record_num, create_time,
    SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (
  PARTITION BY group_key
  ORDER BY rowtime
  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW);

-- Preview the 5-second tumbling-window aggregation before writing it anywhere:
select
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;

-- OVER window on orders: running count per user over the last 60 seconds:
SELECT
    'WINDOW',
    user_id, order_id, real_pay, order_time,
    COUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (
  PARTITION BY user_id
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW);

-- The actual job: write the 5-second tumbling counts into MySQL:
insert into jdbcSink select
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
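Once the job is running and windows have fired (only windows with more than 3 orders per user pass the HAVING filter), the results can be checked directly in MySQL:

docker exec -i mysql57 mysql -uroot -p111111 -e 'SELECT * FROM flink.order_count;'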

  • Kafka operations
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --list                          # list topics
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topic   # create topic
bin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topic # produce messages
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginning   # consume from the beginning
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic  # inspect topic
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic    # delete topic
Sample data to send
  1. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  2. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  3. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  4. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  5. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  7. {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
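These records can be pasted into the console producer one line at a time; alternatively, save them to a file (the name orders.json is assumed here, one JSON object per line) and pipe the file in:

bin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topic < orders.json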
Sliding windows

SELECT * FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));

SELECT * FROM TABLE(
    HOP(
      DATA => TABLE orders,
      TIMECOL => DESCRIPTOR(order_time),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

SELECT window_start, window_end, SUM(pay_amount)
  FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;
Pitfalls


  • Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Check the Flink version you are running, then download the matching flink-sql-connector-kafka jar (for Flink 1.17.1: flink-sql-connector-kafka-1.17.1.jar, https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1), put it in the lib directory, and restart.

  • Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Download flink-connector-jdbc-3.1.0-1.17.jar:
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
  • Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
The JDBC connector also needs the MySQL driver jar (e.g. mysql-connector-j 8.0.31):
https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31
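For the Docker setup, the missing jars go into /opt/flink/lib inside every Flink container, followed by a restart; a sketch, assuming the jar versions above match your running Flink version (the compose file pulls flink:latest while the binary install uses 1.13.3, so pick the jars accordingly) and the default compose container names:

docker cp flink-sql-connector-kafka-1.17.1.jar flink_jobmanager_1:/opt/flink/lib/
docker cp flink-connector-jdbc-3.1.0-1.17.jar  flink_jobmanager_1:/opt/flink/lib/
docker cp mysql-connector-j-8.0.31.jar         flink_jobmanager_1:/opt/flink/lib/
docker restart flink_jobmanager_1
# repeat the docker cp and restart for each taskmanager container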
