马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
用Flink+Doris来开发实时数仓,首要办理是怎样接入kafka实时流,下面是参考Doris官方文档和代码,在本身项目开发的实践中总结,包罗一些容易踩坑的细节。
Routine Load方法
如果Doris是2.1以上,不必要复杂的数据转换的,发起利用Doris自带的Routine Load,实测利用方便,性能高。
接入kafka实时数据
Doris 可以通过 Routine Load 导入方式持续消费 Kafka Topic 中的数据。在提交 Routine Load 作业后,Doris 会持续运行该导入作业,实时生成导入任务不断消费 Kakfa 集群中指定 Topic 中的消息。
Routine Load 是一个流式导入作业,支持 Exactly-Once 语义,保证数据不丢不重。下面示例怎样通过拉入kafka数据(json格式):
首先是创建必要导入的表:
- CREATE TABLE testdb.test_routineload_tbl(
- user_id BIGINT NOT NULL COMMENT "user id",
- name VARCHAR(20) COMMENT "name",
- age INT COMMENT "age"
- )
- DUPLICATE KEY(user_id)
- DISTRIBUTED BY HASH(user_id) BUCKETS 10;
复制代码 如果要接入的数据是主键不重复的,可以设置为Unique模型,如许可以删除或修改。
创建 Routine Load 导入作业
在 Doris 中,利用 CREATE ROUTINE LOAD 命令,创建导入作业
- CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
- COLUMNS(user_id,name,age)
- PROPERTIES(
- "format"="json",
- "max_error_number" = "999999999999",
- "strip_outer_array"="true",
- "jsonpaths"="["$.user_id","$.name","$.age"]"
- )
- FROM KAFKA(
- "kafka_broker_list" = "192.168.88.62:9092",
- "kafka_topic" = "test-routine-load-json",
- "property.kafka_default_offsets" = "OFFSET_BEGINNING"
- );
复制代码 踩坑的问题细节
max_error_number | 采样窗口内,答应的最大错误行数。必须大于便是 0。默认是 0,即不答应有错误行。采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被停息,必要人工介入检查数据质量问题,通过 SHOW ROUTINE LOAD 命令中 ErrorLogUrls 检查数据的质量问题。被 where 条件过滤掉的行不算错误行。 | strip_outer_array | 当导入数据格式为 json 时,strip_outer_array 为 true 表示 JSON 数据以数组的情势展现,数据中的每一个元素将被视为一行数据。默认值是 false。通常情况下,Kafka 中的 JSON 数据可能以数组情势表示,即在最外层中包罗中括号[],此时,可以指定 "strip_outer_array" = "true",以数组模式消费 Topic 中的数据。如以下数据会被剖析成两行:[{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}] |
- 我公司的kafka,是需要设置"strip_outer_array"="true",根据实际来调整。
- 有些kafka的数据有脏数据,可以用max_error_number来过滤。或者考虑用脚本来检测:
复制代码- import pymysql #导入 pymysql
- import requests,json
- #打开数据库连接
- db= pymysql.connect(host="host",user="user",
- password="passwd",db="database",port=port)
- # 使用cursor()方法获取操作游标
- cur = db.cursor()
- #1.查询操作
- # 编写sql 查询语句
- sql = "show routine load"
- cur.execute(sql) #执行sql语句
- results = cur.fetchall() #获取查询的所有记录
- for row in results :
- name = row[1]
- state = row[7]
- if state != 'RUNNING':
- err_log_urls = row[16]
- reason_state_changed = row[15]
- msg = "doris 数据导入任务异常:\n name=%s \n state=%s \n reason_state_changed=%s \n err_log_urls=%s \n即将自动恢复,请检查错误信息" % (name, state,
- reason_state_changed, err_log_urls)
- payload_message = {
- "msg_type": "text",
- "content": {
- "text": msg
- }
- }
- url = 'lark 报警url'
- s = json.dumps(payload_message)
- r = requests.post(url, data=s)
- cur.execute("resume routine load for " + name)
- cur.close()
- db.close()
复制代码- 通过 SHOW ROUTINE LOAD 来查看结果
复制代码- mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load
复制代码- 我是没按官方文档加\G,加了\G报错。
- 如果导入数据有问题,可以通过上面的命令,查看下面这2个:
复制代码ErrorLogUrls | 被过滤的质量不合格的数据的检察所在 | OtherMsg | 其他错误信息 |
- 具体可以参考:Routine Load - Apache Doris
复制代码
Flink Doris Connector方法
Flink Doris Connector 可以支持通过 Flink 操纵(读取、插入、修改、删除)Doris 中存储的数据。
这种方法适合Doris2.1以下的,大概必要对kafka数据举行复杂过滤或数据转换。Routine Load方法也是可以简单过滤的,但觉得还是不可以或许高度定制:
- CREATE ROUTINE LOAD demo.kafka_job04 ON routine_test04
- COLUMNS TERMINATED BY ",",
- WHERE id >= 3
- FROM KAFKA
- (
- "kafka_broker_list" = "10.16.10.6:9092",
- "kafka_topic" = "routineLoad04",
- "property.kafka_default_offsets" = "OFFSET_BEGINNING"
- );
复制代码 由于公司项目flink是很老版本1.11,如果是新版本flink,请参考官方文档举行调整:Flink Doris Connector - Apache Doris
完整示例
pom依赖如下:
- <properties>
- <scala.version>2.12.10</scala.version>
- <scala.binary.version>2.12</scala.binary.version>
- <java.version>1.8</java.version>
- <flink.version>1.12.2</flink.version>
- <fastjson.version>1.2.83</fastjson.version>
- <hadoop.version>2.8.3</hadoop.version>
- <scope.mode>compile</scope.mode>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.12</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>1.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>${fastjson.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-compatibility_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-hadoop-2-uber</artifactId>
- <version>2.6.5-10.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-1.14_2.12</artifactId>
- <version>1.0.3</version>
- </dependency>
- <!--After adding the following two dependencies, Flink's log will appear-->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.25</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.7.25</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.13</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <version>4.4.12</version>
- </dependency>
- </dependencies>
复制代码
FlinkKafka2Doris :
- public class FlinkKafka2Doris {
- //kafka address
- private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
- //kafka groupName
- private static final String groupName = "test_flink_doris_group";
- //kafka topicName
- private static final String topicName = "test_flink_doris";
- //doris ip port
- private static final String hostPort = "xxx:8030";
- //doris dbName
- private static final String dbName = "test1";
- //doris tbName
- private static final String tbName = "doris_test_source_2";
- //doris userName
- private static final String userName = "root";
- //doris password
- private static final String password = "";
- //doris columns
- private static final String columns = "name,age,price,sale";
- //json format
- private static final String jsonFormat = "["$.name","$.age","$.price","$.sale"]";
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServer);
- props.put("group.id", groupName);
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "earliest");
- props.put("max.poll.records", "10000");
- StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- blinkStreamEnv.enableCheckpointing(10000);
- blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
- new SimpleStringSchema(),
- props);
- DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
- //在这里进行数据过滤或转换
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
- dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));
- blinkStreamEnv.execute("flink kafka to doris");
- }
- }
复制代码
DorisStreamLoad:
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.Serializable;
- import java.io.IOException;
- import java.io.BufferedOutputStream;
- import java.io.InputStream;
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.nio.charset.StandardCharsets;
- import java.util.Base64;
- import java.util.Calendar;
- import java.util.UUID;
- /**
- * doris streamLoad
- */
- public class DorisStreamLoad implements Serializable {
- private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
- private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
- private String hostPort;
- private String db;
- private String tbl;
- private String user;
- private String passwd;
- private String loadUrlStr;
- private String authEncoding;
- public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
- this.hostPort = hostPort;
- this.db = db;
- this.tbl = tbl;
- this.user = user;
- this.passwd = passwd;
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
- this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
- }
- private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
- URL url = new URL(urlStr);
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setInstanceFollowRedirects(false);
- conn.setRequestMethod("PUT");
- conn.setRequestProperty("Authorization", "Basic " + authEncoding);
- conn.addRequestProperty("Expect", "100-continue");
- conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
- conn.addRequestProperty("label", label);
- conn.addRequestProperty("max_filter_ratio", "0");
- conn.addRequestProperty("strict_mode", "true");
- conn.addRequestProperty("columns", columns);
- conn.addRequestProperty("format", "json");
- conn.addRequestProperty("jsonpaths", jsonformat);
- conn.addRequestProperty("strip_outer_array", "true");
- conn.setDoOutput(true);
- conn.setDoInput(true);
- return conn;
- }
- public static class LoadResponse {
- public int status;
- public String respMsg;
- public String respContent;
- public LoadResponse(int status, String respMsg, String respContent) {
- this.status = status;
- this.respMsg = respMsg;
- this.respContent = respContent;
- }
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("status: ").append(status);
- sb.append(", resp msg: ").append(respMsg);
- sb.append(", resp content: ").append(respContent);
- return sb.toString();
- }
- }
- public LoadResponse loadBatch(String data, String columns, String jsonformat) {
- Calendar calendar = Calendar.getInstance();
- String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
- calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
- calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
- UUID.randomUUID().toString().replaceAll("-", ""));
- HttpURLConnection feConn = null;
- HttpURLConnection beConn = null;
- try {
- // build request and send to fe
- feConn = getConnection(loadUrlStr, label, columns, jsonformat);
- int status = feConn.getResponseCode();
- // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
- if (status != 307) {
- throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
- }
- String location = feConn.getHeaderField("Location");
- if (location == null) {
- throw new Exception("redirect location is null");
- }
- // build request and send to new be location
- beConn = getConnection(location, label, columns, jsonformat);
- // send data to be
- BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
- bos.write(data.getBytes());
- bos.close();
- // get respond
- status = beConn.getResponseCode();
- String respMsg = beConn.getResponseMessage();
- InputStream stream = (InputStream) beConn.getContent();
- BufferedReader br = new BufferedReader(new InputStreamReader(stream));
- StringBuilder response = new StringBuilder();
- String line;
- while ((line = br.readLine()) != null) {
- response.append(line);
- }
- return new LoadResponse(status, respMsg, response.toString());
- } catch (Exception e) {
- e.printStackTrace();
- String err = "failed to load audit via AuditLoader plugin with label: " + label;
- log.warn(err, e);
- return new LoadResponse(-1, e.getMessage(), err);
- } finally {
- if (feConn != null) {
- feConn.disconnect();
- }
- if (beConn != null) {
- beConn.disconnect();
- }
- }
- }
- }
复制代码
DorisSink:
- import com.alibaba.fastjson.JSON;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- /**
- * Custom doris sink
- */
- public class DorisSink extends RichSinkFunction<String> {
- private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
- private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
- private DorisStreamLoad dorisStreamLoad;
- private String columns;
- private String jsonFormat;
- public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
- this.dorisStreamLoad = dorisStreamLoad;
- this.columns = columns;
- this.jsonFormat = jsonFormat;
- }
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- }
- /**
- * Determine whether StreamLoad is successful
- *
- * @param respContent streamLoad returns the entity
- * @return
- */
- public static Boolean checkStreamLoadStatus(RespContent respContent) {
- if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
- && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
- return true;
- } else {
- return false;
- }
- }
- @Override
- public void invoke(String value, Context context) throws Exception {
- DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
- if (loadResponse != null && loadResponse.status == 200) {
- RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
- if (!checkStreamLoadStatus(respContent)) {
- log.error("Stream Load fail{}:", loadResponse);
- }
- } else {
- log.error("Stream Load Request failed:{}", loadResponse);
- }
- }
- }
复制代码
RespContent:
- import java.io.Serializable;
- /**
- * Entity returned by streamLoad
- */
- public class RespContent implements Serializable {
- private static final long serialVersionUID = 1L;
- /**
- * Imported transaction ID. The user may not perceive it.
- */
- private int TxnId;
- /**
- * Import Label. Specified by the user or automatically generated by the system.
- */
- private String Label;
- /**
- * Import complete status.
- * "Success": Indicates that the import was successful.
- * "Publish Timeout": This status also indicates that the import has been completed, but the data may be visible with a delay, and there is no need to retry.
- * "Label Already Exists": The Label is duplicated, and the Label needs to be replaced.
- */
- private String Status;
- /**
- * The status of the import job corresponding to the existing Label.
- * This field will only be displayed when the Status is "Label Already Exists".
- * The user can know the status of the import job corresponding to the existing Label through this status.
- * "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful.
- */
- private String ExistingJobStatus;
- /**
- * Import error information
- */
- private String Message;
- /**
- * Import the total number of processed rows
- */
- private long NumberTotalRows;
- /**
- * The number of rows successfully imported.
- */
- private long NumberLoadedRows;
- /**
- * Number of rows with unqualified data quality。
- */
- private int NumberFilteredRows;
- /**
- * The number of rows filtered by the where condition
- */
- private int NumberUnselectedRows;
- /**
- * Number of bytes imported。
- */
- private long LoadBytes;
- /**
- * Import completion time. The unit is milliseconds.
- */
- private int LoadTimeMs;
- /**
- * The time it takes to request Fe to start a transaction, in milliseconds
- */
- private int BeginTxnTimeMs;
- /**
- * The time it takes to request Fe to obtain the import data execution plan, in milliseconds
- */
- private int StreamLoadPutTimeMs;
- /**
- * The time spent reading data, in milliseconds
- */
- private int ReadDataTimeMs;
- /**
- * Time to perform a data write operation takes milliseconds。
- */
- private int WriteDataTimeMs;
- /**
- * The time taken to submit and publish the transaction request to Fe, in milliseconds
- */
- private int CommitAndPublishTimeMs;
- /**
- * If there is a data quality problem, check the specific error line by visiting this URL
- */
- private String ErrorURL;
- public int getTxnId() {
- return TxnId;
- }
- public void setTxnId(int txnId) {
- TxnId = txnId;
- }
- public String getLabel() {
- return Label;
- }
- public void setLabel(String label) {
- Label = label;
- }
- public String getStatus() {
- return Status;
- }
- public void setStatus(String status) {
- Status = status;
- }
- public String getExistingJobStatus() {
- return ExistingJobStatus;
- }
- public void setExistingJobStatus(String existingJobStatus) {
- ExistingJobStatus = existingJobStatus;
- }
- public String getMessage() {
- return Message;
- }
- public void setMessage(String message) {
- Message = message;
- }
- public long getNumberTotalRows() {
- return NumberTotalRows;
- }
- public void setNumberTotalRows(long numberTotalRows) {
- NumberTotalRows = numberTotalRows;
- }
- public long getNumberLoadedRows() {
- return NumberLoadedRows;
- }
- public void setNumberLoadedRows(long numberLoadedRows) {
- NumberLoadedRows = numberLoadedRows;
- }
- public int getNumberFilteredRows() {
- return NumberFilteredRows;
- }
- public void setNumberFilteredRows(int numberFilteredRows) {
- NumberFilteredRows = numberFilteredRows;
- }
- public int getNumberUnselectedRows() {
- return NumberUnselectedRows;
- }
- public void setNumberUnselectedRows(int numberUnselectedRows) {
- NumberUnselectedRows = numberUnselectedRows;
- }
- public long getLoadBytes() {
- return LoadBytes;
- }
- public void setLoadBytes(long loadBytes) {
- LoadBytes = loadBytes;
- }
- public int getLoadTimeMs() {
- return LoadTimeMs;
- }
- public void setLoadTimeMs(int loadTimeMs) {
- LoadTimeMs = loadTimeMs;
- }
- public int getBeginTxnTimeMs() {
- return BeginTxnTimeMs;
- }
- public void setBeginTxnTimeMs(int beginTxnTimeMs) {
- BeginTxnTimeMs = beginTxnTimeMs;
- }
- public int getStreamLoadPutTimeMs() {
- return StreamLoadPutTimeMs;
- }
- public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
- StreamLoadPutTimeMs = streamLoadPutTimeMs;
- }
- public int getReadDataTimeMs() {
- return ReadDataTimeMs;
- }
- public void setReadDataTimeMs(int readDataTimeMs) {
- ReadDataTimeMs = readDataTimeMs;
- }
- public int getWriteDataTimeMs() {
- return WriteDataTimeMs;
- }
- public void setWriteDataTimeMs(int writeDataTimeMs) {
- WriteDataTimeMs = writeDataTimeMs;
- }
- public int getCommitAndPublishTimeMs() {
- return CommitAndPublishTimeMs;
- }
- public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
- CommitAndPublishTimeMs = commitAndPublishTimeMs;
- }
- public String getErrorURL() {
- return ErrorURL;
- }
- public void setErrorURL(String errorURL) {
- ErrorURL = errorURL;
- }
- @Override
- public String toString() {
- return "RespContent{" +
- "TxnId=" + TxnId +
- ", Label='" + Label + '\'' +
- ", Status='" + Status + '\'' +
- ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
- ", Message='" + Message + '\'' +
- ", NumberTotalRows=" + NumberTotalRows +
- ", NumberLoadedRows=" + NumberLoadedRows +
- ", NumberFilteredRows=" + NumberFilteredRows +
- ", NumberUnselectedRows=" + NumberUnselectedRows +
- ", LoadBytes=" + LoadBytes +
- ", LoadTimeMs=" + LoadTimeMs +
- ", BeginTxnTimeMs=" + BeginTxnTimeMs +
- ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
- ", ReadDataTimeMs=" + ReadDataTimeMs +
- ", WriteDataTimeMs=" + WriteDataTimeMs +
- ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
- ", ErrorURL='" + ErrorURL + '\'' +
- '}';
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |