【1】引入pom.xml依赖
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
- <version>1.10.0</version>
- </dependency>
【2】ES6 Scala 代码。注意:IDE 自动导入的 Flink 包需要改为 `org.apache.flink.streaming.api.scala._`(见下方 import),否则编译会报错。
- package com.zzx.flink
- import java.util
- import org.apache.flink.api.common.functions.RuntimeContext
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
- import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
- import org.apache.http.HttpHost
- import org.elasticsearch.client.Requests
/**
 * Flink streaming job that reads comma-separated sensor readings from a text
 * file and adds one Elasticsearch index request per reading via the ES6 sink.
 *
 * NOTE(review): `SensorReading` is not defined in this snippet. From its usage
 * here it is presumably
 * `case class SensorReading(id: String, timestamp: Long, temperature: Double)`
 * declared elsewhere in this package -- confirm.
 */
object EsSinkTest {

  /**
   * Builds the pipeline (file source -> parse -> Elasticsearch sink) and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the raw input file, one record per line.
    val inputStreamFromFile: DataStream[String] =
      env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")

    // Parse each "id,timestamp,temperature" line into a SensorReading.
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map { data =>
        // Trim every field: toLong rejects surrounding whitespace, so a line
        // written as "sensor_1, 1547718199, 35.8" would otherwise fail with a
        // NumberFormatException.
        val dataArray = data.split(",").map(_.trim)
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      }

    // Elasticsearch REST endpoints the sink connects to. Both use port 9200;
    // change the port here if your cluster listens on a different one.
    val httpHost = new util.ArrayList[HttpHost]()
    httpHost.add(new HttpHost("192.168.1.12", 9200, "http"))
    httpHost.add(new HttpHost("127.0.0.1", 9200, "http"))

    // Sink function that turns each SensorReading into an index request.
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      // Called once per element; requests are handed to `index`.
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
        // Document body: every field is stored as a string value.
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("sensor_id", element.id)
        dataSource.put("temp", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Index request targeting index "sensor_temp", type "readingdata".
        val indexRequest = Requests.indexRequest()
          .index("sensor_temp")
          .`type`("readingdata")
          .source(dataSource)
        index.add(indexRequest)

        // NOTE(review): RequestIndexer.add presumably only buffers the request
        // for a later bulk flush, so this message does not by itself confirm
        // that the document was written -- verify against the connector docs.
        println("saved successfully " + element.toString)
      }
    }

    // Attach the Elasticsearch sink and start the job.
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost, esSinkFunc).build())
    env.execute("es")
  }
}
【3】ES6 输出展示

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。