IT评测·应用市场-qidao123.com技术社区

标题: Flink使用详解 [打印本页]

作者: tsx81429    时间: 5 天前
标题: Flink使用详解
本文档使用 flink-1.13.1 版本依赖
一、Flink 常用流处置处罚 API

getExecutionEnvironment

创建实行环境
  1. StreamExecutionEnvironment environment =
  2.                 StreamExecutionEnvironment.getExecutionEnvironment();
复制代码
setParallelism

为实行环境设置并行度
  1. environment.setParallelism(1);
复制代码
addSource

给实行环境指定数据来源
  1. // 设置 RabbitMQ 连接配置
  2. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3.         .setHost("192.168.117.4")
  4.         .setPort(5672)
  5.         .setVirtualHost("/")
  6.         .setUserName("mix")
  7.         .setPassword("jovision")
  8.         .build();
  9. // 创建侧输出流
  10. OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
  11. // 创建 RabbitMQ 数据源,获取名为demo.in队列中的消息
  12. DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
  13.         connectionConfig,
  14.         "demo.in",
  15.         true,
  16.         new SimpleStringSchema()
  17. ));
复制代码
map

用于将吸收到的流转化成目的数据类型
  1. // 将所有接受的数据分出两个相同的流对象,侧输出流用于测试分组能力
  2. SingleOutputStreamOperator<TestDto> originStream = rabbitMQStream.map(new MapFunction<String, TestDto>() {
  3.     @Override
  4.     public TestDto map(String s) throws Exception {
  5.         TestDto testDto = JSON.parseObject(s, TestDto.class);
  6.         return testDto;
  7.     }
  8. });
复制代码
OutputTag

侧输出流,可拷贝来源流并进行其他逻辑操作
  1. // 创建侧输出流
  2. OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
  3. // 将所有接受的数据分出两个流对象,侧输出流用于测试分组能力
  4. SingleOutputStreamOperator<TestDto> originStream = originStream.process(new ProcessFunction<TestDto, TestDto>() {
  5.     @Override
  6.     public void processElement(TestDto testDto, ProcessFunction<TestDto, TestDto>.Context context, Collector<TestDto> collector) throws Exception {
  7.         // 通过age大小分流
  8.         if(testDto.getAge() > 10){
  9.             // 将流数据塞入上面创建的maxTestStream侧输出流
  10.             context.output(maxTestStream, testDto);
  11.         }else{
  12.             // 将流数据依然放入当前输出流
  13.             collector.collect(testDto);
  14.         }
  15.     }
  16. });
复制代码
print

打印流信息
  1. originStream.print("sumStream");
复制代码
filter

用于过滤吸收的流
  1. // 过滤接收的数据,只接收age=11的流数据放入sinkStream流对象
  2. SingleOutputStreamOperator<String> sinkStream = originStream.filter(new FilterFunction<TestDto>() {
  3.     @Override
  4.     public boolean filter(TestDto testDto) throws Exception {
  5.         return testDto.getAge() == 11;
  6.     }
  7. }).map(new MapFunction<TestDto, String>() {
  8.     // 转成String
  9.     @Override
  10.     public String map(TestDto testDto) throws Exception {
  11.         return JSON.toJSONString(testDto);
  12.     }
  13. });
复制代码
getSideOutput

获取测输出流数据
  1. // 上面是在originStream流对象将流数据分给maxTestStream侧输出流的,所以需要如下调用
  2. DataStream<TestDto> sideOutput = originStream.getSideOutput(maxTestStream);
复制代码
keyBy

将获取到的流对指定属性分组
  1. // 按照对象的name进行分组
  2. KeyedStream<TestDto, Tuple> keyedStream = sideOutput.keyBy("name");
复制代码
max

分组后返回指定属性最大的值(返回的指定属性最大的值是正确的,但是其中的其他属性大概不准确)
  1. DataStream<TestDto> maxStream = keyedStream.max("age");
复制代码
maxBy

分组后返回指定属性最大的值(返回的最大值与相应的其他字段内容都是准确的)
  1. DataStream<TestDto> maxByStream = keyedStream.maxBy("age");
复制代码
sum

分组后返回指定属性累加值
  1. DataStream<TestDto> sumStream = keyedStream.sum("age");
复制代码
reduce

分组后可操作新数据及上一次数据,可进行累加、比较等逻辑操作
  1. // 每次收到数据会按照分组信息分组,并累加上一次的age数据后存入reduceStream流对象,也可实现其他逻辑
  2. SingleOutputStreamOperator<TestDto> reduceStream = keyedStream.reduce(new ReduceFunction<TestDto>() {
  3.     @Override
  4.     public TestDto reduce(TestDto beforeDto, TestDto nowDto) throws Exception {
  5.         log.info("----------- reduce beforeDto:" + beforeDto);
  6.         log.info("----------- reduce nowDto:" + nowDto);
  7.         return beforeDto.setAge(beforeDto.getAge() + nowDto.getAge());
  8.     }
  9. });
复制代码
connect

可归并两个不同数据类型的流
  1. // 连接两个不同数据类型的流,类型为String的流对象sinkStream与类型为TestDto的流对象sideOutput合并
  2. ConnectedStreams<String, TestDto> connectStream = sinkStream.connect(sideOutput);
  3. DataStream<Object> connectObjectStream = connectStream.map(new CoMapFunction<String, TestDto, Object>() {
  4.     @Override
  5.     public Object map1(String s) throws Exception {
  6.         // 对第一个sinkStream流对象中的数据进行操作并返回
  7.         return new Tuple2<String,String>("stringStream",s);
  8.     }
  9.     @Override
  10.     public Object map2(TestDto testDto) throws Exception {
  11.         // 对第二个sideOutput流对象中的数据进行操作并返回
  12.         return new Tuple2<String,TestDto>("objectStream",testDto);
  13.     }
  14. });
复制代码
union

可归并两个相同数据类型的流
  1. // union 的使用(同一种数据类型的流才能用这个方法)
  2. DataStream<TestDto> unionStream = sideOutput.union(originStream);
复制代码
addSink

为流对象指定数据发送目的
  1. // 设置 RabbitMQ 连接配置
  2. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  3.         .setHost("192.168.117.4")
  4.         .setPort(5672)
  5.         .setVirtualHost("/")
  6.         .setUserName("mix")
  7.         .setPassword("jovision")
  8.         .build();
  9. // 将数据发送至名为demo.out的队列
  10. sinkStream.addSink(new RMQSink<>(
  11.         connectionConfig,
  12.         "demo.out",
  13.         new SimpleStringSchema()
  14. ));
复制代码
execute

运行实行环境(在 addSink 之后实行)
  1. // 执行
  2. environment.execute();
复制代码
二、Flink 连接器(数据源、数据写入)

在 Flink 官网中,当前自带的且常用的连接器如下:

RabbitMQ 已经有自带的【数据源连接器】以及【数据写入连接器】。
Redis、jdbc 只有自带的【数据写入连接器】,所以数据源连接器必要实现【SourceFunction】接口进行自定义。
RabbitMQ 连接器

数据源连接器(可实时斲丧消息)

  1. RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  2.         .setHost("192.168.117.4")
  3.         .setPort(5672)
  4.         .setVirtualHost("/")
  5.         .setUserName("mix")
  6.         .setPassword("jovision")
  7.         .build();
  8. // 创建 RabbitMQ 数据源
  9. DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
  10.         connectionConfig,
  11.         "demo.in",
  12.         true,
  13.         new SimpleStringSchema()
  14. ));
复制代码
数据写入连接器

  1. rabbitMQStream.addSink(new RMQSink<>(
  2.         connectionConfig,
  3.         "demo.out",
  4.         new SimpleStringSchema()
  5. ));
复制代码
Kafka 连接器

数据源连接器(可实时斲丧消息)

  1. Properties sourceProperties = new Properties();
  2. sourceProperties.setProperty("bootstrap.servers", "192.168.2.198:9092");
  3. sourceProperties.setProperty("group.id", "test");
  4. // 创建Kafka消费者
  5. FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
  6.         "vse.unicom.payload.channel", // 源 topic
  7.         new SimpleStringSchema(),     // 数据序列化方式
  8.         sourceProperties              // Kafka消费者配置
  9. );
  10. DataStream<String> sourceStream = environment.addSource(consumer);
