hadoop集群搭建及编程实践

打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

Hadoop集群搭建

1.前期准备及JDK,hadoop安装

1.1JDK的下载地址,hadoop下载地址

Java Downloads | Oracle 中国
选择JDK8
清华镜像源
选择hadoop-3.3.5
注意点
查看镜像是32位还是64位
  1. uname -m
复制代码
当输出为x86_64时,说明是64位,不是的就是32位,此时需要重新下载镜像,32位不方便
1.2创建hadoop用户

在安装完linus镜像之后,需要创建一个专门的"hadoop"用户,这里的用户名为 “prettyspider"
首先按 ctrl+alt+t 打开终端窗口,输入如下命令创建新用户 :
  1. sudo useradd -m prettyspider -s /bin/bash
复制代码
-m:将prettyspider作为用户放入到用户登录目录
-s:指定用户登入后使用的shell
为用户设置登录密码
  1. sudo passwd prettyspider
复制代码
为用户添加管理员权限
  1. sudo adduser prettyspider sudo
复制代码
之后登出,登录"hadoop"用户
1.3更新apt
  1. sudo apt-get update
复制代码
同步时间
  1. sudo apt-get install ntpdata
  2. ntpdata -u time2.aliyun.com # 同步为阿里云NTP服务器
复制代码
下载vim
  1. sudo apt-get install vim
复制代码
1.4安装SSH、配置SSH无密码登陆
  1. sudo apt-get install openssh-server
复制代码
安装完之后,登录本机
  1. ssh localhost
复制代码
在下方提示中输入yes,再根据提示输入“hadoop"用户的密码
设置免密登录之前,一定要先用密码登录一下
  1. exit                           # 退出刚才的 ssh localhost
  2. cd ~/.ssh/                     # 若没有该目录,请先执行一次ssh localhost
  3. ssh-keygen -t rsa              # 会有提示,都按回车就可以
  4. cat ./id_rsa.pub >> ./authorized_keys  # 加入授权
复制代码
再使用ssh localhost登录
1.5配置远程登录

远程登录实现种类比较多,最轻便的是用vscode进行远程登录,这里使用的是MobaXterm软件
可到官网中下载MobaXterm Xserver with SSH, telnet, RDP, VNC and X11 - Download (mobatek.net)
1.6JDK安装

JDK版本为1.8.0_371
  1. cd /usr/lib
  2. sudo mkdir jvm #创建/usr/lib/jvm目录用来存放JDK文件
  3. sudo tar -zxvf ~/jdk-8u371-linux-x64.tar.gz -C /usr/lib/jvm  #将
复制代码
设置环境变量
  1. cd ~
  2. vim ~/.bashrc
复制代码
在其中添加
  1. export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_371  # 对应的版本号为jdk1.8.0_对应下载版本8u后面的数字
  2. export JRE_HOME=${JAVA_HOME}/jre
  3. export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
  4. export PATH=${JAVA_HOME}/bin:$PATH
复制代码
是配置文件生效
  1. source ~/.bashrc
复制代码
查看是否安装成功
  1. java -version
复制代码
当出现下图,表明安装成功

1.7安装hadoop
  1. sudo tar -zxvf ~/hadoop-3.3.5.tar.gz -C /usr/local   # 解压到/usr/local中
  2. cd /usr/local/
  3. sudo mv ./hadoop-3.3.5/ ./hadoop            # 将文件夹名改为hadoop
  4. sudo chown -R prettyspider ./hadoop       # 修改文件权限,prettyspider为你的”hadoop"用户名
复制代码
查看hadoop是否可用
  1. cd /usr/local/hadoop
  2. ./bin/hadoop version
复制代码
出现下图,表示可用

依次配置3台主机,对应的hadoop用户名都为prettyspider
2.设置主机名和添加主机映射

2.1修改主机名
  1. sudo vim /etc/hostname
复制代码
3台主机分别设置为 node01 node02 node03
重启后,对应的主机名便会更改,如

2.2添加主机映射

在node01结点上
  1. sudo vim /etc/hosts
