Flink 输出至 Elasticsearch

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

【1】引入pom.xml依赖
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
  4.     <version>1.10.0</version>
  5. </dependency>
复制代码
【2】ES6 Scala代码,主动导入的scala包需要修改为scala._ 否则会出现错误。
  1. package com.zzx.flink
  2. import java.util
  3. import org.apache.flink.api.common.functions.RuntimeContext
  4. import org.apache.flink.streaming.api.scala._
  5. import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
  6. import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
  7. import org.apache.http.HttpHost
  8. import org.elasticsearch.client.Requests
  9. object EsSinkTest {
  10.   def main(args: Array[String]): Unit = {
  11.     // 创建一个流处理执行环境
  12.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  13.     //从文件中读取数据并转换为 类
  14.     val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
  15.     //转换
  16.     val dataStream: DataStream[SensorReading] = inputStreamFromFile
  17.       .map( data => {
  18.         var dataArray = data.split(",")
  19.         SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
  20.       })
  21.     //定义一个 HttpHosts
  22.     val httpHost = new util.ArrayList[HttpHost]()
  23.     //默认 9200 我的修改为了 9201
  24.     httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
  25.     httpHost.add(new HttpHost("127.0.0.1",9200,"http"))
  26.     //定义一个 ElasticSearchFuntion 操作 es的function
  27.     val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
  28.       //element 每一条数据 通过 index 发送
  29.       override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
  30.         //包装写入 es 的数据
  31.         val dataSource = new util.HashMap[String,String]()
  32.         dataSource.put("sensor_id",element.id)
  33.         dataSource.put("temp",element.temperature.toString)
  34.         dataSource.put("ts",element.timestamp.toString)
  35.         //index
  36.         val indexRequest = Requests.indexRequest()
  37.             .index("sensor_temp")
  38.             .`type`("readingdata")
  39.             .source(dataSource)
  40.         index.add(indexRequest)
  41.         println("saved successfully " + element.toString)
  42.       }
  43.     }
  44.     //输出值 es
  45.     dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
  46.     env.execute("es")
  47.   }
  48. }
复制代码
【3】ES6输出展示


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农民

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表