马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
在 Spring Boot 项目中集成 Kafka 有多种方式,顺应不同的应用场景和需求。以下将详细先容几种常用的集成方法,包罗:
- 使用 Spring Kafka (KafkaTemplate 和 @KafkaListener)
- 使用 Spring Cloud Stream 与 Kafka Binder
- 使用 Spring for Apache Kafka Reactive(基于 Reactor)
- 手动配置 Producer 和 Consumer Bean
- 使用 Spring Integration Kafka
- 在测试中使用嵌入式 Kafka
每种方法都有其特点和适用场景,选择合适的方法可以或许有效提拔开发效率和应用性能。
1. 使用 Spring Kafka (KafkaTemplate 和 @KafkaListener)
这是最常用的 Spring Boot 集成 Kafka 的方式,依赖于 Spring for Apache Kafka 项目,提供了 KafkaTemplate 用于发送消息,以及 @KafkaListener 注解用于接收消息。
步骤一:添加 Maven 依赖
在 pom.xml 中引入 spring-kafka 依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
复制代码 步骤二:配置 application.properties 或 application.yml
示例 (application.properties):
- # Kafka 集群地址
- spring.kafka.bootstrap-servers=worker1:9092,worker2:9092,worker3:9092
- # 生产者配置
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.acks=1
- spring.kafka.producer.retries=3
- spring.kafka.producer.batch-size=16384
- spring.kafka.producer.linger.ms=1
- # 消费者配置
- spring.kafka.consumer.group-id=myGroup
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.auto-offset-reset=earliest
- spring.kafka.consumer.enable-auto-commit=true
复制代码 步骤三:编写消息生产者
使用 KafkaTemplate 发送消息:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- @Service
- public class ProducerService {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- private static final String TOPIC = "topic1";
- public void sendMessage(String message) {
- kafkaTemplate.send(TOPIC, message);
- }
- }
复制代码 步骤四:编写消息消费者
使用 @KafkaListener 接收消息:
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- @Component
- public class ConsumerService {
- @KafkaListener(topics = "topic1", groupId = "myGroup")
- public void listen(ConsumerRecord<?, ?> record) {
- System.out.println("Received message: " + record.value());
- }
- }
复制代码 优缺点
- 长处:
- 简单易用,快速上手。
- 与 Spring 生态体系无缝集成。
- 支持事务、幂等性等高级特性。
- 缺点:
- 适用于传统的阻塞式应用,若需要响应式编程则不够友好。
2. 使用 Spring Cloud Stream 与 Kafka Binder
Spring Cloud Stream 是一个构建消息驱动微服务的框架,通过 Binder(绑定器)与不同的消息中间件集成。使用 Kafka Binder,可以更加简化 Kafka 与 Spring Boot 的集成。
步骤一:添加 Maven 依赖
在 pom.xml 中引入 spring-cloud-starter-stream-kafka 依赖:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-kafka</artifactId>
- </dependency>
复制代码 并确保引入 Spring Cloud 的 BOM 以管理版本:
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>Hoxton.SR12</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
复制代码 步骤二:配置 application.yml
- spring:
- cloud:
- stream:
- bindings:
- output:
- destination: topic1
- contentType: application/json
- input:
- destination: topic1
- group: myGroup
- kafka:
- binder:
- brokers: worker1:9092,worker2:9092,worker3:9092
复制代码 步骤三:编写消息生产者
使用 @EnableBinding 和 Source 接口:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.cloud.stream.annotation.EnableBinding;
- import org.springframework.cloud.stream.messaging.Source;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
- @Service
- @EnableBinding(Source.class)
- public class StreamProducerService {
- @Autowired
- private Source source;
- public void sendMessage(String message) {
- source.output().send(MessageBuilder.withPayload(message).build());
- }
- }
复制代码 步骤四:编写消息消费者
使用 @StreamListener 注解:
- import org.springframework.cloud.stream.annotation.EnableBinding;
- import org.springframework.cloud.stream.annotation.StreamListener;
- import org.springframework.cloud.stream.messaging.Sink;
- import org.springframework.stereotype.Component;
- @Component
- @EnableBinding(Sink.class)
- public class StreamConsumerService {
- @StreamListener(Sink.INPUT)
- public void handleMessage(String message) {
- System.out.println("Received message: " + message);
- }
- }
复制代码 优缺点
- 长处:
- 高度抽象,淘汰配置与代码量。
- 更得当微服务架构,支持绑定多个输入输出。
- 支持多种消息中间件,易于切换。
- 缺点:
- 抽象层较高,大概难以实现一些细粒度的自定义配置。
- 学习曲线较陡,需明白 Binder 和 Channel 的概念。
3. 使用 Spring for Apache Kafka Reactive(基于 Reactor)
对于需要响应式编程的应用,可以使用基于 Reactor 的 Spring Kafka Reactive 举行集成,实现非阻塞的消息处理。
步骤一:添加 Maven 依赖
目前,Spring Kafka 本身并未直接提供响应式支持,但可以结合 Project Reactor Kafka 使用。
引入 Reactor Kafka 依赖:
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- <version>1.3.11</version>
- </dependency>
复制代码 步骤二:配置 application.yml
- kafka:
- bootstrap-servers: worker1:9092,worker2:9092,worker3:9092
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer:
- group-id: myReactiveGroup
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- auto-offset-reset: earliest
复制代码 步骤三:编写响应式消息生产者
使用 SenderOptions 和 KafkaSender:
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Service;
- import reactor.kafka.sender.KafkaSender;
- import reactor.kafka.sender.SenderOptions;
- import reactor.kafka.sender.SenderRecord;
- import reactor.core.publisher.Mono;
- import java.util.HashMap;
- import java.util.Map;
- @Service
- public class ReactiveProducerService {
- @Value("${kafka.bootstrap-servers}")
- private String bootstrapServers;
- @Bean
- public SenderOptions<String, String> senderOptions() {
- Map<String, Object> props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return SenderOptions.create(props);
- }
- @Bean
- public KafkaSender<String, String> kafkaSender(SenderOptions<String, String> senderOptions) {
- return KafkaSender.create(senderOptions);
- }
- public Mono<Void> sendMessage(String topic, String key, String value) {
- SenderRecord<String, String, Integer> record = SenderRecord.create(
- new org.apache.kafka.clients.producer.ProducerRecord<>(topic, key, value),
- 1
- );
- return kafkaSender(senderOptions())
- .send(Mono.just(record))
- .then();
- }
- }
复制代码 步骤四:编写响应式消息消费者
使用 ReceiverOptions 和 KafkaReceiver:
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Service;
- import reactor.core.publisher.Flux;
- import reactor.kafka.receiver.KafkaReceiver;
- import reactor.kafka.receiver.ReceiverOptions;
- import reactor.kafka.receiver.ReceiverRecord;
- import java.util.HashMap;
- import java.util.Map;
- @Service
- public class ReactiveConsumerService {
- @Value("${kafka.bootstrap-servers}")
- private String bootstrapServers;
- @Value("${kafka.consumer.group-id}")
- private String groupId;
- @Bean
- public ReceiverOptions<String, String> receiverOptions() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);
- return receiverOptions.subscription(java.util.Collections.singleton("topic1"));
- }
- @Bean
- public Flux<ReceiverRecord<String, String>> kafkaFlux(ReceiverOptions<String, String> receiverOptions) {
- return KafkaReceiver.create(receiverOptions).receive();
- }
- public void consumeMessages() {
- kafkaFlux(receiverOptions())
- .doOnNext(record -> {
- System.out.println("Received: " + record.value());
- record.receiverOffset().acknowledge();
- })
- .subscribe();
- }
- }
复制代码 优缺点
- 长处:
- 支持响应式编程模子,适用于高并发和非阻塞场景。
- 更高的资源使用率和吞吐量。
- 缺点:
- 相较于传统阻塞式,开发复杂度更高。
- 需要明白 Reactor 和响应式编程的基本概念。
4. 手动配置 Producer 和 Consumer Bean
对于需要更高自定义配置的应用,可以手动配置 ProducerFactory, ConsumerFactory, KafkaTemplate 和 ConcurrentKafkaListenerContainerFactory 等 Bean。
步骤一:添加 Maven 依赖
在 pom.xml 中引入 spring-kafka 依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
复制代码 步骤二:编写 Kafka 配置类
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.core.*;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration
- @EnableKafka
- public class KafkaManualConfig {
- @Value("${kafka.bootstrap-servers}")
- private String bootstrapServers;
- @Value("${kafka.consumer.group-id}")
- private String groupId;
- // Producer 配置
- @Bean
- public ProducerFactory<String, String> producerFactory() {
- Map<String, Object> configProps = new HashMap<>();
- configProps.put(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- bootstrapServers);
- configProps.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class);
- configProps.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class);
- // 其他自定义配置
- return new DefaultKafkaProducerFactory<>(configProps);
- }
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
- // Consumer 配置
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- bootstrapServers);
- props.put(
- ConsumerConfig.GROUP_ID_CONFIG,
- groupId);
- props.put(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class);
- props.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class);
- // 其他自定义配置
- return new DefaultKafkaConsumerFactory<>(props);
- }
- // KafkaListenerContainerFactory
- @Bean
- public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- // 其他自定义配置,如并发数、批量消费等
- return factory;
- }
- }
复制代码 步骤三:编写消息生产者和消费者
Producer 示例:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Service;
- @Service
- public class ManualProducerService {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- private static final String TOPIC = "topic1";
- public void sendMessage(String message) {
- kafkaTemplate.send(TOPIC, message);
- }
- }
复制代码 Consumer 示例:
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- @Component
- public class ManualConsumerService {
- @KafkaListener(topics = "topic1", groupId = "myGroup")
- public void listen(String message) {
- System.out.println("Received message: " + message);
- }
- }
复制代码 优缺点
- 长处:
- 高度自定义,适用于复杂配置需求。
- 可以机动配置多个 KafkaTemplate 或 KafkaListenerContainerFactory,顺应多种场景。
- 缺点:
- 配置较为繁琐,代码量增加。
- 需要深入明白 Spring Kafka 的配置与使用。
5. 使用 Spring Integration Kafka
Spring Integration 提供了对 Kafka 的集成支持,适用于需要集成多种消息渠道和复杂消息路由的应用。
步骤一:添加 Maven 依赖
在 pom.xml 中引入 spring-integration-kafka 依赖:
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-kafka</artifactId>
- <version>3.3.5.RELEASE</version>
- </dependency>
复制代码 步骤二:编写 Kafka Integration 配置
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.integration.annotation.ServiceActivator;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
- import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
- import org.springframework.kafka.core.*;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.MessageHandler;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration
- public class SpringIntegrationKafkaConfig {
- @Bean
- public ProducerFactory<String, String> producerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "worker1:9092,worker2:9092,worker3:9092");
- props.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class);
- props.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(props);
- }
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
- // 消费者工厂
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- Map<String, Object> props = new HashMap<>();
- props.put(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- "worker1:9092,worker2:9092,worker3:9092");
- props.put(
- ConsumerConfig.GROUP_ID_CONFIG,
- "myGroup");
- props.put(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class);
- props.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class);
- return new DefaultKafkaConsumerFactory<>(props);
- }
- // 输入通道
- @Bean
- public MessageChannel inputChannel() {
- return new DirectChannel();
- }
- // 消费者适配器
- @Bean
- public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
- KafkaMessageDrivenChannelAdapter<String, String> adapter =
- new KafkaMessageDrivenChannelAdapter<>(consumerFactory(), "topic1");
- adapter.setOutputChannel(inputChannel());
- return adapter;
- }
- // 消费者处理器
- @Bean
- @ServiceActivator(inputChannel = "inputChannel")
- public MessageHandler messageHandler() {
- return message -> {
- String payload = (String) message.getPayload();
- System.out.println("Received message: " + payload);
- };
- }
- // 输出通道
- @Bean
- public MessageChannel outputChannel() {
- return new DirectChannel();
- }
- // 生产者处理器
- @Bean
- @ServiceActivator(inputChannel = "outputChannel")
- public MessageHandler producerMessageHandler(KafkaTemplate<String, String> kafkaTemplate) {
- KafkaProducerMessageHandler<String, String> handler =
- new KafkaProducerMessageHandler<>(kafkaTemplate);
- handler.setTopicExpressionString("'topic1'");
- return handler;
- }
- }
复制代码 步骤三:发送消息到输出通道
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.MessageChannel;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Service;
- @Service
- public class IntegrationProducerService {
- @Autowired
- private MessageChannel outputChannel;
- public void sendMessage(String message) {
- outputChannel.send(MessageBuilder.withPayload(message).build());
- }
- }
复制代码 优缺点
- 长处:
- 强盛的消息路由和转换功能,适用于复杂集成场景。
- 可以与 Spring Integration 的其他模块无缝协作。
- 缺点:
- 配置复杂,学习成本较高。
- 对于简单的 Kafka 集成场景,大概显得过于痴肥。
6. 在测试中使用嵌入式 Kafka
在集成测试中,使用嵌入式 Kafka 可以避免依赖外部 Kafka 集群,提拔测试效率与稳固性。
步骤一:添加 Maven 依赖
在 pom.xml 中引入 spring-kafka-test 依赖:
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
复制代码 步骤二:编写测试类
使用 @EmbeddedKafka 注解启动嵌入式 Kafka:
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.junit.jupiter.api.AfterAll;
- import org.junit.jupiter.api.BeforeAll;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.test.EmbeddedKafkaBroker;
- import org.springframework.kafka.test.context.EmbeddedKafka;
- import org.springframework.kafka.test.utils.KafkaTestUtils;
- import java.util.Map;
- @SpringBootTest
- @EmbeddedKafka(partitions = 1, topics = { "topic1" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
- public class KafkaIntegrationTest {
- @Autowired
- private EmbeddedKafkaBroker embeddedKafkaBroker;
- private static Consumer<String, String> consumer;
- @BeforeAll
- public static void setUp(@Autowired EmbeddedKafkaBroker embeddedKafkaBroker) {
- Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
- consumer = consumerFactory.createConsumer();
- embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "topic1");
- }
- @AfterAll
- public static void tearDown() {
- if (consumer != null) {
- consumer.close();
- }
- }
- @Test
- public void testSendAndReceive() {
- // 发送消息
- // 假设有一个 ProducerService 可以发送消息
- // producerService.sendMessage("Hello, Kafka!");
- // 接收消息
- // Consumer Record 验证逻辑
- // 可以使用 KafkaTestUtils 来接收消息并断言
- }
- }
复制代码 优缺点
- 长处:
- 不依赖外部 Kafka 集群,得当 CI/CD 情况。
- 提拔测试的可重复性与稳固性。
- 缺点:
- 嵌入式 Kafka 启动较慢,大概影响测试速率。
- 需要额外配置,测试代码复杂度增加。
总结
在 Spring Boot 中集成 Kafka 有多种方式,每种方式适用于不同的应用场景和需求:
- Spring Kafka (KafkaTemplate 和 @KafkaListener)
适用于大多数常规应用,简单易用,与 Spring 生态体系无缝集成。
- Spring Cloud Stream 与 Kafka Binder
适用于微服务架构,需处理复杂消息路由与多中间件支持的场景。
- Spring for Apache Kafka Reactive
适用于需要响应式编程模子、高并发和非阻塞消息处理的应用。
- 手动配置 Producer 和 Consumer Bean
适用于需要高度自定义 Kafka 配置和行为的应用。
- Spring Integration Kafka
适用于复杂集成场景,需要与其他消息渠道和体系协作的应用。
- 嵌入式 Kafka 在测试中使用
适用于编写集成测试,提拔测试效率和稳固性。
根据项目的具体需求,选择最合适的集成方式可以或许有效提拔开发效率,确保应用的稳固性与可扩展性。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |