大数据:快速入门Scala+Flink

鼠扑  论坛元老 | 2025-3-12 20:24:54 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1067|帖子 1067|积分 3201

一、什么是Scala

Scala 是一种多范式编程语言,它结合了面向对象编程和函数式编程的特性。Scala 这个名字是“可扩展语言”(Scalable Language)的缩写,意味着它被设计为能够顺应不同规模的项目,从小型脚本到大型分布式体系。
以下是 Scala 的一些重要特点:

  • 兼容 Java:Scala 代码可以编译成 Java 字节码,并且可以在任何支持 Java 的平台上运行。这意味着 Scala 可以直接利用大量的 Java 库和框架。
  • 简洁性:Scala 提供了一种更加简洁的方式来表达复杂的逻辑。通过模式匹配、范例推断等特性,程序员可以用较少的代码完成更多的工作。
  • 函数式编程:Scala 支持函数作为一等公民,允许高阶函数、不可变数据结构和懒惰求值等函数式编程概念。
  • 面向对象:Scala 同样支持面向对象编程的全部核心概念,包括类、对象、继承、封装等。
  • 范例安全:Scala 有一个强大的静态范例体系,这有助于在编译时捕捉错误并提供更好的代码质量。
  • 并发模子:Scala 提供了 Actor 模子来处理并发问题,这是通过 Akka 框架实现的,非常得当构建高并发的应用程序。
  • 泛型:Scala 对泛型的支持非常强大,提供了更灵活和安全的泛型机制。
  • 交互性:Scala 有一个 REPL(读取-求值-打印循环)环境,允许开发者快速测试代码片段。
Scala 被广泛用于开发大规模的数据处理应用、Web 应用以及企业级软件。由于其与 Java 的良好集成,很多公司采用 Scala 来增强他们的 Java 生态体系中的应用程序。例如,Apache Spark 就是用 Scala 编写的,它是一个流行的大数据处理框架。
二、什么是Flink

Apache Flink 是一个开源的流处理框架,它为分布式、高性能、随时可用以及正确的流处理应用程序提供支持。Flink 的核心是一个流数据流引擎,它提供了数据分布、通信和状态管理等功能。Flink 可以处理有界数据(如固定巨细的数据集)和无界数据(如持续不断的事件流),这使得它既可以作为批处理也可以作为流处理框架来利用。
以下是 Apache Flink 的一些关键特性:

  • 实时处理:Flink 能够实现低耽误的实时数据处理。
  • 高吞吐量:设计用于处理大规模数据流,并能够维持高吞吐量。
  • 容错性:提供强大的容错机制,确保即使在节点故障的情况下也能保证盘算结果的正确性和一致性。
  • 精确一次(Exactly-Once)语义:保证每个事件只被处理一次,这对于须要精确结果的应用非常重要。
  • 窗口利用:支持基于时间、计数或会话的窗口利用,方便对流数据举行复杂的分析。
  • 状态管理:允许用户界说和维护应用状态,这对于实现复杂的业务逻辑是必需的。
  • 可扩展性:可以轻松地部署到各种集群环境中,包括 YARN, Kubernetes, Mesos 等。
  • API 支持:提供了多种语言的 API,包括 Java 和 Scala,也支持 Python 以及其他语言通过 Table API 或 SQL 接口。
Flink 被广泛应用于实时分析、事件驱动应用、ETL 利用、报警体系等范畴。随着大数据和实时数据处理需求的增长,Flink 在业界得到了越来越多的关注和应用。
三、流处理和批处理 的区别

流处理和批处理是数据处理的两种重要方式,它们各自实用于不同的场景,并且有着不同的特点。下面是这两种处理方式的重要区别:
批处理(Batch Processing)



  • 界说:批处理是指对固定巨细的数据集举行处理的过程,这些数据通常是一次性加载到体系中的。
  • 数据特性:处理的是静态的、汗青的数据集合,数据在处理之前就已经完全可用。
  • 耽误:由于须要收集完备的数据集后才能开始处理,因此批处理通常具有较高的耽误。
  • 应用场景:得当于不须要实时相应的场景,如日志分析、报告生成等。
  • 容错性:可以实现精确一次(Exactly-Once)语义,保证每个数据项被正确处理一次。
  • 资源利用:批处理任务可以在非高峰时段运行,以优化资源利用。