复制代码
数据写入连接器

  1. Properties sinkProperties = new Properties();
  2. sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.198:9092");
  3. sinkProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
  4. sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
  5. sinkProperties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
  6. sinkProperties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
  7. sinkProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  8. sinkProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
  9. sinkProperties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "4194304");
  10. FlinkKafkaProducer<String> channelProducer = new FlinkKafkaProducer<String>(
  11.         "vse.unicom.payload.tenant", // 目标 topic
  12.         new SimpleStringSchema(),    // 序列化 schema
  13.         sinkProperties);
  14. sourceStream.addSink(channelProducer);
复制代码
Redis 连接器

数据源连接器(需自定义,单次实行,不可监听 redis 数据变更)

  1. public static class RedisSource implements SourceFunction<Tuple3<String,String,String>> {
  2.     @Override
  3.     public void run(SourceContext<Tuple3<String,String,String>> sourceContext) throws Exception {
  4.         // 连接 Redis
  5.         Jedis jedis = new Jedis("192.168.117.4", 6379);
  6.         // 设置密码(如果需要的话)
  7.         jedis.auth("redis@Abc-1234");
  8.         // 选择数据库
  9.         jedis.select(0);
  10.         for (int i = 0; i < 10; i++){
  11.             String key = "flink:test" + i;
  12.             String key2 = "flink:map" + i;
  13.             jedis.set( key, String.valueOf(new Random().nextInt()));
  14.             sourceContext.collect(new Tuple3<>(key,key2,jedis.get(key)));
  15.         }
  16.     }
  17.     @Override
  18.     public void cancel() {
  19.     }
  20. }
复制代码
数据写入连接器

  1. public static class CoustomRedisSink implements RedisMapper<Tuple2<String, String>> {
  2.     @Override
  3.     public RedisCommandDescription getCommandDescription() {
  4.         return new RedisCommandDescription(RedisCommand.SET);
  5.     }
  6.     @Override
  7.     public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
  8.         return stringStringTuple2._1();
  9.     }
  10.     @Override
  11.     public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
  12.         return stringStringTuple2._2();
  13.     }
  14. }
复制代码
jdbc 连接器

数据源连接器(需自定义,单次实行,不可监听 mysql 数据变更)

  1. public static class MysqlSource implements SourceFunction<MeshVcDto> {
  2.     @Override
  3.     public void run(SourceContext<MeshVcDto> sourceContext) throws Exception {
  4.         // 定义数据库连接信息
  5.         String dbURL = "jdbc:mysql://192.168.117.4:3306/jvs_tdms";
  6.         String username = "root";
  7.         String password = "Jo123@My";
  8.         // 连接数据库
  9.         Connection conn = DriverManager.getConnection(dbURL, username, password);
  10.         // 执行查询
  11.         String query = "SELECT id, device_id, mesh_vc, add_time FROM udms_mesh_vc_log where verification_code is null";
  12.         Statement stmt = conn.createStatement();
  13.         ResultSet rs = stmt.executeQuery(query);
  14.         // 处理查询结果
  15.         while (rs.next()) {
  16.             Long id = rs.getLong("id");
  17.             String deviceId = rs.getString("device_id");
  18.             String meshVc = rs.getString("mesh_vc");
  19.             Date addTime = rs.getDate("add_time");
  20.             sourceContext.collect(new MeshVcDto().setId(id).setMeshVc(meshVc).setDeviceId(deviceId).setAddTime(addTime));
  21.         }
  22.     }
  23.     @Override
  24.     public void cancel() {
  25.     }
  26. }
