Apache Spark
简介
是一个开源的同一分析引擎,专为大规模数据处理而设计。它提供了高级API,支持Java、Scala、Python和R语言,并且包含了一个优化过的实行引擎,该引擎支持循环计算(如呆板学习算法)和交互式查询。以下是Spark的一些关键特性和概念
焦点特性
- 速率:Spark通过内存计算提高了数据处理的速率,比Hadoop MapReduce快达10到100倍。
- 易用性:提供丰富的高层次API,包罗DataFrame和Dataset API,简化了数据操纵。
- 通用性:除了Map和Reduce操纵之外,还支持SQL查询、流处理、呆板学习和图计算等多种工作负载。
- 可扩展性:能够有效地在数千个节点上并行运行。
- 容错性:利用RDD(Resilient Distributed Dataset)抽象层,自动处理节点故障恢复。
根本概念
1. RDD (Resilient Distributed Dataset)
- 定义:
RDD 是 Spark 的最底层抽象,表示分布在集群节点上的不可变、可分区的数据集合。它提供低级别的 API,强调对数据的细粒度控制。
- 特点:
- 强类型:存储任意类型的对象(如 RDD[User])。
- 手动优化:需要开辟者自行处理序列化、分区、缓存等优化。
- 函数式操纵:通过 map、filter、reduce 等函数式算子处理数据。
- 容错性:通过血统(Lineage)机制重建丢失的分区。
- 适用场景:
非布局化数据、需要精细控制的分布式计算(如自定义分区计谋)。
2. DataFrame
- 定义:
DataFrame 是基于 RDD 构建的布局化数据抽象,类似于关系型数据库的表或 Pandas 的 DataFrame。在 Spark 1.3 中引入。
- 特点:
- Schema 束缚:数据具有明确的布局(列名、数据类型),通过 Row 对象表示行。
- 优化实行:利用 Catalyst 优化器和 Tungsten 实行引擎,自动优化查询操持。
- API 类型:弱类型 API(列名在运行时检查),支持 SQL 语法。
- 跨语言支持:在 Java、Scala、Python、R 中接口一致。
- 适用场景:
布局化/半布局化数据(如 JSON、CSV)、SQL 式查询、需要自动优化的批处理。
3. Dataset
- 定义:
Dataset 是 Spark 1.6 引入的 API,联合了 RDD 的强类型特性和 DataFrame 的优化引擎。仅在 Scala 和 Java 中可用。
- 特点:
- 强类型 + 布局化:兼具 RDD 的类型安全(如 Dataset[User])和 DataFrame 的优化能力。
- 同一 API:与 DataFrame API 兼容(DataFrame = Dataset[Row])。
- 编码器(Encoder):利用高效的二进制序列化(优于 Java 序列化)。
- 适用场景:
需要类型安全的复杂业务逻辑、联合函数式和关系式操纵的场景。
焦点区别对比
特性RDDDataFrameDataset数据类型任意对象(强类型)布局化的 Row 对象(弱类型)强类型对象(如 Dataset[User])序列化Java 序列化(较慢)Tungsten 二进制编码(高效)Encoder 序列化(高效)优化无自动优化,需手动调优Catalyst 优化器自动优化Catalyst 优化器自动优化API 风格函数式(map, filter)声明式(SQL/DSL)混淆式(强类型 API + DSL)类型安全编译时类型检查运行时类型检查编译时类型检查语言支持所有 Spark 语言所有 Spark 语言仅 Scala/Java 演进关系
- Spark 1.x:RDD → DataFrame(为布局化数据优化)。
- Spark 2.x:DataFrame 和 Dataset 同一为 Dataset[T](DataFrame = Dataset[Row])。
如何选择?
- 优先用 DataFrame/Dataset:
大多数场景下,布局化数据处理更高效(Catalyst 优化 + Tungsten)。
- 需要类型安全时用 Dataset:
如 Scala/Java 中复杂业务逻辑。
- 仅底层控制时用 RDD:
如自定义分区、非布局化数据,或需直接操纵分布式数据。
代码示例
spark-submit 参数
在 Spark 中,spark-submit 是提交作业到集群的焦点下令。以下是常用参数及其作用,分为 基础参数、资源参数和 调优参数:
一、基础参数
参数说明示例--master指定集群模式yarn, local
, spark://host:port, k8s://...--deploy-mode部署模式(客户端或集群)client(默认)或 cluster(得当生产)--class主类名(含包路径)--class com.example.MainApp--name作业名称(显示在集群UI)--name "My Spark Job"--files上传文件到 Executor(如设置文件)--files config.json--jars添加依赖的 JAR 包(逗号分隔)--jars lib1.jar,lib2.jar--packages从堆栈自动下载依赖(Maven格式)--packages org.apache.kafka:kafka-clients:3.4.0 二、资源参数
参数说明示例注意事项--executor-memory每个 Executor 的内存--executor-memory 4g需预留内存给系统和开销(如总内存的10%)--driver-memoryDriver 进程的内存--driver-memory 2g客户端模式下需本地足够内存--num-executorsExecutor 数量--num-executors 10根据集群资源动态调整--executor-cores每个 Executor 的 CPU 核数--executor-cores 2总核数 = num-executors * executor-cores--total-executor-cores所有 Executor 的总核数(Standalone 模式)--total-executor-cores 20优先级低于 num-executors 三、调优参数
参数说明示例用途--conf spark.serializer指定序列化方式--conf spark.serializer=org.apache.spark.serializer.KryoSerializer优化序列化性能--conf spark.sql.shuffle.partitions调整 Shuffle 分区数--conf spark.sql.shuffle.partitions=200避免数据倾斜或分区过大--conf spark.default.parallelism默认并行度--conf spark.default.parallelism=100控制 RDD 的分区数--conf spark.memory.fractionExecutor 内存中用于实行和存储的比例--conf spark.memory.fraction=0.6调整内存分配计谋--conf spark.dynamicAllocation.enabled启用动态资源分配--conf spark.dynamicAllocation.enabled=true按需增减 Executor(需集群支持)
示例下令
关键注意事项
- 资源分配
- 总内存和核数不能凌驾集群资源上限。
- 在 YARN 模式下,--executor-memory 包含堆外内存,需预留约 10% 的额外内存(如申请 4g,现实可用约 4g * 0.9)。
- 动态资源分配
- 启用 spark.dynamicAllocation.enabled=true 时需设置 spark.shuffle.service.enabled=true(YARN 需启动 Shuffle Service)。
- 依赖管理
- 优先利用 --packages 自动下载依赖,避免手动传 JAR。
- 本地依赖用 --jars,集群依赖需预先上传到 HDFS 或共享存储。
- 日记与调试
- 添加 --conf spark.eventLog.enabled=true 记录变乱日记。
- 在客户端模式下,Driver 日记输出到控制台;集群模式下需通过集群 UI 查看。
参数优化场景
- 数据倾斜:增大 spark.sql.shuffle.partitions 或利用 repartition。
- OOM 错误:增长 executor-memory 或调整 spark.memory.fraction。
- CPU 瓶颈:增长 num-executors 或 executor-cores。
- 网络超时:调整 spark.network.timeout(默认 120s)。
hive
Hive 是构建在 Hadoop 生态系统之上的数据堆栈工具,旨在简化大规模数据的查询和管理。它通过类 SQL 语法(HiveQL)将布局化数据操纵转化为 MapReduce、Tez 或 Spark 任务,得当处理海量数据(如日记、用户举动等)。以下是其焦点概念和用法:
Hive 焦点特性
特性描述SQL-like 语法支持类似 SQL 的查询语言(HiveQL),降低大数据处理的学习成本。数据存储数据存储在 HDFS(Hadoop 分布式文件系统)中,支持多种文件格式(如 ORC、Parquet)。元数据管理利用 Metastore(如 MySQL)存储表布局、分区等元信息。扩展性支持自定义函数(UDF)、SerDe(序列化/反序列化工具)等扩展功能。批处理基于 MapReduce 或 Tez 引擎,得当离线批处理,不适用于实时查询。
Hive 架构
- 用户接口
CLI、JDBC、Web UI 等工具提交 HiveQL 查询。
- Driver
剖析查询,天生实行操持,管理任务生命周期。
- 编译器
将 HiveQL 转换为 MapReduce/Tez/Spark 任务。
- 元数据存储
Metastore 存储表布局、分区、字段类型等信息。
- 实行引擎
运行编译后的任务,读写 HDFS 数据。
Hive 数据模子
表(Table)
类似关系型数据库的表,支持内部表(数据由 Hive 管理)和外部表(数据由用户管理)。
CREATE TABLE users (id INT, name STRING) STORED AS ORC;
分区(Partition)
按某一列的值划分数据目次,加速查询(如按日期分区)。
CREATE TABLE logs (log_time STRING, content STRING)
PARTITIONED BY (dt STRING);
分桶(Bucket)
按哈希值将数据分到多个文件,优化 JOIN 和采样服从。
CREATE TABLE orders (order_id INT, user_id INT)
CLUSTERED BY (user_id) INTO 10 BUCKETS;
Hive 应用场景
场景说明离线数据分析处理 TB/PB 级历史数据(如用户举动分析、日记统计)。ETL 流程洗濯、转换数据后导入数据堆栈(如将 CSV 转换为 ORC 格式)。数据挖掘联合呆板学习库(如 Hive + Mahout)举行聚类、分类等操纵。报表天生定时天生统计报表(如每日销售额汇总)。 Hive 优缺点
优点缺点易用性强(SQL 语法)耽误高(分钟级相应,不得当实时查询)可扩展性高(自定义 UDF)不支持变乱和行级更新(Hive 3 部分支持)兼容 Hadoop 生态(HDFS、HBase 等)需要优化分区和存储格式提升性能
Hive vs 传统数据库
对比项Hive传统数据库(如 MySQL)数据规模支持 PB 级数据得当 GB/TB 级数据相应速率高耽误(批处理)低耽误(实时查询)变乱支持有限支持(Hive 3+)完整支持 ACID存储与计算分离(HDFS + 计算引擎)耦合(本地存储 + 计算
Hive 利用示例
创建表并加载数据
CREATE EXTERNAL TABLE user_logs (
ip STRING,
url STRING,
time STRING
) PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/hive/data/user_logs';
LOAD DATA INPATH '/input/log_20231001.txt' INTO TABLE user_logs PARTITION (dt='2023-10-01');
聚合查询
SELECT dt, COUNT(*) AS pv
FROM user_logs
WHERE dt BETWEEN '2023-10-01' AND '2023-10-07'
GROUP BY dt;
毗连多个表
SELECT u.name, SUM(o.amount)
FROM orders o
JOIN users u ON o.user_id = u.id
GROUP BY u.name;
生态工具
- Hive Metastore:独立元数据服务(供 Spark、Presto 等共用)。
- Hive on Spark:用 Spark 替代 MapReduce 提升计算速率。
- Hive LLAP(Live Long and Process):低耽误交互式查询。
总结
- 适用场景:离线批处理、海量数据堆栈管理。
- 替代方案:实时查询用 Impala 或 Presto;复杂分析用 Spark SQL。
- 学习建议:掌握 HiveQL 语法、分区优化和存储格式(ORC/Parquet)。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |