万万哇 发表于 2024-12-7 05:12:17

期末复习-Hadoop综合复习

说明

以下内容仅供参考,提到不代表考到,请结合实际情况本身复习
目录
说明
一、题型及分值
二、综合案例题-摆设Hadoop集群 或 摆设Hadoop HA集群
三、名称表明(8选5)
1.什么是大数据
2.大数据的5V特征
3.什么是SSH
4.HDFS(p32)
5.名称节点
6.数据节点
7.元数据
8.倒排索引
9.单点故障
10.高可用
11.数据堆栈
四、简答题
1、简述Hadoop的优点及其含义
2.简述独立模式、伪分布式模式和完全分布式模式摆设Hadoop的区别
3.简述HDFS的健壮性
4.简述YARN根本架构的组成部分及其作用
5.简述不同范例ZNode的区别
6.简述Hadoop高可用集群初次启动时的步骤
7.简述Hive中分区和桶的作用
五、Hive代码题
题目: 电商订单分析
答案
六、HDFS代码题
课本资料
相关题目
题目 1: 创建目录并上传文件
题目 2: 检察文件信息
题目 3: 数据移动与复制
题目 4: 删除操作
题目 5: 文件权限设置
题目 6: 文件备份与验证
七、MapReduce编程
题目 1: 单词计数 (Word Count)
题目 2: 最大值求解 (Max Value Finder)
 题目 3: 匀称值盘算 (Average Calculation)
题目 4: Top K 单词统计
思路
题目 5: 日志分析
 题目 6: 用户购买分析
题目 7: 倒排索引 (Inverted Index)
 题目 8: 用户商品共现分析
 题目 9: 数据去重
题目 10: 分组统计 (Group By)

一、题型及分值

1.综合案例题(35分)
2.名词表明(每个3分,共15分)
3.简答题(每题6分,共30分)
4.编程题(共3题,共20分)

二、综合案例题-摆设Hadoop集群 或 摆设Hadoop HA集群

这部分内容发起观看课本
tar -zxvf jdk-8u... -C /export/... 在Linux系统中,tar命令用于打包和解包文件。以下是tar命令中 -zxvf 和 -C 选项的含义:

-z:这个选项表示同时通过gzip进行压缩或解压缩。如果.tar文件实际上是一个.tar.gz或.tgz文件,这个选项是必要的。
-x:这个选项代表解包(extract)一个.tar文件。
-v:这个选项用于在处理文件时显示详细信息(verbose),它会列出正在处理的文件,这样用户可以看到解压的进度和具体内容。
-f:这个选项用于指定要处理的文件名。在tar命令中,-f通常是最后一个选项,并且后面直接跟着要操作的文件名。
所以,-zxvf组合起来就是告诉tar命令:解压缩一个用gzip压缩的.tar.gz文件,并在处理过程中显示详细信息。

-C:这个选项告诉tar命令在指定的目录中解包文件。在上面的命令中,-C /export/…表示将文件解压到/export/…这个目录下。
综上所述,tar -zxvf jdk-8u... -C /export/…命令的作用是:以详细方式解压名为jdk-8u...的gzip压缩的tar文件,并将其内容解压到/export/…目录中。注意,jdk-8u...是文件名的占位符,你需要替换为实际的文件名。 # 验证Hadoop是否安装成功
bin/hadoop version # 启动Hadoop
start-dfs.sh
# 启动yarn
start-yarn.sh
# 查看hadoop运行状态
jps 三、名称表明(8选5)

1.什么是大数据

大数据指的是海量数据;具有海量、流转快、数据范例丰富及价值密度低等特点
2.大数据的5V特征

大数据的特征包括大量(Volume)、真实(Veracity)、多样(Variety)、低价值密度(Value)和高速(Velocity)
3.什么是SSH

SSH是一种网络协议,主要用于在不安全网络上提供安全的长途登录和其他安全网络服务。它可以或许加密网络连接,确保在客户端和服务器之间传输的数据不会容易被窃取或篡改。
4.HDFS(p32)

HDFS是Hadoop Distributed File System的缩写,中文称为Hadoop分布式文件系统,专为大规模数据集的处理而计划,主要用于存储和管理海量数据文件。
5.名称节点

NameNode是HDFS集群的名称节点,通常称为主节点。假如NameNode由于故障原因而无法利用,那么用户就无法访问HDFS。也就是说,NameNode作为HDFS的主节点,起着至关重要的部分
6.数据节点

DataNode是HDFS集群中的数据节点,通常称为从节点,其主要功能如下:


[*] 存储Block
[*] 根据NameNode的指令对Block举行创建、复制、删除等操作
[*] 定期向NameNode报告自身存储的Block列表及健康状态
[*] 负责为客户端发起的读写请求提供服务
7.元数据

MetaData用于记载HDFS文件系统的相关信息,这些信息称为元数据,元数据的内容包括文件系统的目录结构、文件名、文件路径、文件大小、文件副本数、文件与Block的映射关系,以及Block与DataNode的映射关系等信息
8.倒排索引

倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词或词组在一组文档中的存储位置的映射,提供了可以根据内容查找文档的方式,而不是根据文档确定内容,因此称为倒排索引。
9.单点故障

在HDFS集群中,NameNode是主节点,它的运行状态决定着HDFS集群是否可用。然而在Hadoop计划之初,HDFS集群只能存在一个NameNode节点,这种计划的缺点是NameNode节点一旦发生故障,就会导致HDFS集群不可用,这就是所谓的单点故障问题
10.高可用

Hadoop通过在HDFS集群中配置多个NameNode(一个Active,多个Standby)来确保系统连续运行,当Active NameNode故障时,自动选举新的Active NameNode,防止单点故障。
11.数据堆栈

数据堆栈是一个面向主题、集成的、相对稳固和反映汗青变化的数据集合,用于企业或组织的决策分析。

四、简答题

有的来自于书籍,有的来自于AI(因为书籍内容过多)
1、简述Hadoop的优点及其含义

    (1)低成本,可用多台廉价机组建集群,分布式处理大数据,降低成本。
        (2)高可靠性,自动保存数据副本,避免数据丢失。
    (3)高容错性,自动检测并应对故障,通过任务转移,防止任务失败。
    (4)高效率,Hadoop可高效的执行并行计算,且在各个计算机中动态地移动计算。
    (5)高扩展性,可随时添加更多的计算机,增加集群存储,计算能力。 2.简述独立模式、伪分布式模式和完全分布式模式摆设Hadoop的区别

(1)独立模式:本地独立模式不进行任何配置,是Hadoop的默认工作模式,所有组件都在同一台机器运行,适用于学习和体验。

(2)伪分布模式:也是在一台单机上运行,通过单节点模拟分布式,但部署的Hadoop集群是一个伪分布式系统,适合本地开发和验证。

(3)完全分布模式:是一种在多台计算机JVM进程中运行Hadoop集群的工作模式,所有组件分布在多台机器上,部署的集群是完全分布式系统,适用于生产环境。 3.简述HDFS的健壮性

其健壮性可表现为:在HDFS出现故障的情况下可靠的存储数据,其运用了心跳机制、副本机制、数据完备性校验、安全模式和快照 5 种计谋保证了数据存储的可靠性
4.简述YARN根本架构的组成部分及其作用

YARN 根本架构由 ResourceManager、ApplicationMaster、NodeManager 和 Container 组成,其中,ResourceManager 为全局资源管理器,负责整个系统的资源管理和分配;ApplicationMaster每个应用步伐特有的,负责单个应用步伐的管理;NodeManager 负责在节点上启动和管理Container(容器);Container 封装了每个应用步伐利用的资源。
5.简述不同范例ZNode的区别

ZooKeeper中的ZNode范例主要有以下区别:

[*] 持久节点:除非手动删除,否则一直存在。
[*] 临时节点:随客户端会话结束而自动删除,不能有子节点。
[*] 顺序节点:在持久或临时节点基础上,创建时带唯一递增序号,用于记载创建顺序。
6.简述Hadoop高可用集群初次启动时的步骤

1.启动JournalNode
hdfs -- daemon start journalnode 2.格式化HDFS文件系统
hdfs namenode -format 3.同步NameNode
scp -r /export/data/hadoop/namenode/ hadoop2:/export/data/hadoop/ 4.格式化ZKFC
hdfs zkfc -formatZK 5.启动HDFS
start-dfs.sh 6.启动YARN
start-yarn.sh 7.简述Hive中分区和桶的作用

分区:将表数据按规则分别存储在不同目录,减少全表扫描,提高查询效率。 桶:按规则将数据匀称分布在不同文件中,制止数据倾斜,优化查询性能。
五、Hive代码题

Hive实践作业三
7.5 数据库操作
7.6 表操作

1.在hadoop1中执行start-dfs.sh和start-yarn.sh分别启动hdfs和yarn,
保证hadoop完全分布式集群正常启动,
hadoop1 jps NameNode ResourceManager
hadoop2 jps NodeManager DataNode SecondaryNameNode
hadoop3 jps NodeManager DataNode
2.在hadoop3中执行systemctl status mysqld验证mysql80正常启动
3.在hadoop3中启动MetaStore服务,hive --service metastore
4.hadoop3复制会话,启动HiveServer2服务,hive --service hiveserver2
5.hadoop3再次复制会话,jps 多了两个RunJar进程,元数据存储系统和HiveServer2正常启动
6.在hadoop2中,执行 hive 登录
7.hadoop2复制会话,执行 beeline -u jdbc:hive2://hadoop3:10000 -n root 登录

8.在登录的hive中,输入:show databases;

9.数据库操作
创建数据库 create database homework;
查看数据库 describe homework;
切换数据库 use homework;
10.表操作
complex表的创建
create table complex(
col1 array<int>,
col2 map<int,string>,
col3 struct<a:string,b:int,c:double>,
col4 string,
col5 int
);

complex表的查看
desc complex;

user_p分区表的创建
create table user_p (id int, name string)
partitioned by (gender string)
row format delimited fields terminated by ',';

user_p分区表的查看
desc user_p;

array_test内部表的创建
create table array_test(
name string,
score array<int>
)
row format delimited fields terminated by '@'
collection items terminated by ',';

array_test内部表导入数据

zhangshan@89,88,97,80
lisi@90,95,99,97
wangwu@90,77,88,79
zhaoliu@91,79,98,89

文本文件array_test.txt需要先创建,在hadoop3上哦

load data local inpath '/root/array_test.txt' into table homework.array_test;

array_test内部表查询数据
select * from homework.array_test;

map_test内部表的创建
create table map_test(
name string,
score map<string,int>
)
row format delimited fields terminated by '@'
collection items terminated by ','
map keys terminated by ':';

map_test内部表导入数据

zhangshan@math:90,english:89,java:88,hive:80
lisi@math:98,english:79,java:96,hive:92
wangwu@math:88,english:86,java:89,hive:88
zhaoliu@math:89,english:78,java:79,hive:77

文本文件map_test.txt需要先创建,在hadoop3上哦

load data local inpath '/root/map_test.txt' into table homework.map_test;

map_test内部表查询数据
select * from homework.map_test;

select name from homework.map_test;

select score from homework.map_test; 题目: 电商订单分析

某电商公司必要构建订单管理系统,并在 Hive 中完成以下任务:
数据描述
订单数据由以下信息组成:

[*] order_id (订单ID, INT)
[*] customer_id (客户ID, INT)
[*] order_date (订单日期, STRING,格式:yyyy-MM-dd)
[*] order_items (订单商品,ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>)
[*] order_details (订单详情,MAP<STRING, STRING>,包含键值对如 payment_method -> credit_card, delivery_status -> delivered)
任务要求:
任务 1: 内部表创建
创建一个内部表 orders_internal,用于存储上述订单数据。
任务 2: 外部表创建
创建一个外部表 orders_external,数据存储在 HDFS 的 /data/orders_external/ 目录下。
任务 3: 分区表创建
创建一个按 order_date 分区的表 orders_partitioned,优化按日期范围查询的性能。
任务 4: 数据插入
为三个表分别插入以下样例数据:
订单1:
order_id: 101, customer_id: 1, order_date: '2024-12-01'
order_items: [
    {item_id: 201, item_name: 'Laptop', quantity: 1, price: 1000.0},
    {item_id: 202, item_name: 'Mouse', quantity: 2, price: 25.0}
]
order_details: {'payment_method': 'credit_card', 'delivery_status': 'delivered'}