复制代码
添加主机的映射,设置成下图

相应的其他结点也需要设置成上图一样
3.验证连通性

用ping指令验证连通性
  1. ping node02 -c 3
复制代码
连通成功的结果

4.SSH无密码登录

在最开始配置的SSH是只针对当前主机而言的SSH密匙,但是不利用集群的操作,所以需要统一的配置SSH密匙
4.1在主节点上删除原有SSH,并再创建一个统一的SSH密匙
  1. cd ~/.ssh              # 如果没有该目录,先执行一次ssh localhost
  2. rm ./id_rsa*           # 删除之前生成的公匙(如果已经存在)
  3. ssh-keygen -t rsa       # 执行该命令后,遇到提示信息,一直按回车就可以
复制代码
将生成的密匙添加到用户的~/.ssh/authorized_keys,用于身份验证
  1. cat ./id_rsa.pub >> ./authorized_keys
复制代码
将密匙传入到对应的从结点上 传输到node02,node03
  1. scp ~/.ssh/id_rsa.pub prettyspider@node02:/home/prettyspider/ # 此处@前后的名称为自定义的用户名和主机名 ,/home/后的为自定义的用户名
复制代码
在对应的结点上实现
  1. mkdir ~/.ssh       # 如果不存在该文件夹需先创建,若已存在,则忽略本命令
  2. cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
  3. rm ~/id_rsa.pub    # 用完以后就可以删掉
复制代码
4.3查看是否成功
  1. ssh nod02
复制代码
如下,表示成功

4.4为hadoop添加PATH

在~/.bashrc中添加
  1. export PATH=$PATH:/usr/local/hadoop/bin:/usr/local/hadoop/sbin  # 指向对应hadoop路径下的hadoop启动文件夹的目录
复制代码
5.配置集群/分布式环境

5.1进入/usr/local/hadoop/etc/hadoop
  1. /usr/local/hadoop/etc/hadoop
复制代码
5.2修改workers

workers的作用:配置为DateNode的主机名,如下,删除localhost

5.3修改文件core-site.xml

指定namenode的位置和设置hadoop文件系统的基本配置
5.4修改hdfs-site.xml

配置namenode和datanode存放文件的基本路径及配置副本的数量,最小值为3

5.5修改mapred-site.xml


5.6修改yarn-site.xml

设置resourceManager运行在哪台机器上,设置NodeManager的通信方式

6.分发到其他结点

