Kafka Stream实时流式计算
1 概念一样平常流式计算会与批量计算相比较。在流式计算模子中,输入是连续的,可以以为在时间上是无界的,也就意味着,永世拿不到全量数据去做计算。同时,计算结果是连续输出的,也即计算结果在时间上也是无界的。流式计算一样平常对实时性要求较高,同时一样平常是先界说目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了进步计算效率,往往尽大概采用增量计算代替全量计算。
2 应用场景
[*] 日记分析
网站的用户访问日记举行实时的分析,计算访问量,用户画像,留存率等等,实时的举行数据分析,帮助企业举行决议
[*] 大屏看板统计
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
[*] 公交实时数据
可以随时更新公交车方位,计算多久到达站牌等
[*] 实时文章分值计算
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被保举。
[*] 3 技能方案选型
[*] Hadoop
[*]https://i-blog.csdnimg.cn/direct/4cdb144a445b407f91ff2ab9104c641e.png
[*] Apche Storm
Storm 是一个分布式实时大数据处置惩罚系统,可以帮助我们方便地处置惩罚海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
[*] Kafka Stream
可以轻松地将其嵌入任何Java应用步伐中,并与用户为其流应用步伐所拥有的任何现有打包,部署和操纵工具集成
Kafka Stream
1 概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据举行流式处置惩罚和分析的功能。
Kafka Stream的特点如下:
[*] Kafka Stream提供了一个非常简朴而轻量的Library,它可以非常方便地嵌入恣意Java应用中,也可以恣意方式打包和部署
[*] 除了Kafka外,无任何外部依赖
[*] 充分使用Kafka分区机制实现程度扩展温顺序性保证
[*] 通过可容错的state store实现高效的状态操纵(如windowed join和aggregation)
[*] 支持正好一次处置惩罚语义
[*] 提供记载级的处置惩罚能力,从而实现毫秒级的低耽误
[*] 支持基于事件时间的窗口操纵,并且可处置惩罚晚到的数据(late arrival of records)
[*] 同时提供底层的处置惩罚原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
https://i-blog.csdnimg.cn/direct/7df248edcb5b47d3a370d5db3329f9c5.png
3.2 Kafka Streams的关键概念
[*] 源处置惩罚器(Source Processor):源处置惩罚器是一个没有任何上游处置惩罚器的特殊类型的流处置惩罚器。它从一个或多个kafka主题天生输入流。通过消费这些主题的消息并将它们转发到卑鄙处置惩罚器。
[*] Sink处置惩罚器:sink处置惩罚器是一个没有卑鄙流处置惩罚器的特殊类型的流处置惩罚器。它接收上游流处置惩罚器的消息发送到一个指定的Kafka主题。
3.3 KStream
[*] (1)数据布局类似于map,如下图,key-value键值对
https://i-blog.csdnimg.cn/direct/4ca7d8cb0eda47bd996956bc367f0c8a.png(2)KStream
https://i-blog.csdnimg.cn/direct/573bdf6417e444ca8315dcdbab133999.pngKStream数据流(data stream),即是一段顺序的,可以无限长,不停更新的数据集。 数据流中比较常记载的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记载的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日记,每一次操纵都是向此中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记载正在发送到流中:
(“ alice”,1)->(“” alice“,3)
假如您的流处置惩罚应用是要总结每个用户的价值,它将返回4了alice。为什么?因为第二条数据记载将不被视为先前记载的更新。(insert)新数据
3.4 Kafka Stream入门案例编写
(1)需求分析,求单词个数(word count)
https://i-blog.csdnimg.cn/direct/4ca81f05f79548c2911159887ae4906a.png
(2)引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
(3)创建原生的kafka staream入门案例
package com.heima.kafka.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 流式处理
*/
public class KafkaStreamQuickStart {
public static void main(String[] args) {
//kafka的配置信心
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
//stream 构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();
//流式计算
streamProcessor(streamsBuilder);
//创建kafkaStream对象
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
//开启流式计算
kafkaStreams.start();
}
/**
* 流式计算
* 消息的内容:hello kafkahello itcast
* @param streamsBuilder
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
/**
* 处理消息的value
*/
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//按照value进行聚合处理
.groupBy((key,value)->value)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//统计单词的个数
.count()
//转换为kStream
.toStream()
.map((key,value)->{
System.out.println("key:"+key+",vlaue:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
}
}
[*]
(4)测试准备
[*] 使用生产者在topic为:itcast_topic_input中发送多条消息
[*] 使用消费者接收topic为:itcast_topic_out
结果:
[*] 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
3.5 SpringBoot集成Kafka Stream
(1)自定设置参数
package com.heima.kafka.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
} 修改application.yml文件,在最下方添加自界说设置
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name} (2)新增设置类,创建KStream对象,举行聚合
package com.heima.kafka.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Arrays;
@Configuration
@Slf4j
public class KafkaStreamHelloListener {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//创建kstream对象,同时指定从那个topic中接收消息
KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
})
//根据value进行聚合分组
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//求单词的个数
.count()
.toStream()
//处理后的结果转换为string字符串
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
return stream;
}
} 测试:
启动微服务,正常发送消息,可以正常接收到消息
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]