流处理(Stream Processing)



  • 界说:流处理是对连续不断的数据流举行即时处理的过程,数据项一旦到达就立即被处理。
  • 数据特性:处理的是动态的、实时的数据流,数据是持续产生的。
  • 耽误:能够提供非常低的耽误,甚至靠近实时,因为数据一到达就可以被处理。
  • 应用场景:实用于须要快速反应的场景,如实时监控、在线广告投放、欺诈检测等。
  • 容错性:当代流处理框架如 Apache Flink 和 Kafka Streams 也支持精确一次(Exactly-Once)语义,但实现起来比批处理更复杂。
  • 资源利用:流处理通常要求更高的盘算资源和更复杂的底子设施来保证低耽误和高吞吐量。
混合模式

近年来,随着技术的发展,出现了一些混合处理模式,好比微批处理(Micro-batching),它将数据流分成小批次举行处理,试图结合流处理和批处理的优点。这种模式既保持了较低的耽误,又简化了处理逻辑和状态管理。
选择哪种处理方式取决于具体的应用需求、数据特性和业务目的。例如,如果应用须要基于最新数据做出决策,那么流处理可能更得当;而对于须要定期生成报表或分析大量汗青数据的情况,则批处理可能是更好的选择。
四、安装Scala

1、 起首确保jdk1.8安装成功
  起首在安装之前,确保当地已经安装了JDK1.5以上的版本,在此博主安装的是1.8版本。并且已经设置了JAVA_HOME 环境变量及JDK的bin目次。
2、下载对应的Scala安装文件scala-2.11.8.zip
接着我们从Scala官网地址 https://www.scala-lang.org/download/all.html 上下载Scala二进制的包。


3. 解压scala-2.11.8.zip
4. 设置Scala的环境变量

  • 打开环境变量
      右击我的电脑,单击"属性",进入如图所示页面。下面开始设置环境变量,右击【我的电脑】–【属性】–【高级体系设置】–【环境变量】,如图:

  • 设置 SCALA_HOME 变量
      单击新建,在变量名栏输入:SCALA_HOME: 变量值一栏输入:D:\scala 也就是 Scala 的安装目次,根据个人情况有所不同,如果安装在 C 盘,将 D 改成 C 即可。

  • 设置 Path 变量
      找到体系变量下的"ath"如图,单击编辑。在"变量值"一栏的最前面添加如下的路径: %SCALA_HOME%\bin;

4. 设置 Classpath 变量
   找到找到体系变量下的"Classpath"如图,单击编辑,如没有,则单击"新建":
变量名: ClassPath
变量值: .D:\scala.;

5. 查抄
   查抄环境变量是否设置好了:调出"cmd"查抄。单击 【开始】,在输入框中输入cmd,然后"回车",输入 scala,然后回车,如环境变量设置ok,你应该能看到这些信息。

6. 测试

Plugins库有很多插件可联网安装,但可以选择离线安装方式,单击红框,然后选择Scala插件所在的路径确认即可。

注:检察scala插件是否安装成功,这也是第二种检察scala是否安装的方法。
如图所示可在Plugins库列表中搜刮到即已完成安装

安装完scala插件后重启IDEA工具使其生效,单击【Restart】

五、大数据案例代码

1、批处理

Maven依赖