6.1分发其他结点
  1. cd /usr/local
  2. sudo rm -r ./hadoop/tmp     # 删除 Hadoop 临时文件
  3. sudo rm -r ./hadoop/logs/*   # 删除日志文件
  4. tar -zcf ~/hadoop.master.tar.gz ./hadoop   # 先压缩再复制
  5. cd ~
  6. scp ./hadoop.master.tar.gz node02:/home/prettyspider
复制代码
其中
sudo rm -r ./hadoop/tmp     # 删除 Hadoop 临时文件
sudo rm -r ./hadoop/logs/*   # 删除日志文件
很重要,在后期配置hbase集群时有用
6.2从节点解压并设置用户组
  1. sudo rm -r /usr/local/hadoop    # 删掉旧的(如果存在)
  2. sudo tar -zxf ~/hadoop.master.tar.gz -C /usr/local
  3. sudo chown -R prettyspider /usr/local/hadoop
复制代码
7.格式化namenode

在从结点上完成了部署hadoop,在主节点上执行名称结点的格式化
  1. hdfs namenode -format
复制代码
自此,hadoop集群搭建完成,启动集群
  1. start-dfs.sh
  2. start-yarn.sh
  3. mr-jobhistory-daemon.sh start historyserver
复制代码

hadoop集群的规划为

8.执行分布式实例

8.1创建HDFS上的用户目录
  1. hdfs dfs -mkdir -p /user/prettyspider
复制代码
hadoop用户名是什么,user后的用户就是什么
8.2创建input目录
  1. hdfs dfs -mkdir input # input文件夹默认在用户目录下,也就是prettyspider目录下
  2. hdfs dfs -put /usr/local/hadoop/etc/hadoop/*.xml input
复制代码
8.3运行MapReduce作业

这个测试是用正则表达式获取指定前缀的任意长的字段
  1. hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.5.jar grep input output 'dfs[a-z.]+'
复制代码
结果为

9.java API与HDFS的编程

1.导入Maven依赖
  1. <dependencies>
  2.     <dependency>
  3.            <groupId>org.apache.hadoop</groupId>
  4.            <artifactId>hadoop-common</artifactId>
  5.            <version>2.7.5</version>
  6.    </dependency>
  7.        <dependency>
  8.            <groupId>org.apache.hadoop</groupId>
  9.            <artifactId>hadoop-client</artifactId>
  10.            <version>2.7.5</version>
  11. </dependency>
  12.        <dependency>
  13.            <groupId>org.apache.hadoop</groupId>
  14.            <artifactId>hadoop-hdfs</artifactId>
  15.            <version>2.7.5</version>
  16. </dependency>
  17. <dependency>
  18.            <groupId>org.apache.hadoop</groupId>
  19.            <artifactId>hadoop-mapreduce-client-core</artifactId>
  20.            <version>2.7.5</version>
  21. </dependency>
  22.        <dependency>
  23.            <groupId>junit</groupId>
  24.            <artifactId>junit</artifactId>
  25.            <version>RELEASE</version>
  26.        </dependency>
  27.    </dependencies>
  28.    <build>
  29.        <plugins>
  30.            <plugin>
  31.                <groupId>org.apache.maven.plugins</groupId>
  32.                <artifactId>maven-compiler-plugin</artifactId>
  33.                <version>3.1</version>
  34.                <configuration>
  35.                    <source>1.8</source>
  36.                    <target>1.8</target>
  37.                    <encoding>UTF-8</encoding>
  38.                    
  39.                </configuration>
  40.            </plugin>
  41.            <plugin>
  42.                <groupId>org.apache.maven.plugins</groupId>
  43.                <artifactId>maven-shade-plugin</artifactId>
  44.                <version>2.4.3</version>
  45.                <executions>
  46.                    <execution>
  47.                        <phase>package</phase>
  48.                        <goals>
  49.                            <goal>shade</goal>
  50.                        </goals>
  51.                        <configuration>
  52.                            <minimizeJar>true</minimizeJar>
  53.                        </configuration>
  54.                    </execution>
  55.                </executions>
  56.            </plugin>
  57.        </plugins>
  58.    </build>
复制代码
2.上传本地文件到HDFS文件系统,将HDFS文件系统中的文件下载到本地并压缩

1.创建ConnectionJavaBean类,用于登录HDFS
  1. package com.prettyspider.hadoop;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. /**
  8. * @author prettyspider
  9. * @ClassName Connection
  10. * @description: TODO
  11. * @date 2023/10/7 19:00
  12. * @Version V1.0
  13. */
  14. public class Connection {
  15.     // HDFS文件系统web地址
  16.     private String hdfsUrl;
  17.     // hadoop用户名
  18.     private String hadoopHost;
  19.     // 文件系统对象
  20.     private FileSystem fs;
  21.     public Connection() {}
  22.     public Connection(String hdfsUrl, String hadoopHost) {
  23.         this.hdfsUrl = hdfsUrl;
  24.         this.hadoopHost = hadoopHost;
  25.     }
  26.     public Connection(String hdfsUrl, String hadoopHost, FileSystem fs) {
  27.         this.hdfsUrl = hdfsUrl;
  28.         this.hadoopHost = hadoopHost;
  29.         this.fs = fs;
  30.     }
  31.     public String getHadoopHost() {
  32.         return hadoopHost;
  33.     }
  34.     /**
  35.      * 将web地址和hadoop用户名传入,生成文件系统对象
  36.      * @return HDFS文件系统对象
  37.      * @throws Exception
  38.      */
  39.     public FileSystem init() {
  40.         Configuration configuration = new Configuration();
  41.         try {
  42.             fs = FileSystem.newInstance(new URI(hdfsUrl), configuration, hadoopHost);
  43.         } catch (IOException e) {
  44.             throw new RuntimeException(e);
  45.         } catch (InterruptedException e) {
  46.             throw new RuntimeException(e);
  47.         } catch (URISyntaxException e) {
  48.             throw new RuntimeException(e);
  49.         }
  50.         return fs;
  51.     }
  52.     public void fsClose() {
  53.         try {
  54.             fs.close();
  55.         } catch (IOException e) {
  56.             throw new RuntimeException(e);
  57.         }
  58.     }
  59.     /**
  60.      * 获取
  61.      * @return hdfsUrl
  62.      */
  63.     public String getHdfsUrl() {
  64.         return hdfsUrl;
  65.     }
  66.     /**
  67.      * 设置
  68.      * @param hdfsUrl
  69.      */
  70.     public void setHdfsUrl(String hdfsUrl) {
  71.         this.hdfsUrl = hdfsUrl;
  72.     }
  73.     /**
  74.      * 设置
  75.      * @param hadoopHost
  76.      */
  77.     public void setHadoopHost(String hadoopHost) {
  78.         this.hadoopHost = hadoopHost;
  79.     }
  80.     /**
  81.      * 获取
  82.      * @return fs
  83.      */
  84.     public FileSystem getFs() {
  85.         return fs;
  86.     }
  87.     /**
  88.      * 设置
  89.      * @param fs
  90.      */
  91.     public void setFs(FileSystem fs) {
  92.         this.fs = fs;
  93.     }
  94.     public String toString() {
  95.         return "Connection{hdfsUrl = " + hdfsUrl + ", hadoopHost = " + hadoopHost + ", fs = " + fs + "}";
  96.     }
  97. }
