头歌实践教学平台 大数据编程 实训答案(三)

打印 上一主题 下一主题

主题 862|帖子 862|积分 2586

第一章 遍历日记数据


用 Spark 遍历日记数据

第1关:用 Spark 得到日记文件中记录总数

使命形貌
本关使命:编写一个能用 Spark 操纵日记文件并输出日记文件记录数的小程序。
相关知识
为了完本钱关使命,你需要掌握:1.搜索查询日记的内容,2.如何用 Spark 得到日记文件,3.如何得到日记文件的记录数。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object Test1 {
  3.   def main(args: Array[String]) {
  4.     // SparkConf包含了Spark配置的各种参数,
  5.     // local:设置为本地运行
  6.     // *:使用本地的所有cpu核
  7.     // setAppName:设置本应用程序的别名(自定义)
  8.     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
  9.     // 进入Spark操作的入口
  10.     val sc = new SparkContext(sparkConf)
  11.     // 获得文本文件内容
  12.     val sou = sc.textFile("/data/workspace/myshixun/projectsou1_1/src/soulog.txt")
  13.     //********** Begin **********
  14.     println("搜索日志文件中共有%d条记录".format(sou.count()))
  15.     //********** End **********
  16.   }
  17. }
复制代码
第2关:用 Spark 得到日记文件中记录内容

使命形貌
本关使命:编写一个能用 Spark 得到日记文件中记录内容的小程序。
相关知识
为了完本钱关使命,你需要掌握:如何用 Spark 得到日记文件中记录内容。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object Test2 {
  3.   def main(args: Array[String]) {
  4.     // SparkConf包含了Spark配置的各种参数,
  5.     // local:设置为本地运行
  6.     // *:使用本地的所有cpu核
  7.     // setAppName:设置本应用程序的别名(自定义)
  8.     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
  9.     // 进入Spark操作的入口
  10.     val sc = new SparkContext(sparkConf)
  11.     // 获得文本文件内容
  12.     val sou = sc.textFile("/data/workspace/myshixun/projectsou1_1/src/soulog.txt")
  13.     //********** Begin **********
  14.     val rdd1 = sou.map {
  15.       case log =>
  16.         val logSplit = log.split("\\s")
  17.         (logSplit(3),logSplit(4))
  18.     }
  19.     rdd1.collect.take(6).foreach(println(_))
  20.     //********** End **********
  21.   }
  22. }
复制代码
第二章 过滤日记数据


用 Spark 过滤日记数据

第1关:掌握用 Spark 过滤日记数据

使命形貌
本关使命:编写一个能用 Spark 过滤日记数据的小程序。
相关知识
为了完本钱关使命,你需要掌握:如何用 Spark 过滤日记数据。
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object Test1 {
  3.   def main(args: Array[String]) {
  4.     // SparkConf包含了Spark配置的各种参数,
  5.     // local:设置为本地运行
  6.     // *:使用本地的所有cpu核
  7.     // setAppName:设置本应用程序的别名(自定义)
  8.     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
  9.     // 进入Spark操作的入口
  10.     val sc = new SparkContext(sparkConf)
  11.     // 获得文本文件内容
  12.     val sou = sc.textFile("/data/workspace/myshixun/projectsou2_1/src/soulog.txt")
  13.     //********** Begin **********
  14.     val splitSou = sou.map(_.split("\\s"))
  15.     val filterSou = splitSou  
  16.       .filter(_ (3).toInt == 1)
  17.       .filter(_ (4).toInt == 1)
  18.     print(filterSou.count())
  19.     //********** End **********
  20.   }
  21. }
复制代码
第三章 聚合、排序日记数据


用 Spark 对日记数据举行排序

第1关:用 Spark 对日记数据举行排序

使命形貌
本关使命:编写一个能用 Spark 对日记数据举行排序的小程序。
相关知识
为了完本钱关使命,你需要掌握:如何用 Spark 对日记数据举行排序。
排序操纵
要对上节课的数据举行排序操纵,才气从大到小输出排行榜,
比如上节课得到的数据是:
(222,1)
(111,3)
(333,2)
格式是(用户id,查询次数),
现在要将这些数据按照查询次数的从大到小举行排序,也就是降序排序,代码如下:
     val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("sou")
         val sou = sc.textFile("src/soulog2.txt")
         val splitSou = sou.map(_.split("\\s"))
         val result=splitSou.map(x => (x(1),1))
           .reduceByKey(_+_)
         // 将之前的结果举行降序排序,输出用户查询次数的排行榜
           val sortResult=result
           .map(x => (x._2,x._1))
           .sortByKey(false)
           .map(x => (x._2,x._1))
         sortResult.collect().foreach(println(_))
    1. import org.apache.spark.{SparkConf, SparkContext}
    2. object Test1 {
    3.   def main(args: Array[String]) {
    4.     // SparkConf包含了Spark配置的各种参数,
    5.     // local:设置为本地运行
    6.     // *:使用本地的所有cpu核
    7.     // setAppName:设置本应用程序的别名(自定义)
    8.     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
    9.     // 进入Spark操作的入口
    10.     val sc = new SparkContext(sparkConf)
    11.     // 获得文本文件内容
    12.     val sou = sc.textFile("/data/workspace/myshixun/projectsou4_1/src/soulog.txt")
    13.     //\s代表正则表达式中的一个空白字符(可能是空格、制表符、其他空白)
    14.     //分割后,输出总记录数
    15.     val splitSou = sou.map(_.split("\\s"))
    16.     // 日志文件中总共有10000条记录
    17.     println(splitSou.count())
    18.     //只查询在返回结果中的排名和用户点击的顺序号都为1的记录,
    19.     // 之前已经分隔成6个部分的数据,
    20.     // 现在我们要查询排名第1(也就是第4个部分数据)以及用户点击排名第1(也就是第5个数据)的数据
    21.     // 可以用连续的filter方法来进行多次过滤,
    22.     // 注意将排名值用toInt方法转换为整数
    23.     val filterSou = splitSou
    24.       .filter(_ (3).toInt == 1)
    25.       .filter(_ (4).toInt == 1)
    26.     // 获得经过以上过滤的数据,并且,将每条记录的用户id取出来,并给每条记录加一个value值为1
    27.     val result=filterSou.map(x => (x(1),1))
    28.     // 将相同用户的查询次数统计出来
    29.       .reduceByKey(_+_)
    30.     //********** Begin **********
    31.     val sortResult=result
    32.         // 因为我们要按key进行排序,而之前的结果的key是用户id,value是次数
    33.         // 所以我们将原来的key和value互换位置,
    34.         // x._1就是(key,value)的第一个元素key,x._2就是(key,value)的第二个元素value
    35.         // 所以我们用map方法互换了key和value的位置
    36.       .map(x => (x._2,x._1))
    37.         // 然后按照现在的key也就是查询次数来进行排序,因为是排行榜,从大到小,所以是降序排序
    38.       .sortByKey(false)
    39.         //排完序后,再将排完序的数据的key和value进行互换,
    40.       .map(x => (x._2,x._1))
    41.     // 输出用户查询次数
    42.     sortResult.collect().take(10).foreach(println(_))
    43.     //********** End **********
    44.   }
    45. }
    复制代码

    用 Spark 对日记数据举行聚合

    第1关:用 Spark 对日记数据举行聚合

    使命形貌
    本关使命:编写一个能用 Spark 对日记数据举行聚合的小程序。
    相关知识
    为了完本钱关使命,你需要掌握:如何用 Spark 对日记数据举行聚合。
    聚合操纵
    我们经常要对数据举行聚合操纵,
    比如对于以下数据:
    时间       用户id   查询的词
    00:00:00    111    [查询词1]
    00:00:00    111    [查询词2]
    00:00:00    333    [查询词3]
    00:00:00    111    [查询词4]
    00:00:00    222    [查询词5]
    00:00:00    333    [查询词5]
    我们要查询出每个用户查询的次数,
    可以用以下代码来实现:
         val sparkConf = new SparkConf().setMaster("local
  • ").setAppName("sou")
         val sc = new SparkContext(sparkConf)
         val sou = sc.textFile("src/soulog2.txt")
         val splitSou = sou.map(_.split("\\s"))
         val result=splitSou.map(x => (x(1),1))
           .reduceByKey(_+_)
         result.collect().foreach(println(_))
    1. import org.apache.spark.{SparkConf, SparkContext}
    2. object Test1 {
    3.   def main(args: Array[String]) {
    4.     // SparkConf包含了Spark配置的各种参数,  
    5.     // local:设置为本地运行  
    6.     // *:使用本地的所有cpu核  
    7.     // setAppName:设置本应用程序的别名(自定义)  
    8.     val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
    9.     // 进入Spark操作的入口  
    10.     val sc = new SparkContext(sparkConf)
    11.     // 获得文本文件内容  
    12.     val sou = sc.textFile("/data/workspace/myshixun/projectsou3_1/src/soulog.txt")
    13.     //\s代表正则表达式中的一个空白字符(可能是空格、制表符、其他空白)  
    14.     //分割后,输出总记录数  
    15.     val splitSou = sou.map(_.split("\\s"))
    16.     // 日志文件中总共有10000条记录  
    17.     println(splitSou.count())
    18.     //只查询在返回结果中的排名和用户点击的顺序号都为1的记录,  
    19.     // 之前已经分隔成6个部分的数据,  
    20.     // 现在我们要查询排名第1(也就是第4个部分数据)以及用户点击排名第1(也就是第5个数据)的数据  
    21.     // 可以用连续的filter方法来进行多次过滤,  
    22.     // 注意将排名值用toInt方法转换为整数  
    23.     val filterSou = splitSou  
    24.       .filter(_ (3).toInt == 1)  
    25.       .filter(_ (4).toInt == 1)  
    26.     //********** Begin **********
    27.     val result=filterSou.map(x => (x(1),1))
    28.       // 将相同用户的查询次数统计出来
    29.       .reduceByKey(_+_)
    30.     result.collect().take(5).foreach(println(_))
    31.     //********** End **********
    32.   }
    33. }
    复制代码
    第一章 Spark 入门


    Spark Standalone 模式的安装和部署

    第1关: Standalone 分布式集群搭建

    使命形貌
    掌握 Standalone 分布式集群搭建。
    相关知识
    我们已经掌握了 Spark 单机版安装,那么分布式集群怎么搭建呢? 接下来我们学习 Standalone 分布式集群搭建。
    启动环境
    1. cd /home
    2. wrapdocker
    3. ulimit -f 1024000
    4. # 加载镜像
    5. docker load -i hbase-ssh2_v1.0.tar
    6. # 启动集群 启动失败则等一会,再次执行,直至成功
    7. docker-compose up -d
    8. # 新开一个命令行窗口 master         密码统一为 123456
    9. ssh 172.18.0.2
    10. ssh-keygen -t rsa #三下回车
    11. # 新开一个命令行窗口 slave1
    12. ssh 172.18.0.3
    13. ssh-keygen -t rsa #三下回车
    14. # 新开一个命令行窗口 slave2
    15. ssh 172.18.0.4
    16. ssh-keygen -t rsa #三下回车
    17. #  在 master 复制 master、slave1、slave2 的公钥。
    18. cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
    19. ssh root@slave1 cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
    20. ssh root@slave2 cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
    21. # 分别在 slave1、slave2 复制 master 的 authorized_keys 文件。
    22. ssh root@master cat ~/.ssh/authorized_keys>> ~/.ssh/authorized_keys
    23. # 在第1个命令行窗口 密码为123456
    24. scp -r /usr/local/spark-2.3.4-bin-hadoop2.7 root@172.18.0.2:/usr/local
    25. # 在master(第2个命令行窗口) 修改配置 注意:未提示更换命令行则一直在master上执行
    26. echo "export SPARK_HOME=/usr/local/spark-2.3.4-bin-hadoop2.7" >> /etc/profile
    27. source /etc/profile
    28. cd /usr/local/spark-2.3.4-bin-hadoop2.7/conf
    29. mv spark-env.sh.template spark-env.sh
    30. echo "export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111" >> spark-env.sh
    31. echo "SPARK_MASTER_WEBUI_PORT=8888" >> spark-env.sh
    32. echo "export SPARK_MASTER_IP=master" >> spark-env.sh
    33. mv slaves.template slaves
    34. vi slaves
    35. # 修改为以下内容
    36. master
    37. slave1
    38. slave2
    39. # 分发安装包
    40. cd /usr/local
    41. scp -r spark-2.3.4-bin-hadoop2.7/ root@slave1:/usr/local
    42. scp -r spark-2.3.4-bin-hadoop2.7/ root@slave2:/usr/local
    43. # 启动集群
    44. cd /usr/local/spark-2.3.4-bin-hadoop2.7/sbin
    45. ./start-all.sh
    复制代码

    Spark的安装与使用

    第1关:Scala 环境的安装与部署

    使命形貌
    本关使命:安装与配置Scala开发环境。
    相关知识
    Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上。随着开发者对Scala的兴趣日增,以及越来越多的工具支持,无疑Scala语言将成为你手上一件必不可少的工具。
    而我们将要学习的大数据框架Spark底层是使用Scala开发的,使用scala写出的代码长度是使用java写出的代码长度的1/10左右,代码实现更加简练。
    以是安装与配置Scala的环境是我们在开始学习Spark之前要完成的准备工作。
    接下来我们开始安装,分为三个步骤:
    下载解压;
    配置环境;
    校验。
    启动环境
    1. mkdir /app
    2. cd /opt/
    3. tar -zxvf  scala-2.12.7.tgz -C /app
    4. vi /etc/profile
    5. SCALA_HOME=/app/scala-2.12.7
    6. export PATH=$PATH:$SCALA_HOME/bin
    7. source /etc/profile
    复制代码
    第2关:Spark 环境安装

    使命形貌
    本关使命:安装与配置Spark开发环境。
    相关知识
    Apache Spark是专为大规模数据处置惩罚而筹划的快速通用的计算引擎。Spark是UC Berkeley AMP lab(加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中心输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地实用于数据发掘与机器学习等需要迭代的MapReduce的算法。
    本关我们来配置一个伪分布式的Spark开发环境,与配置Hadoop雷同分为三个步骤:
    下载解压安装包;
    配置环境变量;
    配置Spark环境;
    校验。
    1. cd /opt/
    2. tar -zxvf spark-2.2.2-bin-hadoop2.7.tgz -C /app
    3. vim /etc/profile
    4. SPARK_HOME=/app/spark-2.2.2-bin-hadooop2.7
    5. export PATH=$PATH:$SPARK_HOME/bin
    6. source /etc/profile
    7. cd /app/spark-2.2.2-bin-hadoop2.7/conf
    8. cp spark-env.sh.template spark-env.sh
    9. vi spark-env.sh
    10. export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111
    11. export SCALA_HOME=/app/scala-2.12.7
    12. export HADOOP_HOME=/usr/local/hadoop/
    13. export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
    14. export SPARK_MASTER_IP=127.0.0.1
    15. export SPARK_LOCAL_IP=127.0.0.1
    16. cd /app/spark-2.2.2-bin-hadoop2.7
    17. ./sbin/start-all.sh
    复制代码
    第三章 SparkSQL结构化数据分析与处置惩罚


    军用大数据 - 结构化数据分析与处置惩罚

    第1关:Spark SQL入门

    使命形貌
    掌握 Spark SQL 相关根本知识,完成选择题使命。
    相关知识
    Spark SQL 是 Spark 用来处置惩罚结构化数据的一个模块。Spark SQL 为了支持结构化数据的处置惩罚,它提供了两个编程抽象分别叫做 DataFrame 和DataSet。
    1、C;2、AB
    第2关:使用Spark SQL统计战斗机飞行性能

    使命形貌
    通过飞行速度统计出战斗机飞行性能排比。
    相关知识
    本关使用  mySQL 统计战斗机飞行性能。
    1. # coding=utf-8
    2. from pyspark.sql import SparkSession
    3. #**********Begin**********#
    4. #创建SparkSession
    5. spark = SparkSession \
    6.     .builder \
    7.     .appName("Python Spark SQL basic example") \
    8.     .config("spark.sql.crossJoin.enabled", "true") \
    9.     .master("local") \
    10.     .getOrCreate()
    11.    
    12. #读取/root/jun.json中数据
    13. df =spark.read.json("/root/jun.json")
    14. #创建视图
    15. df.createOrReplaceTempView("table1")
    16. #统计出全球飞行速度排名前三的战斗机
    17. sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `名称` from table1 order by SPEED desc LIMIT 3")
    18. #保存结果
    19. sqlDF.write.format("csv").save("/root/airspark")
    20. #**********End**********#
    21. spark.stop()
    复制代码
    第3关:使用Spark SQL统计各个研发单位研制战斗机占比

    使命形貌
    统计出各个研发单位研制战斗机占比。
    相关知识
    使用 Spark SQL 统计各个研发单位研制战斗机占比。
    1. # coding=utf-8
    2. from pyspark.sql import SparkSession
    3. #**********Begin**********#
    4. #创建SparkSession
    5. spark = SparkSession \
    6.     .builder \
    7.     .appName("Python Spark SQL basic example") \
    8.     .config("spark.sql.crossJoin.enabled", "true") \
    9.     .master("local") \
    10.     .getOrCreate()
    11.    
    12. #读取/root/jun.json中数据
    13. df =spark.read.json("/root/jun.json").coalesce(1)
    14. #创建视图
    15. df.createOrReplaceTempView("table1")
    16. #统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
    17. sqlDF = spark.sql("select concat(cast(round(count(`研发单位`)*100/(select count(`研发单位`) from table1 where `研发单位` is not null and `名称` is not null ),2) as float),'%'),`研发单位` from table1 where `研发单位` is not null and `名称` is not null group by `研发单位`")
    18. #保存结果
    19. sqlDF.write.format("csv").save("/root/airspark")
    20. #**********End**********#
    21. spark.stop()
    复制代码
    第四章 Spark结构化流处置惩罚


    军用大数据--结构化流式数据处置惩罚

    第1关:Spark结构化流快速入门

    使命形貌
    Spark Streaming 是一套良好的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们许多的场景应用。本关联合我们的应用场景,介结我们如何使用 Spark Streaming 处置惩罚数据。
    1. # -*- coding: utf-8
    2. from pyspark.sql import SparkSession
    3. from pyspark.sql.functions import explode
    4. from pyspark.sql.functions import split
    5. import time
    6. # 请在此处编写代码
    7. #********** Begin **********#
    8. spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
    9. spark.sparkContext.setLogLevel("error")
    10. lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    11. # Split the lines into words
    12. words = lines.select(
    13.    explode(
    14.        split(lines.value, " ")
    15.    ).alias("word")
    16. )
    17. # Generate running word count
    18. wordCounts = words.groupBy("word").count()
    19. # Start running the query that prints the running counts to the console
    20. query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime='1 seconds').start()
    21. time.sleep( 20 )
    22. query.stop()
    23. #********** End **********#
    复制代码
    第2关:对飞机的点击次数实时统计

    使命形貌
    Spark Streaming 是一套良好的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们许多的场景应用。上一关我们先容了如何实时获取数据,并处置惩罚数据,本关联合上一关的场景,联合 Kafka 把分析结果读出来。
    相关知识
    Spark Streaming 其良好的特点给我们带来许多的应用场景。本关中,将通过从 TCP 获取数据来举行先容。
    1. import time
    2. from pyspark.sql import SparkSession
    3. #********** Begin **********#
    4. spark = SparkSession.builder.master("local[2]").appName("case2").getOrCreate()
    5. spark.sparkContext.setLogLevel("error")
    6. df = spark.readStream.format("socket").option("host", "localhost").option("port", 9998).load()
    7. ds=df.selectExpr( "CAST(value AS STRING)")
    8. ds.createOrReplaceTempView("planeNumber")
    9. sql= spark.sql("select count(*) nums,value from planeNumber   group  by value order by nums desc");
    10. query =  sql.writeStream.format("console").outputMode("complete").start()
    11. time.sleep( 20 )
    12. query.stop()
    13. #********** End **********#
    复制代码

    kafka-入门篇

    第1关:kafka - 初体验

    使命形貌
    本关使命:使用 Kafka 命令创建一个副本数量为1、分区数量为3的 Topic 。
    相关知识
    为了完本钱关使命,你需要掌握:1.如何使用 Kafka 的常用命令。
    1. #!/bin/bash
    2. #1.创建一个名为demo的Topic
    3. kafka-topics.sh -create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo
    4. #2.查看所有Topic
    5. kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    6. #3.查看名为demo的Topic的详情信息
    7. kafka-topics.sh -topic demo --describe --zookeeper 127.0.0.1:2181
    复制代码
    第2关:生产者 (Producer ) - 简朴模式

    使命形貌
    本关使命:编写一个 Kafka 的 Producer 举行数据生产。
    相关知识
    为了完本钱关使命,你需要掌握:1.如何使用 Kafka 的 Producer API 举行数据生产。
    1. package net.educoder;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.Producer;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import java.util.Properties;
    6. /**
    7. * kafka producer 简单模式
    8. */
    9. public class App {
    10.     public static void main(String[] args) {
    11.         /**
    12.          * 1.创建配置文件对象,一般采用 Properties
    13.          */
    14.         /**----------------begin-----------------------*/
    15.         Properties props = new Properties();
    16.         /**-----------------end-------------------------*/
    17.         /**
    18.          * 2.设置kafka的一些参数
    19.          *          bootstrap.servers --> kafka的连接地址 127.0.0.1:9092
    20.          *          key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer
    21.          *          acks:1,-1,0
    22.          */
    23.         /**-----------------begin-----------------------*/
    24.         props.put("bootstrap.servers", "127.0.0.1:9092");
    25.         props.put("acks", "1");
    26.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    27.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    28.         /**-----------------end-------------------------*/
    29.         /**
    30.          * 3.构建kafkaProducer对象
    31.          */
    32.         /**-----------------begin-----------------------*/
    33.         Producer<String, String> producer = new KafkaProducer<>(props);
    34.         /**-----------------end-------------------------*/
    35.         for (int i = 0; i < 100; i++) {
    36.             ProducerRecord<String, String> record = new ProducerRecord<>("demo", i + "", i + "");
    37.             /**
    38.              * 4.发送消息
    39.              */
    40.             /**-----------------begin-----------------------*/
    41.             producer.send(record);
    42.             /**-----------------end-------------------------*/
    43.         }
    44.         producer.close();
    45.     }
    46. }
    复制代码
    第3关:消耗者( Consumer)- 自动提交偏移量

    使命形貌
    本关使命:编写一个 Kafka 消耗者并设置自动提交偏移量举行数据消耗。
    相关知识
    为了完本钱关使命,你需要掌握:1.如何编写 Kafka 消耗者,2.如何使用自动提交偏移量。
    1. package net.educoder;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.apache.kafka.clients.consumer.ConsumerRecords;
    4. import org.apache.kafka.clients.consumer.KafkaConsumer;
    5. import java.util.Arrays;
    6. import java.util.Properties;
    7. public class App {
    8.     public static void main(String[] args) {
    9.         Properties props = new Properties();
    10.         /**--------------begin----------------*/
    11.         //1.设置kafka集群的地址
    12.         props.put("bootstrap.servers", "127.0.0.1:9092");
    13.         //2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
    14.         props.put("group.id", "g1");
    15.         //3.开启offset自动提交
    16.         props.put("enable.auto.commit", "true");
    17.         //4.自动提交时间间隔
    18.         props.put("auto.commit.interval.ms", "1000");
    19.         //5.序列化器
    20.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    21.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    22.         /**---------------end---------------*/
    23.         /**--------------begin----------------*/
    24.         //6.创建kafka消费者
    25.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    26.         //7.订阅kafka的topic
    27.         consumer.subscribe(Arrays.asList("demo"));
    28.         /**---------------end---------------*/
    29.         int i = 1;
    30.         while (true) {
    31.             /**----------------------begin--------------------------------*/
    32.             //8.poll消息数据,返回的变量为crs
    33.             ConsumerRecords<String, String> crs = consumer.poll(100);
    34.             for (ConsumerRecord<String, String> cr : crs) {
    35.                 System.out.println("consume data:" + i);
    36.                 i++;
    37.             }
    38.             /**----------------------end--------------------------------*/
    39.             if (i > 10) {
    40.                 return;
    41.             }
    42.         }
    43.     }
    44. }
    复制代码
    第4关:消耗者( Consumer )- 手动提交偏移量

    使命形貌
    本关使命:编写一个 Kafka 消耗者并使用手动提交偏移量举行数据消耗。
    相关知识
    为了完本钱关使命,你需要掌握:1.如何编写 Kafka 消耗者,2.如何手动提交偏移量。
    Kafka 两种手动提交方式
    异步提交( CommitAsync ):
        异步模式下,提交失败也不会尝试提交。消耗者线程不会被壅闭,因为异步操纵,可能在提交偏移量操纵结果未返回时就开始下一次拉取操纵。
    同步提交( CommitSync ):
        同步模式下,提交失败时不停尝试提交,直到碰到无法重试才竣事。同步方式下,消耗者线程在拉取消息时会被壅闭,直到偏移量提交操纵成功大概在提交过程中发生错误。
    留意:实现手动提交前需要在创建消耗者时关闭自动提交,设置enable.auto.commit=false
    1. package net.educoder;
    2. import org.apache.kafka.clients.consumer.ConsumerRecord;
    3. import org.apache.kafka.clients.consumer.ConsumerRecords;
    4. import org.apache.kafka.clients.consumer.KafkaConsumer;
    5. import java.util.ArrayList;
    6. import java.util.Arrays;
    7. import java.util.List;
    8. import java.util.Properties;
    9. public class App {
    10.     public static void main(String[] args){
    11.         Properties props = new Properties();
    12.         /**-----------------begin------------------------*/
    13.         //1.设置kafka集群的地址
    14.         props.put("bootstrap.servers", "127.0.0.1:9092");
    15.         //2.设置消费者组,组名字自定义,组名字相同的消费者在一个组
    16.         props.put("group.id", "g1");
    17.         //3.关闭offset自动提交
    18.         props.put("enable.auto.commit", "false");
    19.         //4.序列化器
    20.         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    21.         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    22.         /**-----------------end------------------------*/
    23.         /**-----------------begin------------------------*/
    24.         //5.实例化一个消费者
    25.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    26.         //6.消费者订阅主题,订阅名为demo的主题
    27.         consumer.subscribe(Arrays.asList("demo"));
    28.         /**-----------------end------------------------*/
    29.         final int minBatchSize = 10;
    30.         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    31.         while (true) {
    32.             ConsumerRecords<String, String> records = consumer.poll(100);
    33.             for (ConsumerRecord<String, String> record : records) {
    34.                 buffer.add(record);
    35.             }
    36.             if (buffer.size() >= minBatchSize) {
    37.                 for (ConsumerRecord bf : buffer) {
    38.                     System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
    39.                 }
    40.                 /**-----------------begin------------------------*/
    41.                 //7.手动提交偏移量
    42.                 consumer.commitSync();
    43.                 /**-----------------end------------------------*/
    44.                 buffer.clear();
    45.                 return;
    46.             }
    47.         }
    48.     }
    49. }
    复制代码
    第六章 Spark MLib机器学习


    军用大数据 - Spark机器学习

    第1关:Iris 分类

    使命形貌
    本关使命:使用 pyspark ml 的LogisticRegression分类器完成 Iris 分类使命。
    相关知识
    观察数据集

    我们本次使用的数据集是sklearn自带的数据集Iris。
    接下来,我们来相识下Iris数据集的数据吧!
    1. # -*- coding: utf-8 -*-
    2. from pyspark.sql import SparkSession
    3. from sklearn.datasets import load_iris
    4. import pandas
    5. from pyspark.ml.classification import LogisticRegression
    6. from pyspark.mllib.evaluation import BinaryClassificationMetrics
    7. from pyspark.ml.feature import RFormula
    8. # 训练模型
    9. def trainingModel(spark):
    10.     # ********** Begin ********** #
    11.    
    12.     # 1.加载sklearn的训练数据
    13.     iris =
    复制代码
    免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
  • 回复

    使用道具 举报

    0 个回复

    倒序浏览

    快速回复

    您需要登录后才可以回帖 登录 or 立即注册

    本版积分规则

    千千梦丶琪

    金牌会员
    这个人很懒什么都没写!

    标签云

    快速回复 返回顶部 返回列表