Flink从入门到实践(一):Flink入门、Flink部署

打印 上一主题 下一主题

主题 849|帖子 849|积分 2549

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC
一、快速上手

1、导包

  1. <!-- fink 相关依赖 -->
  2. <dependency>
  3.     <groupId>org.apache.flink</groupId>
  4.     <artifactId>flink-clients</artifactId>
  5.     <version>1.18.0</version>
  6. </dependency>
复制代码
2、求词频demo

留意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍旧可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。
(1)要读取的数据

定义data内容:
   pk,pk,pk
ruoze,ruoze
hello
  (2)demo1:批处置惩罚(离线处置惩罚)

  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.DataSource;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.util.Collector;
  7. /**
  8. * 使用Flink进行批处理,并统计wc
  9. *
  10. *
  11. * 结果:
  12. * (bye,2)
  13. * (hello,3)
  14. * (hi,1)
  15. */
  16. public class BatchWordCountApp {
  17.     public static void main(String[] args) throws Exception {
  18.         // step0: Spark中有上下文,Flink中也有上下文,MR中也有
  19.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  20.         // step1: 读取文件内容  ==> 一行一行的字符串而已
  21.         DataSource<String> source = env.readTextFile("data/wc.data");
  22.         // step2: 每一行的内容按照指定的分隔符进行拆分  1:N
  23.         source.flatMap(new FlatMapFunction<String, String>() {
  24.                     /**
  25.                      *
  26.                      * @param value 读取到的每一行数据
  27.                      * @param out 输出的集合
  28.                      */
  29.                     @Override
  30.                     public void flatMap(String value, Collector<String> out) throws Exception {
  31.                         // 使用,进行分割
  32.                         String[] splits = value.split(",");
  33.                         for(String split : splits) {
  34.                             out.collect(split.toLowerCase().trim());
  35.                         }
  36.                     }
  37.                 })
  38.                 .map(new MapFunction<String, Tuple2<String,Integer>>() {
  39.                     /**
  40.                      *
  41.                      * @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)
  42.                      */
  43.                     @Override
  44.                     public Tuple2<String, Integer> map(String value) throws Exception {
  45.                         return Tuple2.of(value, 1);
  46.                     }
  47.                 })
  48.                 .groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标
  49.                 .sum(1)  // ==> 求词频 sum,传下标
  50.                 .print(); // 打印
  51.     }
  52. }
复制代码
(3)demo2 - lambda优化:批处置惩罚(离线处置惩罚)

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.operators.DataSource;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.util.Collector;
  6. /**
  7. * lambda表达式优化
  8. */
  9. public class BatchWordCountAppV2 {
  10.     public static void main(String[] args) throws Exception {
  11.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  12.         DataSource<String> source = env.readTextFile("data/wc.data");
  13.         /**
  14.          * lambda语法: (参数1,参数2,参数3...) -> {函数体}
  15.          */
  16. //        source.map(String::toUpperCase).print();
  17.         // 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息
  18.         source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
  19.             String[] splits = value.split(",");
  20.             for(String split : splits) {
  21.                 out.collect(Tuple2.of(split.trim(), 1));
  22.             }
  23.         }).returns(Types.TUPLE(Types.STRING, Types.INT))
  24.                 .groupBy(0).sum(1).print();
  25.     }
  26. }
复制代码
(4)demo3:流处置惩罚(实时处置惩罚)

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.util.Collector;
  6. /**
  7. * 流式处理
  8. * 结果:
  9. * 8> (hi,1)
  10. * 6> (hello,1)
  11. * 5> (bye,1)
  12. * 6> (hello,2)
  13. * 6> (hello,3)
  14. * 5> (bye,2)
  15. */
  16. public class StreamWCApp {
  17.     public static void main(String[] args) throws Exception {
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         DataStreamSource<String> source = env.readTextFile("data/wc.data");
  20.         source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
  21.             String[] splits = value.split(",");
  22.             for(String split : splits) {
  23.                 out.collect(Tuple2.of(split.trim(), 1));
  24.             }
  25.         }).returns(Types.TUPLE(Types.STRING, Types.INT))
  26.                 .keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum
  27.                 .sum(1).print();
  28.         // 需要手动开启
  29.         env.execute("作业名字");
  30.     }
  31. }
