Hadoop编写MapReduce步伐计算超市贩卖数据月份贩卖总额

打印 上一主题 下一主题

主题 550|帖子 550|积分 1650

前言:超市贩卖数据的管理和分析对于商家来说至关紧张。借助Hadoop生态系统中的MapReduce框架,我们可以有效地处置惩罚大规模的贩卖数据,并从中提取有价值的信息,比如每个月的贩卖总额。本文将先容怎样利用Hadoop编写MapReduce步伐来计算超市斲丧数据的月份贩卖总额。
一. 数据准备

首先,我们需要准备超市的贩卖数据。假设我们的数据以文本文件的形式存储,每行代表一笔贩卖记录,包罗日期、商品名称和贩卖额等信息。例如:

二.Hadoop分布式文件系统(HDFS)

先容

Hadoop分布式文件系统(HDFS)是Apache Hadoop生态系统的核心组件之一,用于存储和管理大规模数据集。它设计用于在便宜的硬件上运行,并且提供高可靠性、高性能的分布式存储解决方案。本文将深入探究HDFS的工作原理、架构和优势。
HDFS架构

HDFS的架构由以下几个紧张组件构成:

  • NameNode:NameNode是HDFS的关键组件之一,负责管理文件系统的定名空间和存储块的元数据信息。它维护了文件系统树及其相关属性,并且记录了每个文件块的位置信息。
  • DataNode:DataNode是另一个紧张组件,负责现实存储数据块。每个DataNode管理着其所在节点上的存储,定期向NameNode报告其存储容量和康健状况。
  • Secondary NameNode:Secondary NameNode并不是NameNode的备份,而是协助NameNode举行元数据的周期性归并和编辑日志的滚动。它资助减轻了NameNode的压力,并且有助于提高系统的可靠性。
  • 客户端:客户端是与HDFS交互的应用步伐,负责向HDFS读取和写入数据。
HDFS工作原理

HDFS的工作原理可以简要概括为以下几个步骤:

  • 文件分块:当一个文件被上传到HDFS时,它会被分别成固定大小的数据块(通常为128MB或256MB)。这些数据块被分散存储在多个DataNode上,以实现数据的分布式存储和处置惩罚。
  • 定名空间管理:NameNode负责管理文件系统的定名空间和元数据信息。它维护文件系统树,记录文件和数据块的位置信息,并处置惩罚客户端的文件系统操纵请求。
  • 数据复制:为了提高数据的可靠性和容错性,HDFS会将每个数据块复制到多个DataNode上。这些副本通常分布在不同的机架上,以减少硬件故障对数据的影响。
  • 容错机制:HDFS通过周期性的心跳检测和块报告来监视DataNode的康健状况。如果发现某个数据块的副本丢失或损坏,HDFS会主动从其他DataNode上的副本举行规复。
HDFS的优势

HDFS作为大数据存储解决方案,具有以下几个显著的优势:

  • 高可靠性:HDFS通过数据复制和容错机制,包管了数据的可靠性和容错性,即使在硬件故障的环境下也能保持数据的完备性。
  • 高扩展性:HDFS可以轻松地扩展到数以千计的节点,以应对不断增长的数据量和处置惩罚需求。
  • 高性能:由于数据块的分布式存储和并行处置惩罚,HDFS可以实现高吞吐量和低延迟的数据访问。
  • 本钱效益:HDFS运行在便宜的标准硬件上,并且采用了大规模并行处置惩罚的模式,使得它成为一种经济高效的数据存储解决方案。
在HDFS的/<个人学号>/data/路径(例如/202201/data)下创建并存储该数据文件;
通过hdfs dfs -cat/<个人学号>/data/xxx下令查看数据文件的截图,用以证实数据已成功存储在HDFS 上。
1.首先打开HDFS进程,并用hdfs dfs mkdir -p /学号/data 下令在HDFS中创建文件夹,并用hdfs dfs -ls/查看是否创建。

2. 将csv文件准备好,并移动到linxu操纵系统的 /opt/ 目录下,运用下令 hdfs dfs -put /opt/超市斲丧数据.csv /202201/data/(把在Linx下的数据转到HDFS上)。
并使用:hdfs dfs -cat查看是否成功上传。

三.MapReduce指标计算与存储

弁言

MapReduce是一种用于大规模数据处置惩罚的编程模型,最初由Google提出,并在Apache Hadoop项目中得到了广泛的实现和应用。MapReduce分布式文件系统联合了分布式文件系统和MapReduce计算模型,为大规模数据处置惩罚提供了高效、可靠的解决方案。本文将深入探究MapReduce分布式文件系统的原理、架构和应用场景。
MapReduce分布式文件系统架构

MapReduce分布式文件系统的架构通常由以下几个核心组件构成:

  • Master节点:Master节点是MapReduce系统的控制节点,负责协调解个作业的执行过程。它包含了JobTracker(作业跟踪器)和Resource Manager(资源管理器)等关键组件。
  • Worker节点:Worker节点是MapReduce集群中的计算节点,负责执行Map和Reduce使命。每个Worker节点包含了TaskTracker(使命跟踪器)和DataNode(数据节点)等关键组件。
  • 分布式文件系统:MapReduce分布式文件系统通常基于Hadoop分布式文件系统(HDFS)或其他分布式文件系统实现。它负责存储输入数据和中间计算结果,并提供高可靠性、高扩展性的分布式存储服务。
工作原理

MapReduce分布式文件系统的工作原理可以简述为以下几个步骤:

  • 作业提交:用户通过客户端提交MapReduce作业到Master节点,包罗输入数据的位置和MapReduce使命的代码。
  • 作业调度:Master节点吸收到作业后,将其分配给空闲的Worker节点举行执行。它负责调度Map和Reduce使命的执行顺序,并监控整个作业的执行进度。
  • 数据处置惩罚:Worker节点根据作业中指定的Map和Reduce函数对输入数据举行处置惩罚。Map使命将输入数据分别为多个键值对,并根据键将其分组;Reduce使命则对每个键值对组举行聚合计算。
  • 中间结果存储:MapReduce分布式文件系统将Map使命产生的中间结果存储在分布式文件系统中,以便Reduce使命举行读取和处置惩罚。
  • 结果输出:终极,Reduce使命将聚合结果写回到分布式文件系统中,并通知Master节点作业执行完成。
Mapper类

  1. import java.io.IOException;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.Calendar;
  5. import java.util.Date;
  6. import org.apache.hadoop.io.FloatWritable;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. public class SalesMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
  11.     private Text outputKey = new Text();
  12.     private FloatWritable outputValue = new FloatWritable();
  13.     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  14.         String[] fields = value.toString().split(",");
  15.         if (fields.length == 3) {
  16.             String dateStr = fields[0]; // 日期字段
  17.             String amountStr = fields[1]; // 销售金额字段
  18.             try {
  19.                 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
  20.                 Date date = dateFormat.parse(dateStr);
  21.                 Calendar calendar = Calendar.getInstance();
  22.                 calendar.setTime(date);
  23.                 int month = calendar.get(Calendar.MONTH) + 1; // 月份从0开始,需要加1
  24.                 outputKey.set(String.valueOf(month));
  25.                 outputValue.set(Float.parseFloat(amountStr));
  26.                 context.write(outputKey, outputValue);
  27.             } catch (ParseException e) {
  28.                 e.printStackTrace();
  29.             }
  30.         }
  31.     }
  32. }
复制代码
Reducer类

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.FloatWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. public class SalesReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
  6.     private FloatWritable result = new FloatWritable();
  7.     public void reduce(Text key, Iterable<FloatWritable> values, Context context)
  8.             throws IOException, InterruptedException {
  9.         float sum = 0;
  10.         for (FloatWritable value : values) {
  11.             sum += value.get();
  12.         }
  13.         result.set(sum);
  14.         context.write(key, result);
  15.     }
  16. }
复制代码
Driver类

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.FloatWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. public class MonthlySalesDriver {
  9.     public static void main(String[] args) throws Exception {
  10.         if (args.length != 2) {
  11.             System.err.println("Usage: MonthlySalesDriver <input path> <output path>");
  12.             System.exit(-1);
  13.         }
  14.         Configuration conf = new Configuration();
  15.         Job job = Job.getInstance(conf, "Monthly Sales");
  16.         job.setJarByClass(MonthlySalesDriver.class);
  17.         job.setMapperClass(SalesMapper.class);
  18.         job.setReducerClass(SalesReducer.class);
  19.         job.setOutputKeyClass(Text.class);
  20.         job.setOutputValueClass(FloatWritable.class);
  21.         FileInputFormat.addInputPath(job, new Path(args[0]));
  22.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  23.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  24.     }
  25. }
复制代码
将计算结果存储到Linux上的MySQL数据库中的t_<个人学号>表中(例如t_202201)
首先登录MySQL数据库:mysql -uroot -p123456,并手动创建数据库:create database mydata; 然后并查看是否创建成功:show databases;

使用use mydata 切换到目的数据库,然后使用show tables 查看全部的位于mydata数据库下的全部表。

使用select * from t_20220322,查看是否有数据存在,如如有,则阐明成功


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表