订单2:
order_id: 102, customer_id: 2, order_date: '2024-12-02'
order_items: [
    {item_id: 203, item_name: 'Keyboard', quantity: 1, price: 50.0}
]
order_details: {'payment_method': 'paypal', 'delivery_status': 'shipped'}
任务 5: 查询操作

[*] 查询所有已完成(delivery_status = delivered)订单的客户ID及商品明细。
[*] 查询每个客户的总消费金额。
答案

任务 1: 内部表创建
CREATE TABLE orders_internal (
    order_id INT,
    customer_id INT,
    order_date STRING,
    order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
    order_details MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'; 任务 2: 外部表创建
CREATE EXTERNAL TABLE orders_external (
    order_id INT,
    customer_id INT,
    order_date STRING,
    order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
    order_details MAP<STRING, STRING>
)
STORED AS TEXTFILE
LOCATION '/data/orders_external/'; 任务 3: 分区表创建
CREATE TABLE orders_partitioned (
    order_id INT,
    customer_id INT,
    order_items ARRAY<STRUCT<item_id: INT, item_name: STRING, quantity: INT, price: FLOAT>>,
    order_details MAP<STRING, STRING>
)
PARTITIONED BY (order_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'; 任务 4: 数据插入
插入内部表:
INSERT INTO TABLE orders_internal VALUES
(101, 1, '2024-12-01',
    ARRAY(
      NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),
      NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)
    ),
    MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
),
(102, 2, '2024-12-02',
    ARRAY(
      NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)
    ),
    MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
); 插入外部表: 将数据手动存储在 /data/orders_external/,格式如下:
101    1    2024-12-01    [{"item_id":201,"item_name":"Laptop","quantity":1,"price":1000.0},{"item_id":202,"item_name":"Mouse","quantity":2,"price":25.0}]    {"payment_method":"credit_card","delivery_status":"delivered"}
102    2    2024-12-02    [{"item_id":203,"item_name":"Keyboard","quantity":1,"price":50.0}]    {"payment_method":"paypal","delivery_status":"shipped"} 加载数据后,直接查询外部表。
插入分区表:
INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-01') VALUES
(101, 1,
    ARRAY(
      NAMED_STRUCT('item_id', 201, 'item_name', 'Laptop', 'quantity', 1, 'price', 1000.0),
      NAMED_STRUCT('item_id', 202, 'item_name', 'Mouse', 'quantity', 2, 'price', 25.0)
    ),
    MAP('payment_method', 'credit_card', 'delivery_status', 'delivered')
);

INSERT INTO TABLE orders_partitioned PARTITION(order_date='2024-12-02') VALUES
(102, 2,
    ARRAY(
      NAMED_STRUCT('item_id', 203, 'item_name', 'Keyboard', 'quantity', 1, 'price', 50.0)
    ),
    MAP('payment_method', 'paypal', 'delivery_status', 'shipped')
); 任务 5: 查询操作
1. 查询所有已完成(delivery_status = delivered)订单的客户ID及商品明细:
SELECT
    customer_id,
    order_items
FROM
    orders_internal
WHERE
    order_details['delivery_status'] = 'delivered'; 2. 查询每个客户的总消费金额:
SELECT
    customer_id,
    SUM(total_price) AS total_spent
FROM (
    SELECT
      customer_id,
      item.quantity * item.price AS total_price
    FROM
      orders_internal
      LATERAL VIEW EXPLODE(order_items) exploded_items AS item
) t
GROUP BY
    customer_id; 六、HDFS代码题

课本资料

dfs常用的子命令选项
子命令选项功能描述-ls检察指定目录信息-du检察指定目录下每个文件和子目录的大小,子目录也可以看作单独的目录,因为它也可 以存在于目录-mv移动到指定文件或目录-cp复制指定文件或目录-rm删除指定文件或目录-put将本地文件系统中的指定文件传到 HDFS 指定目录-cat检察指定文件的内容-help检察资助文档-mkdir创建目录-get将 HDFS 的指定文件下载到本地文件系统 1.查看目录 /data的信息
hdfs dfs -ls -S /data
-S 按照由大到小的顺序显示指定目录的内容

