农民 发表于 2024-8-7 19:33:04

Flink 输出至 Elasticsearch

【1】引入pom.xml依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
【2】ES6 Scala代码,主动导入的scala包需要修改为scala._ 否则会出现错误。
package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
def main(args: Array): Unit = {
    // 创建一个流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile: DataStream = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换
    val dataStream: DataStream = inputStreamFromFile
      .map( data => {
      var dataArray = data.split(",")
      SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
      })

    //定义一个 HttpHosts
    val httpHost = new util.ArrayList()
    //默认 9200 我的修改为了 9201
    httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
    httpHost.add(new HttpHost("127.0.0.1",9200,"http"))
    //定义一个 ElasticSearchFuntion 操作 es的function
    val esSinkFunc = new ElasticsearchSinkFunction {
      //element 每一条数据 通过 index 发送
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
      //包装写入 es 的数据
      val dataSource = new util.HashMap()
      dataSource.put("sensor_id",element.id)
      dataSource.put("temp",element.temperature.toString)
      dataSource.put("ts",element.timestamp.toString)

      //index
      val indexRequest = Requests.indexRequest()
            .index("sensor_temp")
            .`type`("readingdata")
            .source(dataSource)
      index.add(indexRequest)
      println("saved successfully " + element.toString)
      }
    }
    //输出值 es
    dataStream.addSink(new ElasticsearchSink.Builder(httpHost,esSinkFunc).build())
    env.execute("es")
}
}
【3】ES6输出展示
https://i-blog.csdnimg.cn/blog_migrate/004286443ed90c3cc5f6dbe3b42351ec.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink 输出至 Elasticsearch