复制代码
(5)总结:实时vs离线

离线:效果是一次性出来的。
实时:来一个数据处置惩罚一次,数据是带状态的。
(6)demo4:批流一体

  1. import org.apache.flink.api.common.RuntimeExecutionMode;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.util.Collector;
  7. /**
  8. * 采用批流一体的方式进行处理
  9. */
  10. public class FlinkWordCountApp {
  11.     public static void main(String[] args) throws Exception {
  12.         // 统一使用StreamExecutionEnvironment这个执行上下文环境
  13.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动
  15.         DataStreamSource<String> source = env.readTextFile("data/wc.data");
  16.         source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
  17.             String[] splits = value.split(",");
  18.             for(String split : splits) {
  19.                 out.collect(Tuple2.of(split.trim(), 1));
  20.             }
  21.         }).returns(Types.TUPLE(Types.STRING, Types.INT))
  22.                 .keyBy(x -> x.f0) // 这种写法一定要掌握
  23.                 .sum(1).print();
  24.         // 执行
  25.         env.execute("作业名字");
  26.     }
  27. }
复制代码
(7)对接Socket

  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.util.Collector;
  6. /**
  7. * 使用Flink对接Socket的数据并进行词频统计
  8. *
  9. * 大数据处理的三段论: 输入  处理  输出
  10. *
  11. */
  12. public class FlinkSocket {
  13.     public static void main(String[] args) throws Exception {
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         /**
  16.          * 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text
  17.          *
  18.          * 官网上描述的是 env.addSource(...)
  19.          *
  20.          * socket的方式对应的并行度是1,因为它来自于SourceFunction的实现
  21.          */
  22.         DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
  23.         System.out.println(source.getParallelism());
  24.         // 处理
  25.         source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {
  26.                     String[] splits = value.split(",");
  27.                     for(String split : splits) {
  28.                         out.collect(Tuple2.of(split.trim(), 1));
  29.                     }
  30.                 }).returns(Types.TUPLE(Types.STRING, Types.INT))
  31.                 .keyBy(x -> x.f0) // 这种写法一定要掌握
  32.                 .sum(1)
  33.                 // 数据输出
  34.                 .print();  // 输出到外部系统中去
  35.         env.execute("作业名字");
  36.     }
  37. }
复制代码
二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。
一个JobManager(和谐/分配),一个或多个TaskManager(工作)。


2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/
可以根据官网来安装,必要下载、解压、安装。
也可以使用docker安装。
启动之后,localhost:8081就可以访问管控台了。
3、自运行flink-web

  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-runtime-web</artifactId>
  4.     <version>1.18.0</version>
  5. </dependency>
复制代码
  1. Configuration configuration = new Configuration();
  2. configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
  4. // 新版本可以直接使用getExecutionEnvironment(conf)
复制代码
以上亲测并不好使……详细缘故原由未知,设置为flink1.16版本大概就好用了。
4、通过参数传递

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
  3. ParameterTool tool = ParameterTool.fromArgs(args);
  4. String host = tool.get("host");
  5. int port = tool.getInt("port");
  6. DataStreamSource<String> source = env.socketTextStream(host, port);
  7. System.out.println(source.getParallelism());
复制代码
可以通过命令行参数:–host localhost --port 8765
5、通过webui提交job



6、制止作业


7、常用命令

  1. # 查看作业列表
  2. flink list -a  # 所有
  3. flink list -r  # 正在运行的
  4. # 停止作业
  5. flink cancel <jobid>
  6. # 提交job
  7. # -c,--class <classname> 指定main方法
  8. # -C,--classpath <url> 指定classpath
  9. # -p,--parallelism <paralle> 指定并行度
  10. flink run -c com.demo.FlinkDemo FlinkTest.jar
复制代码
8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/
k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/
YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/
参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

拉不拉稀肚拉稀

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表