复制代码
2.创建文件转化工具类FileTransferUtil,实现对文件夹的上传和下载
  1. package com.prettyspider.hadoop.updateanddownload;
  2. import org.apache.hadoop.fs.*;
  3. import java.io.*;
  4. import java.util.zip.ZipEntry;
  5. import java.util.zip.ZipOutputStream;
  6. /**
  7. * @author prettyspider
  8. * @ClassName update
  9. * @description: TODO
  10. * @date 2023/10/7 19:23
  11. * @Version V1.0
  12. */
  13. public class FileTransferUtil {
  14.     private FileTransferUtil() {
  15.     }
  16.     /**
  17.      * 将本地指定路径下的文件上传到HDFS文件系统上
  18.      *
  19.      * @param localPath 本地文件路径
  20.      * @param hdfsPath  HDFS文件系统路径
  21.      * @param fs        HDFS文件系统对象
  22.      */
  23.     public static void update(String localPath, String hdfsPath, FileSystem fs) {
  24.         /**
  25.          * 细节:
  26.          *      两次getName()的意义不同,第一次是获取文件夹或者文件的名称,第二次是获取文件的名称,不能共用
  27.          */
  28.         String name1 = new File(localPath).getName();
  29.         hdfsPath = hdfsPath + "/" + name1;
  30.         // 获取本地文件的文件集合
  31.         File[] files = new File(localPath).listFiles();
  32.         if (files != null) {
  33.             for (File file : files) {
  34.                 // 当为文件是便上传
  35.                 if (file.isFile()) {
  36.                     String absolutePath = file.getAbsolutePath();
  37.                     String name = file.getName();
  38.                     try {
  39.                         System.out.println(hdfsPath + "/" + name);
  40.                         fs.copyFromLocalFile(new Path("file:///" + absolutePath), new Path(hdfsPath + "/" + name));
  41.                     } catch (IOException e) {
  42.                         throw new RuntimeException(e);
  43.                     }
  44.                 } else {
  45.                     update(file.toString(), hdfsPath, fs);
  46.                 }
  47.             }
  48.         }
  49.     }
  50.     /**
  51.      *
  52.      * @param localPath 本地文件路径
  53.      * @param hdfsPath HDFS文件系统路径
  54.      * @param fs HDFS文件系统对象
  55.      * @param username 用户名
  56.      * @throws IOException
  57.      */
  58.     public static void download(String localPath, String hdfsPath, FileSystem fs,String username) throws IOException {
  59.         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = locatedFileStatusRemoteIterator = fs.listFiles(new Path(hdfsPath), true);
  60.         while (locatedFileStatusRemoteIterator.hasNext()) {
  61.             LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
  62.             // 用用户名做切分点,获取从用户名开始的文件路径
  63.             String name = next.getPath().toString().split(username)[1];
  64.             /**
  65.              * 细节:
  66.              *      将获取的用户名进行切分,再组合
  67.              */
  68.             String[] arr = name.split("/");
  69.             String fileName = "";
  70.             for (int i = 0; i < arr.length - 1; i++) {
  71.                 fileName += arr[i] + "/";
  72.             }
  73.             // 获取HDFS文件系统的路径
  74.             Path path = next.getPath();
  75.             FSDataInputStream getMessage = fs.open(path);
  76.             BufferedReader reader = new BufferedReader(new InputStreamReader(getMessage));
  77.             /**
  78.              * 细节:
  79.              *      输出时需要先创建文件目录
  80.              */
  81.             File file = new File(localPath, fileName);
  82.             if (!file.exists()) {
  83.                 file.mkdirs();
  84.             }
  85.             BufferedWriter writer = new BufferedWriter(new FileWriter(new File(file, arr[arr.length - 1])));
  86.             String line;
  87.             while ((line = reader.readLine()) != null) {
  88.                 writer.write(line);
  89.                 writer.newLine();
  90.             }
  91.             writer.close();
  92.             reader.close();
  93.         }
  94.         // 压缩
  95.         ZipOutputStream zipOutputStream = new ZipOutputStream(new FileOutputStream(new File(localPath, hdfsPath + ".zip")));
  96.         toZIp(new File(localPath,hdfsPath), zipOutputStream, hdfsPath);
  97.     }
  98.     /**
  99.      *
  100.      * @param src 文件夹对象
  101.      * @param zipOutputStream 压缩流
  102.      * @param path 指定文件夹下的根目录
  103.      * @throws IOException
  104.      */
  105.     private static void toZIp(File src, ZipOutputStream zipOutputStream, String path) throws IOException {
  106.         File[] files = src.listFiles();
  107.         if (files != null) {
  108.             for (File file : files) {
  109.                 if (file.isFile()) {
  110.                     ZipEntry zipEntry = new ZipEntry(path + "\" + file.getName());
  111.                     zipOutputStream.putNextEntry(zipEntry);
  112.                     BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
  113.                     byte[] bytes = new byte[1024 * 1024 * 8];
  114.                     int len;
  115.                     while ((len = bufferedInputStream.read(bytes))!=-1) {
  116.                         zipOutputStream.write(bytes, 0, len);
  117.                     }
  118.                     bufferedInputStream.close();
  119.                 } else {
  120.                     toZIp(file, zipOutputStream, path + "\" + file.getName());
  121.                 }
  122.             }
  123.             zipOutputStream.close();
  124.         }
  125.     }
  126. }
复制代码
测试类
FileTransferTest
  1. import org.apache.hadoop.fs.FileSystem;
  2. import org.junit.Test;
  3. import java.io.IOException;
  4. /**
  5. * @author prettyspider
  6. * @ClassName fileTransferTest
  7. * @description: TODO
  8. * @date 2023/10/7 19:47
  9. * @Version V1.0
  10. */
  11. public class fileTransferTest {
  12.     @Test
  13.     public void testUpdate() throws IOException {
  14.         Connection connection = new Connection("hdfs://node01:9000", "prettyspider");
  15.         FileSystem fs = connection.init();
  16.         FileTransferUtil.update("E:\\test\\wordcount","input",new ConnectionTest().testInit());
  17.         // fileTransfer.download("E:\\test","input",fs,connection.getHadoopHost());
  18.         connection.fsClose();
  19.     }
  20. }
复制代码
结果
本地

HDFS文件系统Web端

3.根据HDFS文件系统查看学生是否提交作业

假设用HDFS文件系统管理学生作业,如何获取学生是否提交作业
实现:
1.根据HDFS文件系统获取指定”班级"下的所有的已经提交作业的学生
2.与班级的学生名单进行比较,获取没有提交作业的学生
创建JobSunmissionUtil工具类,实现获取没有提交做的学生
  1. package com.prettyspider.hadoop.jobsubmission;
  2. import org.apache.hadoop.fs.FileStatus;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import java.io.*;
  6. import java.util.ArrayList;
  7. /**
  8. * @author prettyspider
  9. * @ClassName Search
  10. * @description: TODO
  11. * @date 2023/10/8 11:23
  12. * @Version V1.0
  13. */
  14. public class JobSubmissionUtil {
  15.     private JobSubmissionUtil(){}
  16.     public static void search(FileSystem fs) throws Exception {
  17.         File file = new File(".\\src\\main\\java\\com\\prettyspider\\hadoop\\jobsubmission\\stu.txt");
  18.         BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
  19.         String line;
  20.         ArrayList<String> list = new ArrayList<>();
  21.         ArrayList<String> nameList = new ArrayList<>();
  22.         while ((line = reader.readLine()) != null) {
  23.             list.add(line.split("-")[0]);
  24.         }
  25.         System.out.println(list);
  26.         FileStatus[] fileStatuses = fs.listStatus(new Path("input/stu"));
  27.         for (FileStatus fileStatus : fileStatuses) {
  28.             String[] arr = fileStatus.getPath().toString().split("/");
  29.             String s = arr[arr.length - 1].split("\\.")[0];
  30.             nameList.add(s);
  31.         }
  32.         System.out.println(nameList);
  33.         // 去重
  34.         for (String name : nameList) {
  35.             list.remove(name);
  36.         }
  37.         System.out.println("没有交作业的是"+list);
  38.     }
  39. }
复制代码
测试类
JobsubmissionTest
  1. package com.prettyspider.hadoop.updateanddownload;
  2. import com.prettyspider.hadoop.Connection;
  3. import com.prettyspider.hadoop.jobsubmission.JobSubmissionUtil;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.junit.Test;
  6. /**
  7. * @author prettyspider
  8. * @ClassName SearchTest
  9. * @description: TODO
  10. * @date 2023/10/8 11:30
  11. * @Version V1.0
  12. */
  13. public class JobSubmissionTest {
  14.     @Test
  15.     public void testsearch() throws Exception {
  16.         Connection connection = new Connection("hdfs://node01:9000", "prettyspider");
  17.         FileSystem fs = connection.init();
  18.         JobSubmissionUtil.search(fs);
  19.         connection.fsClose();
  20.     }
  21. }
复制代码
测试数据
4.实现HDFS文件系统指定文件夹内的文件词频统计(手搓)

MapReduce是hadoop两个核心之一,MapReduce框架由Map和Reduce组成。 Map ()负责把一个大的block块进行切片并计算。 Reduce () 负责把Map ()切片的数据进行汇总、计算。
那么可以通过简化,实现切片和数据统计
实现步骤:
1.将HDFS文件系统指定文件夹下的文件合并到一个文件中
2.对文件进行切分
3.将切分之后的数据利用Map集合实现统计
创建WordCountUtil工具类
  1. package com.prettyspider.hadoop.wordcount;
  2. import org.apache.hadoop.fs.*;
  3. import java.io.*;
  4. import java.util.*;
  5. /**
  6. * @author prettyspider
  7. * @ClassName wordcount
  8. * @description: TODO
  9. * @date 2023/10/8 12:46
  10. * @Version V1.0
  11. */
  12. public class WordCountUtil {
  13.     private WordCountUtil() {}
  14.     /**
  15.      * 将指定文件夹下的文件合并到一个文件中,再对文件进行词频统计
  16.      * @param fs HDFS文件系统对象
  17.      * @param hdfsPath 要统计词频的文件夹地址
  18.      * @param mergePath 合并后的文件地址
  19.      * @throws IOException
  20.      */
  21.     public static void wordcount(FileSystem fs,String hdfsPath,String mergePath) throws IOException {
  22.         merge(fs, hdfsPath, mergePath);
  23.         wordcount(fs, mergePath);
  24.     }
  25.     /**
  26.      * 利用Map对数据进行统计
  27.      * @param fs HDFS文件系统
  28.      * @param mergePath 合并的文件地址
  29.      * @throws IOException
  30.      */
  31.     private static void wordcount(FileSystem fs, String mergePath) throws IOException {
  32.         FSDataInputStream open = fs.open(new Path(mergePath));
  33.         // 用集合获取数据
  34.         ArrayList<String> list = new ArrayList<>();
  35.         BufferedReader reader = new BufferedReader(new InputStreamReader(open));
  36.         String line;
  37.         while ((line = reader.readLine()) != null) {
  38.             list.add(line);
  39.         }
  40.         StringBuilder stringBuilder = new StringBuilder();
  41.         for (String s : list) {
  42.             stringBuilder.append(s);
  43.         }
  44.         String[] arr = stringBuilder.toString().split("\\W+");
  45.         // 词频统计
  46.         wordstatistic(arr);
  47.     }
  48.     /**
  49.      *
  50.      * @param arr 被拆分后的词的数组
  51.      */
  52.     private static void wordstatistic(String[] arr) {
  53.         HashMap<String, Integer> map = new HashMap<>();
  54.         for (int i = 0; i < arr.length; i++) {
  55.             String s = arr[i];
  56.             // map中不存在数据
  57.             if (!map.containsKey(s)) {
  58.                 map.put(s, 1);
  59.             } else {
  60.                 int count = map.get(s) + 1;
  61.                 map.put(s,count);
  62.             }
  63.         }
  64.         // 输出结果
  65.         Set<Map.Entry<String, Integer>> entries = map.entrySet();
  66.         for (Map.Entry<String, Integer> entry : entries) {
  67.             String key = entry.getKey();
  68.             Integer value = entry.getValue();
  69.             System.out.println("key="+key+",value="+value);
  70.         }
  71.     }
  72.     /**
  73.      *
  74.      * @param fs HDFS文件系统对象
  75.      * @param hdfsPath 要统计的文件夹地址
  76.      * @param mergePath 合并后文件地址
  77.      * @throws IOException
  78.      */
  79.     private static void merge(FileSystem fs, String hdfsPath, String mergePath) throws IOException {
  80.         FSDataOutputStream fsDataOutputStream = fs.create(new Path(mergePath));
  81.         FileStatus[] fileStatuses = fs.listStatus(new Path(hdfsPath));
  82.         BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream));
  83.         for (FileStatus fileStatus : fileStatuses) {
  84.             FSDataInputStream open = fs.open(new Path(fileStatus.getPath().toUri()));
  85.             BufferedReader reader = new BufferedReader(new InputStreamReader(open));
  86.             String line;
  87.             while ((line = reader.readLine()) != null) {
  88.                 writer.write(line);
  89.                 writer.newLine();
  90.             }
  91.             reader.close();
  92.             open.close();
  93.         }
  94.         writer.close();
  95.         fsDataOutputStream.close();
  96.     }
  97. }
复制代码
测试类
WordCountTest
  1. package com.prettyspider.hadoop.updateanddownload;
  2. import com.prettyspider.hadoop.Connection;
  3. import com.prettyspider.hadoop.wordcount.WordCountUtil;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. /**
  8. * @author prettyspider
  9. * @ClassName WordCountTest
  10. * @description: TODO
  11. * @date 2023/10/8 13:15
  12. * @Version V1.0
  13. */
  14. public class WordCountTest {
  15.     @Test
  16.     public void testwordcount() throws IOException {
  17.         Connection connection = new Connection("hdfs://node01:9000", "prettyspider");
  18.         FileSystem fs = connection.init();
  19.         WordCountUtil.wordcount(fs,"input/wordcount","output/merge.txt");
  20.                 connection.fsClose();
  21.     }
  22. }
复制代码
结果


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

耶耶耶耶耶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表