前言:超市贩卖数据的管理和分析对于商家来说至关紧张。借助Hadoop生态系统中的MapReduce框架,我们可以有效地处置惩罚大规模的贩卖数据,并从中提取有价值的信息,比如每个月的贩卖总额。本文将先容怎样利用Hadoop编写MapReduce步伐来计算超市斲丧数据的月份贩卖总额。
一. 数据准备
首先,我们需要准备超市的贩卖数据。假设我们的数据以文本文件的形式存储,每行代表一笔贩卖记录,包罗日期、商品名称和贩卖额等信息。例如:
二.Hadoop分布式文件系统(HDFS)
先容
Hadoop分布式文件系统(HDFS)是Apache Hadoop生态系统的核心组件之一,用于存储和管理大规模数据集。它设计用于在便宜的硬件上运行,并且提供高可靠性、高性能的分布式存储解决方案。本文将深入探究HDFS的工作原理、架构和优势。
HDFS架构
HDFS的架构由以下几个紧张组件构成:
- NameNode:NameNode是HDFS的关键组件之一,负责管理文件系统的定名空间和存储块的元数据信息。它维护了文件系统树及其相关属性,并且记录了每个文件块的位置信息。
- DataNode:DataNode是另一个紧张组件,负责现实存储数据块。每个DataNode管理着其所在节点上的存储,定期向NameNode报告其存储容量和康健状况。
- Secondary NameNode:Secondary NameNode并不是NameNode的备份,而是协助NameNode举行元数据的周期性归并和编辑日志的滚动。它资助减轻了NameNode的压力,并且有助于提高系统的可靠性。
- 客户端:客户端是与HDFS交互的应用步伐,负责向HDFS读取和写入数据。
HDFS工作原理
HDFS的工作原理可以简要概括为以下几个步骤:
- 文件分块:当一个文件被上传到HDFS时,它会被分别成固定大小的数据块(通常为128MB或256MB)。这些数据块被分散存储在多个DataNode上,以实现数据的分布式存储和处置惩罚。
- 定名空间管理:NameNode负责管理文件系统的定名空间和元数据信息。它维护文件系统树,记录文件和数据块的位置信息,并处置惩罚客户端的文件系统操纵请求。
- 数据复制:为了提高数据的可靠性和容错性,HDFS会将每个数据块复制到多个DataNode上。这些副本通常分布在不同的机架上,以减少硬件故障对数据的影响。
- 容错机制:HDFS通过周期性的心跳检测和块报告来监视DataNode的康健状况。如果发现某个数据块的副本丢失或损坏,HDFS会主动从其他DataNode上的副本举行规复。
HDFS的优势
HDFS作为大数据存储解决方案,具有以下几个显著的优势:
- 高可靠性:HDFS通过数据复制和容错机制,包管了数据的可靠性和容错性,即使在硬件故障的环境下也能保持数据的完备性。
- 高扩展性:HDFS可以轻松地扩展到数以千计的节点,以应对不断增长的数据量和处置惩罚需求。
- 高性能:由于数据块的分布式存储和并行处置惩罚,HDFS可以实现高吞吐量和低延迟的数据访问。
- 本钱效益:HDFS运行在便宜的标准硬件上,并且采用了大规模并行处置惩罚的模式,使得它成为一种经济高效的数据存储解决方案。
在HDFS的/<个人学号>/data/路径(例如/202201/data)下创建并存储该数据文件;
通过hdfs dfs -cat/<个人学号>/data/xxx下令查看数据文件的截图,用以证实数据已成功存储在HDFS 上。
1.首先打开HDFS进程,并用hdfs dfs mkdir -p /学号/data 下令在HDFS中创建文件夹,并用hdfs dfs -ls/查看是否创建。
2. 将csv文件准备好,并移动到linxu操纵系统的 /opt/ 目录下,运用下令 hdfs dfs -put /opt/超市斲丧数据.csv /202201/data/(把在Linx下的数据转到HDFS上)。
并使用:hdfs dfs -cat查看是否成功上传。
三.MapReduce指标计算与存储
弁言
MapReduce是一种用于大规模数据处置惩罚的编程模型,最初由Google提出,并在Apache Hadoop项目中得到了广泛的实现和应用。MapReduce分布式文件系统联合了分布式文件系统和MapReduce计算模型,为大规模数据处置惩罚提供了高效、可靠的解决方案。本文将深入探究MapReduce分布式文件系统的原理、架构和应用场景。
MapReduce分布式文件系统架构
MapReduce分布式文件系统的架构通常由以下几个核心组件构成:
- Master节点:Master节点是MapReduce系统的控制节点,负责协调解个作业的执行过程。它包含了JobTracker(作业跟踪器)和Resource Manager(资源管理器)等关键组件。
- Worker节点:Worker节点是MapReduce集群中的计算节点,负责执行Map和Reduce使命。每个Worker节点包含了TaskTracker(使命跟踪器)和DataNode(数据节点)等关键组件。
- 分布式文件系统:MapReduce分布式文件系统通常基于Hadoop分布式文件系统(HDFS)或其他分布式文件系统实现。它负责存储输入数据和中间计算结果,并提供高可靠性、高扩展性的分布式存储服务。
工作原理
MapReduce分布式文件系统的工作原理可以简述为以下几个步骤:
- 作业提交:用户通过客户端提交MapReduce作业到Master节点,包罗输入数据的位置和MapReduce使命的代码。
- 作业调度:Master节点吸收到作业后,将其分配给空闲的Worker节点举行执行。它负责调度Map和Reduce使命的执行顺序,并监控整个作业的执行进度。
- 数据处置惩罚:Worker节点根据作业中指定的Map和Reduce函数对输入数据举行处置惩罚。Map使命将输入数据分别为多个键值对,并根据键将其分组;Reduce使命则对每个键值对组举行聚合计算。
- 中间结果存储:MapReduce分布式文件系统将Map使命产生的中间结果存储在分布式文件系统中,以便Reduce使命举行读取和处置惩罚。
- 结果输出:终极,Reduce使命将聚合结果写回到分布式文件系统中,并通知Master节点作业执行完成。
Mapper类
- import java.io.IOException;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Calendar;
- import java.util.Date;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- public class SalesMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
- private Text outputKey = new Text();
- private FloatWritable outputValue = new FloatWritable();
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- String[] fields = value.toString().split(",");
- if (fields.length == 3) {
- String dateStr = fields[0]; // 日期字段
- String amountStr = fields[1]; // 销售金额字段
- try {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
- Date date = dateFormat.parse(dateStr);
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(date);
- int month = calendar.get(Calendar.MONTH) + 1; // 月份从0开始,需要加1
- outputKey.set(String.valueOf(month));
- outputValue.set(Float.parseFloat(amountStr));
- context.write(outputKey, outputValue);
- } catch (ParseException e) {
- e.printStackTrace();
- }
- }
- }
- }
复制代码 Reducer类
- import java.io.IOException;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- public class SalesReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
- private FloatWritable result = new FloatWritable();
- public void reduce(Text key, Iterable<FloatWritable> values, Context context)
- throws IOException, InterruptedException {
- float sum = 0;
- for (FloatWritable value : values) {
- sum += value.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
复制代码 Driver类
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class MonthlySalesDriver {
- public static void main(String[] args) throws Exception {
- if (args.length != 2) {
- System.err.println("Usage: MonthlySalesDriver <input path> <output path>");
- System.exit(-1);
- }
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf, "Monthly Sales");
- job.setJarByClass(MonthlySalesDriver.class);
- job.setMapperClass(SalesMapper.class);
- job.setReducerClass(SalesReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FloatWritable.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
复制代码 将计算结果存储到Linux上的MySQL数据库中的t_<个人学号>表中(例如t_202201)
首先登录MySQL数据库:mysql -uroot -p123456,并手动创建数据库:create database mydata; 然后并查看是否创建成功:show databases;
使用use mydata 切换到目的数据库,然后使用show tables 查看全部的位于mydata数据库下的全部表。
使用select * from t_20220322,查看是否有数据存在,如如有,则阐明成功
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |