Environment setup
Flink
Using docker-compose
- version: "3"
- services:
-   jobmanager:
-     image: flink:latest
-     expose:
-       - "6123"
-     ports:
-       - "8081:8081"
-     command: jobmanager
-     environment:
-       - JOB_MANAGER_RPC_ADDRESS=jobmanager
-   taskmanager:
-     image: flink:latest
-     expose:
-       - "6121"
-       - "6122"
-     depends_on:
-       - jobmanager
-     command: taskmanager
-     links:
-       - "jobmanager:jobmanager"
-     environment:
-       - JOB_MANAGER_RPC_ADDRESS=jobmanager
Web UI address: http://192.168.56.112:8081/#/overview
Binary deployment
- wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz
- tar -xzf flink-1.13.3-bin-scala_2.11.tgz && cd flink-1.13.3
- vim conf/flink-conf.yaml
- jobmanager.rpc.address: 192.168.56.112 # change to this host's IP
- ./bin/start-cluster.sh
Kafka
- version: '2'
- services:
-   zookeeper:
-     image: wurstmeister/zookeeper ## image
-     ports:
-       - "2181:2181" ## port exposed to the host
-   kafka:
-     image: wurstmeister/kafka ## image
-     volumes:
-       - /etc/localtime:/etc/localtime ## mount so that the Kafka container and the host keep the same time
-     ports:
-       - "9092:9092"
-     environment:
-       KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112 ## change to the host IP
-       KAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181 ## Kafka relies on ZooKeeper
-   kafka-manager:
-     image: sheepkiller/kafka-manager ## image: open-source web UI for managing Kafka clusters
-     environment:
-       ZK_HOSTS: ## change to the host IP
-     ports:
-       - "9000:9000"
MySQL
- docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7
Running SQL commands in Flink
Enter the SQL client CLI
- docker exec -it flink_jobmanager_1 /bin/bash
- ./bin/sql-client.sh
Running SQL queries
Table mode
Table mode materializes the results in memory and displays them as a regular, paginated table. Enable it with the following command:
- SET sql-client.execution.result-mode = table;
You can use the following query to compare how results are displayed in the different modes:
- SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
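Changelog mode
Changelog mode does not materialize the results. It displays the result stream of the continuous query, which consists of insertions (+) and retractions (-).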
- SET sql-client.execution.result-mode = changelog;
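To make the insert/retract stream more concrete, the sketch below replays what a changelog could look like for the GROUP BY query shown earlier. It is a simplified Python model and not Flink's implementation: the function name `changelog_for_count` is made up for illustration, and the bare + and - marks stand in for whatever row-kind markers the client displays.

```python
from collections import defaultdict


def changelog_for_count(names):
    """Model the changes emitted by SELECT name, COUNT(*) ... GROUP BY name.

    Returns a list of (op, name, cnt) tuples, where op is "+" for an
    insertion and "-" for a retraction of a previously emitted result.
    """
    counts = defaultdict(int)
    changes = []
    for name in names:
        old = counts[name]
        if old > 0:
            # The key was seen before: retract the result emitted earlier.
            changes.append(("-", name, old))
        counts[name] = old + 1
        changes.append(("+", name, old + 1))
    return changes


if __name__ == "__main__":
    for op, name, cnt in changelog_for_count(["Bob", "Alice", "Greg", "Bob"]):
        print(op, name, cnt)
```

In this model a repeated name first retracts its old count and then inserts the new one, which is why Bob shows up three times in the stream while Alice and Greg show up once.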
Tableau mode
Tableau mode is closer to a traditional database: it prints the results directly to the screen in tabular form. What is displayed depends on the job's execution mode (execution.type):
- SET sql-client.execution.result-mode = tableau;
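Note: when you use this mode with a streaming query, Flink keeps printing results to the current console. If the input of the streaming query is a bounded data set, the job stops automatically once Flink has processed all of the input, and the console output stops with it. To end the query early, press CTRL-C, which stops both the job and the console output.
Window computation
TUMBLE(time_attr, interval) defines a tumbling window. A tumbling window assigns rows to non-overlapping, contiguous windows of a fixed duration (interval). For example, a 5-minute tumbling window groups rows into 5-minute intervals. Tumbling windows can be defined on event time (batch and streaming) or on processing time (streaming).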
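The assignment described above amounts to rounding a timestamp down to a multiple of the interval. Here is a minimal Python sketch, assuming windows are aligned to the Unix epoch with no offset and are half-open, [start, end). `tumble_window` is a hypothetical helper written for this example, not a Flink API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size):
    """Return (window_start, window_end) of the tumbling window containing ts.

    Assumption: windows are aligned to the epoch and are half-open,
    so a row at exactly window_end belongs to the next window.
    """
    offset = (ts - EPOCH) % size  # how far ts is past the window start
    start = ts - offset
    return start, start + size


if __name__ == "__main__":
    ts = datetime(2023, 8, 10, 17, 29, 10)
    print(tumble_window(ts, timedelta(minutes=5)))
    print(tumble_window(ts, timedelta(seconds=5)))
```

Because every timestamp maps to exactly one start value, grouping by that value gives the non-overlapping groups that TUMBLE describes.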
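Tumbling window demo
This demo uses Kafka as the source table and JDBC as the result table. Based on order data, it counts each user's orders in 5-second windows and writes the results to JDBC in real time, using the user ID and the window start time as the primary key:
- Create the table order_count in the flink database in MySQL with the following statement: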
- CREATE TABLE `flink`.`order_count` (
- `user_id` VARCHAR(32) NOT NULL,
- `window_start` TIMESTAMP NOT NULL,
- `window_end` TIMESTAMP NULL,
- `total_num` BIGINT UNSIGNED NULL,
- PRIMARY KEY (`user_id`, `window_start`)
- ) ENGINE = InnoDB
- DEFAULT CHARACTER SET = utf8mb4
- COLLATE = utf8mb4_general_ci;
- Create a Flink OpenSource SQL job, then submit and run it:
- CREATE TABLE orders (
- order_id string,
- order_channel string,
- order_time timestamp(3),
- pay_amount double,
- real_pay double,
- pay_time string,
- user_id string,
- user_name string,
- area_id string,
- watermark for order_time as order_time - INTERVAL '3' SECOND
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'order_topic',
- 'properties.bootstrap.servers' = '192.168.56.112:9092',
- 'properties.group.id' = 'order_group',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'json'
- );
- CREATE TABLE jdbcSink (
- user_id string,
- window_start timestamp(3),
- window_end timestamp(3),
- total_num BIGINT,
- primary key (user_id, window_start) not enforced
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://192.168.56.112:3306/flink',
- 'table-name' = 'order_count',
- 'username' = 'root',
- 'password' = '111111',
- 'sink.buffer-flush.max-rows' = '1'
- );
- SELECT
- 'WINDOW',-- window_start,window_end,
- group_key,record_num,create_time,
- SUM(record_num) OVER w AS sum_amount
- FROM temp
- WINDOW w AS (
- PARTITION BY group_key
- ORDER BY rowtime
- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW);
-
- select
- user_id,
- TUMBLE_START(order_time, INTERVAL '5' SECOND),
- TUMBLE_END(order_time, INTERVAL '5' SECOND),
- COUNT(*) from orders
- GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
-
- SELECT
- 'WINDOW',
- user_id,order_id,real_pay,order_time,
- COUNT(*) OVER w AS sum_amount
- FROM orders
- WINDOW w AS (
- PARTITION BY user_id
- ORDER BY order_time
- RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW);
-
- insert into jdbcSink select
- user_id,
- TUMBLE_START(order_time, INTERVAL '5' SECOND),
- TUMBLE_END(order_time, INTERVAL '5' SECOND),
- COUNT(*) from orders
- GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
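The OVER queries in the block above compute, for every row, an aggregate over the rows of the same partition whose time falls in a trailing range. As a rough illustration of RANGE BETWEEN ... PRECEDING AND CURRENT ROW, the Python sketch below counts, for each row, the rows of the same user within the preceding interval. The function name and the input rows are invented for the example, and the model ignores watermarks and the order in which a streaming job would emit results.

```python
from datetime import datetime, timedelta


def range_over_count(rows, preceding):
    """Count rows per (user_id, order_time) over a trailing time range.

    rows: list of (user_id, order_time) tuples.
    Returns one (user_id, order_time, cnt) tuple per input row, where cnt
    is the number of rows of the same user with a time in
    [order_time - preceding, order_time]. Rows with the same timestamp
    count each other, as RANGE frames include peers of the current row.
    """
    result = []
    for user_id, ts in rows:
        lower = ts - preceding
        cnt = sum(1 for u, t in rows if u == user_id and lower <= t <= ts)
        result.append((user_id, ts, cnt))
    return result


if __name__ == "__main__":
    t0 = datetime(2023, 8, 10, 17, 0, 0)  # hypothetical base time
    rows = [
        ("u1", t0),
        ("u1", t0 + timedelta(seconds=30)),
        ("u1", t0 + timedelta(seconds=30)),
        ("u2", t0 + timedelta(seconds=10)),
        ("u1", t0 + timedelta(seconds=100)),
    ]
    for row in range_over_count(rows, timedelta(seconds=60)):
        print(row)
```

Unlike the tumbling aggregation, this produces one output row per input row, which matches the shape of the OVER queries: the original columns plus a running aggregate.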
- bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --list
- bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topic
- bin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topic
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginning
- bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic
- bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic
Sample messages to send
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
- {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
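As a sanity check on what the tumbling-window query should return for these messages, the Python sketch below groups the sample order_time values into 5-second windows and applies the same count(*) > 3 filter. It is only an offline approximation: it assumes epoch-aligned windows, and it ignores watermarks, so it does not model whether Flink would treat the August rows as late once the September row has been read. The helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(seconds=5)  # same size as TUMBLE(order_time, INTERVAL '5' SECOND)

# (user_id, order_time) pairs copied from the sample messages above
SAMPLE = [
    ("0001", "2023-09-26 15:20:11"),
    ("0001", "2023-08-10 17:28:10"),
    ("0001", "2023-08-10 17:29:10"),
    ("0001", "2023-08-10 17:29:10"),
    ("0001", "2023-08-10 17:29:10"),
    ("0001", "2023-08-10 17:30:10"),
    ("0001", "2023-08-10 17:30:10"),
]


def window_start(ts):
    """Round ts down to the start of its 5-second window."""
    return ts - (ts - EPOCH) % SIZE


def count_per_window(rows):
    """Count rows per (user_id, window_start), like the GROUP BY clause."""
    counts = Counter()
    for user_id, text in rows:
        ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        counts[(user_id, window_start(ts))] += 1
    return counts


def having_more_than(counts, threshold):
    """Keep only groups whose count exceeds the threshold, like HAVING."""
    return {key: n for key, n in counts.items() if n > threshold}


if __name__ == "__main__":
    counts = count_per_window(SAMPLE)
    for key, n in sorted(counts.items()):
        print(key, n)
    print("after HAVING count(*) > 3:", having_more_than(counts, 3))
```

Under these assumptions the busiest window holds three orders, so the filter leaves nothing. To see a row arrive in order_count, a user would need at least four messages whose order_time falls into the same 5-second window.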
Sliding window
- SELECT * FROM TABLE(
- HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));
- SELECT * FROM TABLE(
- HOP(
- DATA => TABLE orders,
- TIMECOL => DESCRIPTOR(order_time),
- SLIDE => INTERVAL '5' MINUTES,
- SIZE => INTERVAL '10' MINUTES));
-
-
- SELECT window_start, window_end, SUM(pay_amount)
- FROM TABLE(
- HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
- GROUP BY window_start, window_end;
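In contrast to a tumbling window, a hopping window whose slide is smaller than its size places each row in several overlapping windows. The Python sketch below lists the windows a single timestamp would fall into for the 2-second slide and 10-second size used in the queries above. It assumes epoch-aligned, half-open [start, end) windows, and `hop_windows` is a hypothetical helper, not part of Flink.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def hop_windows(ts, slide, size):
    """Return every [start, end) hopping window that contains ts, earliest first.

    Assumption: window starts are multiples of `slide` counted from the epoch.
    """
    # Latest window start that is not after ts.
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return list(reversed(windows))


if __name__ == "__main__":
    ts = datetime(2023, 8, 10, 17, 29, 10)
    for w_start, w_end in hop_windows(ts, timedelta(seconds=2), timedelta(seconds=10)):
        print(w_start.time(), "-", w_end.time())
```

With a slide of 2 seconds and a size of 10 seconds, each row lands in size / slide = 5 windows, so an aggregate grouped by window_start and window_end counts every row five times across overlapping windows.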
Pitfalls
- Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Check the Flink version
flink-sql-connector-kafka-1.17.1.jar
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1
Download the JAR that matches your Flink version, put it in the lib directory, and restart.
- Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
flink-connector-jdbc-3.1.0-1.17.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
- Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31