起首,确保你的pom.xml中包含以下依赖(实用于Maven构建):
  1. <dependencies>
  2.     <!-- Flink Core -->
  3.     <dependency>
  4.         <groupId>org.apache.flink</groupId>
  5.         <artifactId>flink-scala_2.12</artifactId>
  6.         <version>1.14.0</version>  <!-- 根据需要替换为您使用的Flink版本 -->
  7.     </dependency>
  8.     <!-- Flink Streaming Kafka Connector -->
  9.     <dependency>
  10.         <groupId>org.apache.flink</groupId>
  11.         <artifactId>flink-connector-kafka_2.12</artifactId>
  12.         <version>1.14.0</version>
  13.     </dependency>
  14.     <!-- Oracle JDBC Driver -->
  15.     <dependency>
  16.         <groupId>com.oracle.database.jdbc</groupId>
  17.         <artifactId>ojdbc8</artifactId>
  18.         <version>19.8.0.0</version> <!-- 确保版本与您的Oracle数据库兼容 -->
  19.     </dependency>
  20. </dependencies>
复制代码
设置Kafka和Oracle

请确保你的Kafka主题已经创建并且你能够通过Kafka消费消息。同时,确保你具有Oracle数据库的访问权限,并且已创建适当的表格以插入数据。
Scala + Flink 程序

下面是一段示例代码,展示了如何从Kafka读取数据并插入到Oracle数据库中。
  1. import org.apache.flink.api.common.serialization.SimpleStringSchema
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  5. import org.apache.flink.streaming.api.environment.CheckpointingMode
  6. import org.apache.flink.streaming.api.functions.sink.jdbc.JdbcSink
  7. import java.sql.{Connection, PreparedStatement}
  8. import java.util.Properties
  9. object KafkaToOracle {
  10.   def main(args: Array[String]): Unit = {
  11.     // 创建StreamExecutionEnvironment
  12.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  13.     // 配置Kafka消费者
  14.     val kafkaProps = new Properties()
  15.     kafkaProps.setProperty("bootstrap.servers", "localhost:9092")  // Kafka Broker 地址
  16.     kafkaProps.setProperty("group.id", "test")                     // 消费者组
  17.     kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  18.     kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  19.     val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)
  20.     // 从Kafka读取数据
  21.     val stream = env.addSource(kafkaConsumer)
  22.     // 处理和插入数据到Oracle
  23.     stream.map(record => {
  24.       // 假设Kafka传来的数据是以逗号分隔的字符串
  25.       val fields = record.split(",")
  26.       (fields(0), fields(1)) // 返回元组(字段1,字段2)
  27.     }).addSink(new JdbcSink[(String, String)](
  28.       "jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", // Oracle JDBC URL
  29.       (statement: PreparedStatement, t: (String, String)) => {
  30.         statement.setString(1, t._1) // 设置字段1
  31.         statement.setString(2, t._2) // 设置字段2
  32.       },
  33.       new JdbcStatementBuilder[(String, String)] {
  34.         override def accept(t: (String, String), preparedStatement: PreparedStatement): Unit = {
  35.           preparedStatement.setString(1, t._1)
  36.           preparedStatement.setString(2, t._2)
  37.         }
  38.       }
  39.     ))
  40.     // 执行任务
  41.     env.execute("Kafka to Oracle Example")
  42.   }
  43. }
复制代码
表结构

假设你在Oracle中有一个名为your_table的表,结构为:
  1. CREATE TABLE your_table (
  2.     field1 VARCHAR2(255),
  3.     field2 VARCHAR2(255)
  4. );
复制代码
确保表结构与上面代码中的插入逻辑相匹配。
补充阐明


  • Kafka的Topic: 修改your_topic为您实际利用的Topic名称。
  • JDBC URL: 确保jdbc连接字符串和根据是正确的。
  • 性能优化: 在生产环境中,可能须要对Flink设置举行调整,例如并行度、查抄点设置等。
确保全部依赖项正确并且可以访问Kafka和Oracle数据库后,编译并运行这个程序。它将从Kafka主题读取数据,举行处理后再插入到Oracle表中。
2、流处理

Maven依赖

确保你的pom.xml中有以下依赖:
  1. <dependencies>
  2.     <!-- Flink Streaming -->
  3.     <dependency>
  4.         <groupId>org.apache.flink</groupId>
  5.         <artifactId>flink-streaming-scala_2.12</artifactId>
  6.         <version>1.14.0</version>  <!-- 根据需要替换为您使用的Flink版本 -->
  7.     </dependency>
  8.     <!-- Flink Streaming Kafka Connector -->
  9.     <dependency>
  10.         <groupId>org.apache.flink</groupId>
  11.         <artifactId>flink-connector-kafka_2.12</artifactId>
  12.         <version>1.14.0</version>
  13.     </dependency>
  14.     <!-- Oracle JDBC Driver -->
  15.     <dependency>
  16.         <groupId>com.oracle.database.jdbc</groupId>
  17.         <artifactId>ojdbc8</artifactId>
  18.         <version>19.8.0.0</version>
  19.     </dependency>
  20. </dependencies>
复制代码
Scala + Flink 程序

以下是从Kafka读取数据并实时插入Oracle数据库的流处理示例代码:
  1. import java.sql.{Connection, PreparedStatement}
  2. import java.util.Properties
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  6. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
  7. import org.apache.flink.streaming.api.scala._
  8. object KafkaToOracleStreaming {
  9.   def main(args: Array[String]): Unit = {
  10.     // 创建 StreamExecutionEnvironment
  11.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  12.     // Kafka配置
  13.     val kafkaProps = new Properties()
  14.     kafkaProps.setProperty("bootstrap.servers", "localhost:9092") // Kafka Broker 地址
  15.     kafkaProps.setProperty("group.id", "test")                    // 消费者组
  16.     // 创建Kafka消费者
  17.     val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)
  18.     // 从Kafka读取数据流
  19.     val stream = env.addSource(kafkaConsumer)
  20.     // 处理数据并插入Oracle
  21.     stream.map(record => {
  22.       // 假设Kafka传来的数据是以逗号分隔的字符串
  23.       val fields = record.split(",")
  24.       (fields(0), fields(1)) // 返回元组 (字段1, 字段2)
  25.     }).addSink(new OracleSink)
  26.     // 执行任务
  27.     env.execute("Kafka to Oracle Streaming Example")
  28.   }
  29.   // 自定义Sink向Oracle插入数据
  30.   class OracleSink extends RichSinkFunction[(String, String)] {
  31.     var connection: Connection = _
  32.     var insertStmt: PreparedStatement = _
  33.     override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
  34.       // 初始化JDBC连接
  35.       connection = java.sql.DriverManager.getConnection("jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", "username", "password")
  36.       // 创建插入语句
  37.       insertStmt = connection.prepareStatement("INSERT INTO your_table (field1, field2) VALUES (?, ?)")
  38.     }
  39.     override def invoke(value: (String, String), context: Context): Unit = {
  40.       // 设置参数值
  41.       insertStmt.setString(1, value._1)
  42.       insertStmt.setString(2, value._2)
  43.       // 执行插入
  44.       insertStmt.executeUpdate()
  45.     }
  46.     override def close(): Unit = {
  47.       // 关闭连接和语句
  48.       if (insertStmt != null) insertStmt.close()
  49.       if (connection != null) connection.close()
  50.     }
  51.   }
  52. }
复制代码

  • Kafka消费者: 利用FlinkKafkaConsumer从Kafka主题获取数据。
  • 数据处理: 每条从Kafka获取的记载在此处被转换为一个元组(字段1, 字段2),假设它们是通过逗号分隔的。
  • 自界说Sink: OracleSink类继承自RichSinkFunction,负责与Oracle数据库的连接和数据插入。

    • 在open方法中,建立与Oracle的连接。
    • 在invoke方法中,实行插入利用。
    • 在close方法中,确保正确关闭连接和语句。

  • 实行环境: 末了,通过env.execute("Kafka to Oracle Streaming Example")来启动Flink流处理任务。

六、项目部署

Scala+Flink 打包以后仍旧是jar 通过Java程序的方式部署即可

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表