复制代码
数据写入连接器

  1. dataStream.addSink(JdbcSink.sink("update udms_mesh_vc_log set device_id = ?, mesh_vc = ? where id = ?", new JdbcStatementBuilder<MeshVcDto>() {
  2.         @Override
  3.         public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
  4.             preparedStatement.setString(1, meshVcDto.getDeviceId());
  5.             preparedStatement.setString(2, meshVcDto.getMeshVc());
  6.             preparedStatement.setLong(3, meshVcDto.getId());
  7.         }
  8.     },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  9.             .withDriverName("com.mysql.jdbc.Driver")
  10.             .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
  11.             .withUsername("root")
  12.             .withPassword("Jo123@My")
  13.             .build()));
  14. dataStream.addSink(JdbcSink.sink("insert into udms_mesh_vc_log (device_id, mesh_vc) values (?,?)", new JdbcStatementBuilder<MeshVcDto>() {
  15.         @Override
  16.         public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
  17.             preparedStatement.setString(1, meshVcDto.getDeviceId());
  18.             preparedStatement.setString(2, meshVcDto.getMeshVc());
  19.         }
  20.     },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  21.                     .withDriverName("com.mysql.jdbc.Driver")
  22.                     .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
  23.                     .withUsername("root")
  24.                     .withPassword("Jo123@My")
  25.                     .build()));
复制代码
三、Flink 常用 Window

窗口计算,必要先分组,然后指定窗口类型,然后编写计算逻辑

CountWindow

滚动计数窗口

  1. // 只有当前这个name的消息收到第3次,才会计算前三次的age和,否则不会计算并输出至mq
  2. SingleOutputStreamOperator<String> map1 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
  3.     @Override
  4.     public TestDto map(String s) throws Exception {
  5.         return JSON.parseObject(s, TestDto.class);
  6.     }
  7. }).keyBy("name")
  8. .countWindow(3)
  9. .sum("age").map(new MapFunction<TestDto, String>() {
  10.     @Override
  11.     public String map(TestDto testDto) throws Exception {
  12.         return JSON.toJSONString(testDto);
  13.     }
  14. });
复制代码
滑动计数窗口

  1. // 只有当前这个name的消息收到第2次,才会计算前5次的和,否则不会计算并输出至mq
  2. SingleOutputStreamOperator<String> map2 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
  3.     @Override
  4.     public TestDto map(String s) throws Exception {
  5.         return JSON.parseObject(s, TestDto.class);
  6.     }
  7. }).keyBy("name")
  8. .countWindow(5, 2)
  9. .sum("age").map(new MapFunction<TestDto, String>() {
  10.     @Override
  11.     public String map(TestDto testDto) throws Exception {
  12.         return JSON.toJSONString(testDto);
  13.     }
  14. });
复制代码
TimeWindow

滚动时间窗口

   留意事项:
触发计算的动作与并行度巨细有关系,一个窗口只有吸收消息的数目达到并行度之后,才气触发上个窗口的计算并输出
比方:设置为 1 并行度时,[0,15)窗口已颠末去,但是不会立刻输出计算结果,来到[15,30)窗口,只有吸收了 1 条消息,才会触发[0,15)窗口的计算并输出
  1. // 迟到较晚的流数据存储至本侧输出流
  2. OutputTag<TestDto> testDtoOutputTag = new OutputTag<TestDto>("迟到了") {};
  3. // 只有当前这个name的消息一直收到15s,才会计算这15s时间段的age和,否则不会计算并输出至mq
  4. SingleOutputStreamOperator<TestDto> name = rabbitMQStream
  5.         // window截止后继续等2秒,将window范围内的时间加入到计算
  6.         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TestDto>(Time.seconds(2)) {
  7.             @Override
  8.             public long extractTimestamp(TestDto testDto) {
  9.                 return testDto.getTimeStamp() * 1000;
  10.             }
  11.         })
  12.         .keyBy("name")
  13.         .window(TumblingEventTimeWindows.of(Time.seconds(15)))
  14.         // 迟到数据,在事件时间窗口内的消息,窗口结束后的5秒内收到的消息都被允许
  15.         .allowedLateness(Time.seconds(5))
  16.         // 迟到太多的数据会被放到侧输出流中进行补偿处理
  17.         .sideOutputLateData(testDtoOutputTag)
  18.         .aggregate(new AggregateFunction<TestDto, TestDto, TestDto>() {
  19.             // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  20.             @Override
  21.             public TestDto createAccumulator() {
  22.                 return new TestDto().setAge(0);
  23.             }
  24.             // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。
  25.             // 方法传入两个参数:当前新到的数据value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  26.             @Override
  27.             public TestDto add(TestDto input, TestDto init) {
  28.                 return input.setAge(init.getAge() + input.getAge());
  29.             }
  30.             // 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
  31.             // 比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  32.             @Override
  33.             public TestDto getResult(TestDto testDto) {
  34.                 return testDto.setTimeStamp(new Date().getTime() / 1000).setDate(LocalDateTime.now().toString());
  35.             }
  36.             // 合并两个累加器,并将合并后的状态作为一个累加器返回。
  37.             // 这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
  38.             @Override
  39.             public TestDto merge(TestDto testDto, TestDto acc1) {
  40.                 return null;
  41.             }
  42.         }, new ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>() {
  43.             // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
  44.             @Override
  45.             public void process(Tuple tuple, ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>.Context context, Iterable<TestDto> iterable, Collector<TestDto> collector) throws Exception {
  46.                 iterable.forEach(item -> {
  47.                     SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  48.                     item.setWindowStartTime(context.window().getStart());
  49.                     item.setWindowEndTime(context.window().getEnd());
  50.                     log.info("窗口:[{}, {}) 当前时间:{}, item: {}", format.format(new Date(context.window().getStart())),
  51.                             format.format(new Date(context.window().getEnd())), format.format(new Date()), item);
  52.                     collector.collect(item);
  53.                 });
  54.             }
  55.         });
复制代码
四、Flink 摆设

服务启动

● 进入 flink 的 bin 目次,启动 start-cluster.sh
  1. ./start-cluster.sh
复制代码
● 进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察启动端口(rest.port 参数值)
  1. vi flink-conf.yaml
复制代码
● 输入服务器 ip 及端口

● 启动完成
页面操作使命

上传 jar 文件

将使用流处置处罚 API 实现的步伐打包为 jar 文件,并上传

上传乐成

配置启动类并启动使命


配置完之后点击【Submit】即可实行使命

制止使命



下令操作使命

进入 flink 的 bin 目次下
检察正在实行的使命列表

  1. ./flink list
复制代码
结果展示

取消使命

● 取消正在实行的指定使命
  1. ./flink cancel 【正在执行的任务的id值】-s 【保存点文件夹名】
  2. ./flink cancel 【正在执行的任务的id值】 (不会有保存点文件)
  3. ./flink cancel  -s 【正在执行的任务的id值】
  4. 例:
  5. ./flink cancel 0fa34b7daa017c510adb3692e55d4c96 -s 1234
  6. ./flink cancel 0fa34b7daa017c510adb3692e55d4c96 (不会有保存点文件)
  7. ./flink cancel -s 0fa34b7daa017c510adb3692e55d4c96
复制代码
结果展示

启动使命

● 无安全点文件启动使命
  1. ./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
  2. 例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码
● 有安全点文件启动使命(前提是取消使命时有安全点文件保存)
  1. ./flink run -p 【Parallelism】 -s 【执行取消任务的安全点的目录】-c 【EntryClass】[--allowNonRestoredState] 【已上传jar包的路径】 【Program Arguments】
  2. 注: --allowNonRestoredState 非必填,可绕过保存点恢复的错误继续启动任务,绕过错误可能会丢失数据,可先不带此配置启动,报错后再加上执行启动命令也可
  3. 例:./flink run -p 1 -s file:/tmp/flink/flinkpoint/savepoints/savepoint-029d78-73d8367621d7 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码
结果展示

启动使命时 jar 包路径可通过以下两种方式获取:
● 上传 jar 包至 linux 体系后直接使用
● 在页面上传文件,并通过以下步骤获取:
○ 在 flink 页面上传文件完成,检察如下内容

○ 在 linux 中检察上一步拿到的路径,该路径就是已上传的 jar 包的缓存文件

五、Flink 扩容

flink 扩容是对 TaskManager 数目的扩大,相对应将实行使命的并行度随之扩大。

flink 使命卡槽设置

进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察单个 TaskManager 的使命卡槽数目(taskmanager.numberOfTaskSlots 参数值):
  1. vi flink-conf.yaml
复制代码
flink 使命并行度设置

页面设置


下令设置

  1. ./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
  2. 例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4