根据文件内容大小,按照由小到大的顺序显示目录 /data的内容,并将默认的文件大小格式化为便于查看的格式进行显示
hdfs dfs -ls -r -h /data
-r 根据文件大小按照由小到大的顺序显示目录
-h 将默认文件大小(字节数)格式化为便于查看的格式进行显示

递归显示目录/data及其子目录的信息,信息中仅显示文件和子目录的路径
hdfs dfs -ls -R -C /data
-R 递归显示目录/data及其子目录的信息
-C 信息中仅显示文件和子目录的路径

2.在HDFS的目录/data中创建子目录/dataChild1。并在子目录/dataChild1中创建子目录/dataChild2
hdfs dfs -mkdir -p /data/dataChild1/dataChild2

3.查看/data中每个文件和子目录的大小,并将默认的文件和子目录大小格式化为便于查看的格式进行显示
hdfs dfs -du -h /data

4.将目录/data中的子目录/dataChild1 移动到目录/data/dataChild中
hdfs dfs -mv /dataChild1 /data/dataChild

将目录/data中的文件dataA 重命名为dataA_New
hdfs dfs -mv /data/dataA /data/dataA_New

5.将目录/data下的文件dataA_New 和 dataB复制到目录/data/dataChild
hdfs dfs -cp /data/dataA_New /data/dataB_New /data/dataChild

将目录/data下的文件 dataA_New复制到子目录/dataChild,并将其重命名为dataA
hdfs dfs -cp /data/dataA_New /data/dataChild/dataA

6.删除目录/data的子目录/dataChild
hdfs dfs -rm -r /data/dataChild

7.将本地文件系统中/export/data目录下文件a.txt 和 b.txt上传到HDFS的目录/data
hdfs dfs -put /export/data/a.txt /export/data/b.txt /data

8.查看目录/data中的文件a.txt的内容
hdfs dfs -cat /data/a.txt

9.将HDFS中目录/data中的文件a.txt和b.txt 下载到本地文件系统/opt目录下
hdfs dfs -get /data/a.txt /data/b.txt /opt
相关题目

题目 1: 创建目录并上传文件

描述: 假设你在 HDFS 的根目录下,必要完成以下操作:

[*] 在 HDFS 中创建一个名为 /user/yourname/data 的目录。
[*] 将本地目录 /local/data/input.txt 中的文件上传到刚创建的 HDFS 目录中。
要求: 编写 Shell 命令完成以上任务。
https://i-blog.csdnimg.cn/direct/c1e599ce16a84d79b9871dff6b602b72.png
题目 2: 检察文件信息

描述: 假设 HDFS 中已经存在目录 /user/yourname/data/input.txt,必要完成以下操作:

[*] 检察该文件的具体信息(包括文件权限、大小等)。
[*] 表现该文件的内容。
要求: 写出相应的 HDFS Shell 命令。
https://i-blog.csdnimg.cn/direct/d50c7410f25e4e17bc23384e0dcebb59.png

题目 3: 数据移动与复制

描述:

[*] 将文件 /user/yourname/data/input.txt 移动到 HDFS 中的 /user/yourname/archive/ 目录下。
[*] 将文件 /user/yourname/archive/input.txt 复制回 /user/yourname/data/ 目录。
要求: 提供具体的 HDFS Shell 命令。
https://i-blog.csdnimg.cn/direct/e848139696264dcdb245561a594ca431.png
题目 4: 删除操作

描述: 删除 HDFS 中的 /user/yourname/data 目录及其内容,并验证该目录是否被乐成删除。
要求: 写出执行以上操作的 Shell 命令。
https://i-blog.csdnimg.cn/direct/051a961e5dfd4657a2e582b20e4a2f23.png

题目 5: 文件权限设置

描述: 假设 /user/yourname/data/input.txt 文件必要满意以下权限要求:

[*] 文件所有者可以读写;
[*] 文件所在组成员只能读取;
[*] 其他用户无权限。
要求: 提供修改文件权限的 HDFS Shell 命令。
https://i-blog.csdnimg.cn/direct/b2db73e26d9b46088ac4e2759332be30.png
题目 6: 文件备份与验证

描述:

[*] 将 HDFS 中的 /user/yourname/data/input.txt 备份到 /user/yourname/backup/input.txt。
[*] 验证备份文件与原文件的内容是否同等。
要求: 写出完备的 Shell 命令。
https://i-blog.csdnimg.cn/direct/92293d1f3f2c44b38f8079bb6bba1542.png

七、MapReduce编程

题目 1: 单词计数 (Word Count)

https://i-blog.csdnimg.cn/direct/ce44c12c06934f5f9a9f9b9499159951.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\s+");
            for (String token : tokens) {
                word.set(token);
                context.write(word, one);
            }
      }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

题目 2: 最大值求解 (Max Value Finder)

https://i-blog.csdnimg.cn/direct/0b85f0ffc9944d46affd5498c52a4ef8.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxValue {
    public static class MaxMapper extends Mapper<Object, Text, Text, IntWritable> {
      private final static Text keyOut = new Text("Max");

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            int num = Integer.parseInt(value.toString());
            context.write(keyOut, new IntWritable(num));
      }
    }

    public static class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int max = Integer.MIN_VALUE;
            for (IntWritable val : values) {
                max = Math.max(max, val.get());
            }
            context.write(key, new IntWritable(max));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "max value");
      job.setJarByClass(MaxValue.class);
      job.setMapperClass(MaxMapper.class);
      job.setReducerClass(MaxReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 题目 3: 匀称值盘算 (Average Calculation)


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class AverageCalculation {
    public static class AvgMapper extends Mapper<Object, Text, Text, IntWritable> {
      private final static Text keyOut = new Text("Average");

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            int num = Integer.parseInt(value.toString());
            context.write(keyOut, new IntWritable(num));
      }
    }

    public static class AvgReducer extends Reducer<Text, IntWritable, Text, Text> {
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0, count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            double average = (double) sum / count;
            context.write(key, new Text(String.format("%.2f", average)));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "average calculation");
      job.setJarByClass(AverageCalculation.class);
      job.setMapperClass(AvgMapper.class);
      job.setReducerClass(AvgReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
题目 4: Top K 单词统计

https://i-blog.csdnimg.cn/direct/4961a43eb0b74f308bd39c6495473396.png
答案:通过两阶段 MapReduce 实现:第一阶段统计单词频率,第二阶段从中找出频率最高的 K 个单词。
第一阶段:统计单词频率
这部分代码与常规单词计数雷同。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordFrequency {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\s+");
            for (String token : tokens) {
                word.set(token);
                context.write(word, one);
            }
      }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word frequency");
      job.setJarByClass(WordFrequency.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
https://i-blog.csdnimg.cn/direct/054cf09d5cec4335ba3e82618e91247a.png

第二阶段:提取 Top K
思路

第二阶段通过输入第一阶段的输出结果,将频率和单词对交换,按频率降序排序,选出频率最高的 K 个单词
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

public class TopKWords {
    public static class SwapMapper extends Mapper<Object, Text, IntWritable, Text> {
      private IntWritable frequency = new IntWritable();
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] parts = value.toString().split("\\s+");
            if (parts.length == 2) {
                word.set(parts);
                frequency.set(Integer.parseInt(parts));
                context.write(frequency, word); // 倒置键值对,频率作为 key
            }
      }
    }

    public static class TopKReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
      private TreeMap<Integer, String> topKMap = new TreeMap<>();
      private int K = 10; // 设置需要的 Top K 值

      public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                topKMap.put(key.get(), val.toString());
                if (topKMap.size() > K) {
                  topKMap.remove(topKMap.firstKey()); // 保持大小为 K
                }
            }
      }

      protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer freq : topKMap.descendingKeySet()) {
                context.write(new Text(topKMap.get(freq)), new IntWritable(freq));
            }
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "top k words");
      job.setJarByClass(TopKWords.class);
      job.setMapperClass(SwapMapper.class);
      job.setReducerClass(TopKReducer.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
https://i-blog.csdnimg.cn/direct/808eba0e69244979af00eef854ccf7bd.png
题目 5: 日志分析


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogAnalysis {
    public static class LogMapper extends Mapper<Object, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] parts = value.toString().split(" ");
            if (parts.length > 0) {
                String ip = parts; // 提取 IP
                context.write(new Text(ip), one);
            }
      }
    }

    public static class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "log analysis");
      job.setJarByClass(LogAnalysis.class);
      job.setMapperClass(LogMapper.class);
      job.setReducerClass(LogReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 题目 6: 用户购买分析

https://i-blog.csdnimg.cn/direct/bd2520ed8fdc4f649479002418d6d09b.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class UserPurchaseAnalysis {
    public static class PurchaseMapper extends Mapper<Object, Text, Text, DoubleWritable> {
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] parts = value.toString().split(" ");
            if (parts.length == 2) {
                String userId = parts;
                double amount = Double.parseDouble(parts);
                context.write(new Text(userId), new DoubleWritable(amount));
            }
      }
    }

    public static class PurchaseReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
      public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0.0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }
            context.write(key, new DoubleWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "user purchase analysis");
      job.setJarByClass(UserPurchaseAnalysis.class);
      job.setMapperClass(PurchaseMapper.class);
      job.setReducerClass(PurchaseReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(DoubleWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
题目 7: 倒排索引 (Inverted Index)

https://i-blog.csdnimg.cn/direct/f29cabc83d1848398247db27e175f237.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;

public class InvertedIndex {
    public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
      private Text word = new Text();
      private Text documentId = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t", 2); // 输入格式: 文档ID \t 文本内容
            if (line.length < 2) return;

            documentId.set(line);
            String[] words = line.split("\\s+");
            for (String w : words) {
                word.set(w);
                context.write(word, documentId);
            }
      }
    }

    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            HashSet<String> docIds = new HashSet<>();
            for (Text docId : values) {
                docIds.add(docId.toString());
            }
            context.write(key, new Text(String.join(", ", docIds)));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "inverted index");
      job.setJarByClass(InvertedIndex.class);
      job.setMapperClass(IndexMapper.class);
      job.setReducerClass(IndexReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 题目 8: 用户商品共现分析

https://i-blog.csdnimg.cn/direct/926b2a0f8406481c842d852d98ed0c6e.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class CoOccurrence {
    public static class CoOccurrenceMapper extends Mapper<Object, Text, Text, Text> {
      private Text user = new Text();
      private Text itemList = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] parts = value.toString().split("\\s+");
            if (parts.length < 2) return;

            user.set(parts);
            itemList.set(String.join(",", parts, 1, parts.length));
            context.write(user, itemList);
      }
    }

    public static class CoOccurrenceReducer extends Reducer<Text, Text, Text, Text> {
      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ArrayList<String> items = new ArrayList<>();
            for (Text val : values) {
                String[] parts = val.toString().split(",");
                for (String item : parts) {
                  items.add(item);
                }
            }

            for (int i = 0; i < items.size(); i++) {
                for (int j = i + 1; j < items.size(); j++) {
                  String pair = items.get(i) + " " + items.get(j);
                  context.write(new Text(pair), new Text("1"));
                }
            }
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "co-occurrence");
      job.setJarByClass(CoOccurrence.class);
      job.setMapperClass(CoOccurrenceMapper.class);
      job.setReducerClass(CoOccurrenceReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
 题目 9: 数据去重

​​​​​​​https://i-blog.csdnimg.cn/direct/008ab846890c46b99bc05c08339655d7.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Deduplication {
    public static class DedupMapper extends Mapper<Object, Text, Text, Text> {
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, new Text(""));
      }
    }

    public static class DedupReducer extends Reducer<Text, Text, Text, Text> {
      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "data deduplication");
      job.setJarByClass(Deduplication.class);
      job.setMapperClass(DedupMapper.class);
      job.setReducerClass(DedupReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
题目 10: 分组统计 (Group By)

https://i-blog.csdnimg.cn/direct/a5d5507c19bf41d6ac28fb670acf6ca9.png
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupBy {
    public static class GroupMapper extends Mapper<Object, Text, Text, IntWritable> {
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] parts = value.toString().split("\\s+");
            if (parts.length == 2) {
                String group = parts;
                int number = Integer.parseInt(parts);
                context.write(new Text(group), new IntWritable(number));
            }
      }
    }

    public static class GroupReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "group by");
      job.setJarByClass(GroupBy.class);
      job.setMapperClass(GroupMapper.class);
      job.setReducerClass(GroupReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args));
      FileOutputFormat.setOutputPath(job, new Path(args));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 期末复习-Hadoop综合复习