N Ways to Write Kafka Data into Doris with Flink, and How They Compare

When building a real-time data warehouse with Flink + Doris, the first thing to solve is how to ingest the real-time Kafka stream. The notes below are based on the official Doris documentation and code, summarized from practice in my own project, and include some details that are easy to trip over.

The Routine Load approach

If you are on Doris 2.1 or later and do not need complex data transformation, I recommend Doris's built-in Routine Load; in my experience it is convenient to use and performs well.

Ingesting real-time Kafka data

Doris can continuously consume data from a Kafka topic through the Routine Load import method. After a Routine Load job is submitted, Doris keeps it running, continuously generating import tasks that consume the messages in the specified topic of the Kafka cluster.
Routine Load is a streaming import job that supports Exactly-Once semantics, guaranteeing that data is neither lost nor duplicated. The example below shows how to pull in Kafka data (JSON format).
First, create the table to import into:
    CREATE TABLE testdb.test_routineload_tbl(
        user_id            BIGINT       NOT NULL COMMENT "user id",
        name               VARCHAR(20)           COMMENT "name",
        age                INT                   COMMENT "age"
    )
    DUPLICATE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 10;
If the incoming data has a non-repeating primary key, you can use the Unique model instead, which makes rows deletable and updatable; a sketch follows.
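As a reference, a minimal sketch of the same table under the Unique model (the table name here is made up); rows sharing a user_id are merged, so a later write updates the earlier row:

    CREATE TABLE testdb.test_routineload_tbl_uniq(
        user_id            BIGINT       NOT NULL COMMENT "user id",
        name               VARCHAR(20)           COMMENT "name",
        age                INT                   COMMENT "age"
    )
    UNIQUE KEY(user_id)
    DISTRIBUTED BY HASH(user_id) BUCKETS 10;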

Create the Routine Load import job
In Doris, use the CREATE ROUTINE LOAD statement to create the import job:
    CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
    COLUMNS(user_id, name, age)
    PROPERTIES(
        "format" = "json",
        "max_error_number" = "999999999999",
        "strip_outer_array" = "true",
        "jsonpaths" = "[\"$.user_id\",\"$.name\",\"$.age\"]"
    )
    FROM KAFKA(
        "kafka_broker_list" = "192.168.88.62:9092",
        "kafka_topic" = "test-routine-load-json",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
Pitfalls and details

max_error_number: the maximum number of error rows allowed within the sampling window. It must be >= 0; the default is 0, meaning no error rows are allowed. The sampling window is max_batch_rows * 10. If the number of error rows inside the sampling window exceeds max_error_number, the routine job is paused and needs manual intervention to investigate the data quality problem; check the offending rows via the ErrorLogUrls field of SHOW ROUTINE LOAD (the relevant statements are sketched after these notes). Rows filtered out by a WHERE clause do not count as error rows.

strip_outer_array: when the import format is json, strip_outer_array = true means the JSON data is presented as an array, and each element in it is treated as one row; the default is false. Kafka JSON payloads often come wrapped in an outer array (square brackets at the top level); in that case set "strip_outer_array" = "true" to consume the topic in array mode. For example, the following payload is parsed into two rows: [{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]
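When a job does get paused, these are the statements involved (a sketch, assuming the job name from the example above):

    SHOW ROUTINE LOAD FOR testdb.example_routine_load_json;   -- check State, ReasonOfStateChanged, ErrorLogUrls
    PAUSE ROUTINE LOAD FOR testdb.example_routine_load_json;  -- pause manually if needed
    RESUME ROUTINE LOAD FOR testdb.example_routine_load_json; -- resume once the data problem is fixed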
Our company's Kafka required "strip_outer_array" = "true"; adjust this to your actual data.
Some Kafka topics carry dirty data; you can tolerate it with max_error_number, or consider a script that detects paused jobs, sends an alert, and auto-resumes them:
    import pymysql
    import requests
    import json

    # connect to the Doris FE over the MySQL protocol (placeholders)
    db = pymysql.connect(host="host", user="user",
                         password="passwd", db="database", port=port)
    # get a cursor
    cur = db.cursor()
    # query the state of all routine load jobs
    sql = "show routine load"
    cur.execute(sql)
    results = cur.fetchall()
    for row in results:
        name = row[1]
        state = row[7]
        if state != 'RUNNING':
            err_log_urls = row[16]
            reason_state_changed = row[15]
            msg = ("doris import job abnormal:\n name=%s \n state=%s \n "
                   "reason_state_changed=%s \n err_log_urls=%s \n"
                   "about to auto-resume, please check the error details"
                   % (name, state, reason_state_changed, err_log_urls))
            payload_message = {
                "msg_type": "text",
                "content": {
                    "text": msg
                }
            }
            url = 'lark alert webhook url'
            s = json.dumps(payload_message)
            r = requests.post(url, data=s)
            # resume the paused job after alerting
            cur.execute("resume routine load for " + name)
    cur.close()
    db.close()
View the result with SHOW ROUTINE LOAD:

    mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load_json;
Unlike the official docs, I did not append \G; adding \G gave me an error.
If the imported data has problems, the command above lets you check these two fields:

ErrorLogUrls: URL(s) where the filtered, quality-failed rows can be inspected
OtherMsg: other error messages

For details, see: Routine Load - Apache Doris

The Flink Doris Connector approach

The Flink Doris Connector supports operating on (reading, inserting, modifying, deleting) data stored in Doris from Flink.
This approach suits Doris versions below 2.1, or cases where the Kafka data needs complex filtering or transformation. Routine Load can also do simple filtering, but I find it not customizable enough:
    CREATE ROUTINE LOAD demo.kafka_job04 ON routine_test04
            COLUMNS TERMINATED BY ",",
            WHERE id >= 3
            FROM KAFKA
            (
                "kafka_broker_list" = "10.16.10.6:9092",
                "kafka_topic" = "routineLoad04",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
Since our project runs a very old Flink (1.11), adjust for newer Flink versions according to the official docs: Flink Doris Connector - Apache Doris
Complete example

The pom dependencies are as follows:
    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <java.version>1.8</java.version>
        <flink.version>1.12.2</flink.version>
        <fastjson.version>1.2.83</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>
        <!-- After adding the following two dependencies, Flink's log will appear -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
    </dependencies>

FlinkKafka2Doris:
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.Properties;

    public class FlinkKafka2Doris {
        //kafka address
        private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
        //kafka groupName
        private static final String groupName = "test_flink_doris_group";
        //kafka topicName
        private static final String topicName = "test_flink_doris";
        //doris ip port
        private static final String hostPort = "xxx:8030";
        //doris dbName
        private static final String dbName = "test1";
        //doris tbName
        private static final String tbName = "doris_test_source_2";
        //doris userName
        private static final String userName = "root";
        //doris password
        private static final String password = "";
        //doris columns
        private static final String columns = "name,age,price,sale";
        //json format
        private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServer);
            props.put("group.id", groupName);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
            props.put("max.poll.records", "10000");
            StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            blinkStreamEnv.enableCheckpointing(10000);
            blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                    new SimpleStringSchema(),
                    props);
            DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
            // filter or transform the data here (see the sketch after this class)
            DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
            dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));
            blinkStreamEnv.execute("flink kafka to doris");
        }
    }
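Where the placeholder comment sits in main(), arbitrary filtering or transformation can be applied. A minimal sketch (the variable name cleaned and the age condition are made up for illustration), assuming each Kafka record is a single JSON object string; fastjson is already on the classpath from the pom above, and the snippet additionally needs imports for com.alibaba.fastjson.JSON, com.alibaba.fastjson.JSONObject, and org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator:

    // inside main(), in place of the placeholder comment:
    // drop malformed records and keep only rows with age >= 18,
    // so the sink only ever receives clean JSON
    SingleOutputStreamOperator<String> cleaned = dataStreamSource
            .filter(line -> {
                try {
                    JSONObject obj = JSON.parseObject(line);
                    return obj != null && obj.getIntValue("age") >= 18;
                } catch (Exception e) {
                    return false; // skip dirty data instead of failing the job
                }
            });
    cleaned.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));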

DorisStreamLoad:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.io.Serializable;
    import java.io.IOException;
    import java.io.BufferedOutputStream;
    import java.io.InputStream;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.Calendar;
    import java.util.UUID;

    /**
     * doris streamLoad
     */
    public class DorisStreamLoad implements Serializable {
        private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
        private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
        private String hostPort;
        private String db;
        private String tbl;
        private String user;
        private String passwd;
        private String loadUrlStr;
        private String authEncoding;

        public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
            this.hostPort = hostPort;
            this.db = db;
            this.tbl = tbl;
            this.user = user;
            this.passwd = passwd;
            this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
            this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
        }

        private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
            URL url = new URL(urlStr);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setInstanceFollowRedirects(false);
            conn.setRequestMethod("PUT");
            conn.setRequestProperty("Authorization", "Basic " + authEncoding);
            conn.addRequestProperty("Expect", "100-continue");
            conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
            conn.addRequestProperty("label", label);
            conn.addRequestProperty("max_filter_ratio", "0");
            conn.addRequestProperty("strict_mode", "true");
            conn.addRequestProperty("columns", columns);
            conn.addRequestProperty("format", "json");
            conn.addRequestProperty("jsonpaths", jsonformat);
            conn.addRequestProperty("strip_outer_array", "true");
            conn.setDoOutput(true);
            conn.setDoInput(true);
            return conn;
        }

        public static class LoadResponse {
            public int status;
            public String respMsg;
            public String respContent;

            public LoadResponse(int status, String respMsg, String respContent) {
                this.status = status;
                this.respMsg = respMsg;
                this.respContent = respContent;
            }

            @Override
            public String toString() {
                StringBuilder sb = new StringBuilder();
                sb.append("status: ").append(status);
                sb.append(", resp msg: ").append(respMsg);
                sb.append(", resp content: ").append(respContent);
                return sb.toString();
            }
        }

        public LoadResponse loadBatch(String data, String columns, String jsonformat) {
            Calendar calendar = Calendar.getInstance();
            String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                    calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                    calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                    UUID.randomUUID().toString().replaceAll("-", ""));
            HttpURLConnection feConn = null;
            HttpURLConnection beConn = null;
            try {
                // build request and send to fe
                feConn = getConnection(loadUrlStr, label, columns, jsonformat);
                int status = feConn.getResponseCode();
                // fe sends back http response code TEMPORARY_REDIRECT 307 and the new be location
                if (status != 307) {
                    throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
                }
                String location = feConn.getHeaderField("Location");
                if (location == null) {
                    throw new Exception("redirect location is null");
                }
                // build request and send to new be location
                beConn = getConnection(location, label, columns, jsonformat);
                // send data to be
                BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
                bos.write(data.getBytes());
                bos.close();
                // get response
                status = beConn.getResponseCode();
                String respMsg = beConn.getResponseMessage();
                InputStream stream = (InputStream) beConn.getContent();
                BufferedReader br = new BufferedReader(new InputStreamReader(stream));
                StringBuilder response = new StringBuilder();
                String line;
                while ((line = br.readLine()) != null) {
                    response.append(line);
                }
                return new LoadResponse(status, respMsg, response.toString());
            } catch (Exception e) {
                e.printStackTrace();
                String err = "failed to load audit via AuditLoader plugin with label: " + label;
                log.warn(err, e);
                return new LoadResponse(-1, e.getMessage(), err);
            } finally {
                if (feConn != null) {
                    feConn.disconnect();
                }
                if (beConn != null) {
                    beConn.disconnect();
                }
            }
        }
    }
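To sanity-check DorisStreamLoad outside of Flink, a throwaway test like the following can be used (the class name is hypothetical; the connection values are the placeholders from the job class above):

    public class DorisStreamLoadSmokeTest {
        public static void main(String[] args) {
            DorisStreamLoad load = new DorisStreamLoad("xxx:8030", "test1", "doris_test_source_2", "root", "");
            // one row matching the columns/jsonpaths used by the Flink job above
            String batch = "[{\"name\":\"Emily\",\"age\":25,\"price\":10.5,\"sale\":3}]";
            DorisStreamLoad.LoadResponse resp = load.loadBatch(batch, "name,age,price,sale",
                    "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]");
            System.out.println(resp);
        }
    }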

DorisSink:
    import com.alibaba.fastjson.JSON;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    /**
     * Custom doris sink
     */
    public class DorisSink extends RichSinkFunction<String> {
        private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
        private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
        private DorisStreamLoad dorisStreamLoad;
        private String columns;
        private String jsonFormat;

        public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
            this.dorisStreamLoad = dorisStreamLoad;
            this.columns = columns;
            this.jsonFormat = jsonFormat;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        /**
         * Determine whether the Stream Load succeeded
         *
         * @param respContent the entity returned by Stream Load
         * @return true if every row was loaded successfully
         */
        public static Boolean checkStreamLoadStatus(RespContent respContent) {
            return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                    && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
            if (loadResponse != null && loadResponse.status == 200) {
                RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
                if (!checkStreamLoadStatus(respContent)) {
                    log.error("Stream Load failed: {}", loadResponse);
                }
            } else {
                log.error("Stream Load request failed: {}", loadResponse);
            }
        }
    }
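Note that the sink above issues one Stream Load per record, which gets expensive at high throughput. Below is a hedged sketch of a batching variant, assuming each Kafka record is a single JSON object (the class name and BATCH_SIZE are made up); rows still sitting in the buffer when the job fails are lost unless the sink also implements CheckpointedFunction and flushes in snapshotState():

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.util.ArrayList;
    import java.util.List;

    /**
     * Hypothetical batching variant of DorisSink: buffers rows and issues one
     * Stream Load per BATCH_SIZE records instead of one per record.
     */
    public class BatchingDorisSink extends RichSinkFunction<String> {
        private static final int BATCH_SIZE = 1000;
        private final DorisStreamLoad dorisStreamLoad;
        private final String columns;
        private final String jsonFormat;
        private transient List<String> buffer;

        public BatchingDorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
            this.dorisStreamLoad = dorisStreamLoad;
            this.columns = columns;
            this.jsonFormat = jsonFormat;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            buffer = new ArrayList<>();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            buffer.add(value);
            if (buffer.size() >= BATCH_SIZE) {
                flush();
            }
        }

        @Override
        public void close() throws Exception {
            flush(); // best-effort flush of the remaining tail on shutdown
            super.close();
        }

        private void flush() {
            if (buffer == null || buffer.isEmpty()) {
                return;
            }
            // each element is a single JSON object; join them into one JSON array
            // so strip_outer_array=true on the Stream Load side splits it into rows
            String batch = "[" + String.join(",", buffer) + "]";
            dorisStreamLoad.loadBatch(batch, columns, jsonFormat);
            buffer.clear();
        }
    }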

RespContent:
    import java.io.Serializable;

    /**
     * Entity returned by streamLoad
     */
    public class RespContent implements Serializable {
        private static final long serialVersionUID = 1L;
        /**
         * Imported transaction ID. The user may not perceive it.
         */
        private int TxnId;
        /**
         * Import Label. Specified by the user or automatically generated by the system.
         */
        private String Label;
        /**
         * Import completion status.
         * "Success": the import succeeded.
         * "Publish Timeout": the import has also completed, but the data may be visible with a delay; there is no need to retry.
         * "Label Already Exists": the Label is duplicated and needs to be replaced.
         */
        private String Status;
        /**
         * The status of the import job corresponding to the existing Label.
         * Only shown when Status is "Label Already Exists".
         * "RUNNING" means the job is still executing; "FINISHED" means the job succeeded.
         */
        private String ExistingJobStatus;
        /**
         * Import error information
         */
        private String Message;
        /**
         * Total number of rows processed by the import
         */
        private long NumberTotalRows;
        /**
         * Number of rows successfully imported
         */
        private long NumberLoadedRows;
        /**
         * Number of rows with unqualified data quality
         */
        private int NumberFilteredRows;
        /**
         * Number of rows filtered by the where condition
         */
        private int NumberUnselectedRows;
        /**
         * Number of bytes imported
         */
        private long LoadBytes;
        /**
         * Import completion time, in milliseconds
         */
        private int LoadTimeMs;
        /**
         * Time spent asking Fe to start a transaction, in milliseconds
         */
        private int BeginTxnTimeMs;
        /**
         * Time spent asking Fe for the import data execution plan, in milliseconds
         */
        private int StreamLoadPutTimeMs;
        /**
         * Time spent reading data, in milliseconds
         */
        private int ReadDataTimeMs;
        /**
         * Time spent performing the data write, in milliseconds
         */
        private int WriteDataTimeMs;
        /**
         * Time taken to submit and publish the transaction request to Fe, in milliseconds
         */
        private int CommitAndPublishTimeMs;
        /**
         * If there is a data quality problem, visit this URL to see the specific error rows
         */
        private String ErrorURL;

        public int getTxnId() {
            return TxnId;
        }

        public void setTxnId(int txnId) {
            TxnId = txnId;
        }

        public String getLabel() {
            return Label;
        }

        public void setLabel(String label) {
            Label = label;
        }

        public String getStatus() {
            return Status;
        }

        public void setStatus(String status) {
            Status = status;
        }

        public String getExistingJobStatus() {
            return ExistingJobStatus;
        }

        public void setExistingJobStatus(String existingJobStatus) {
            ExistingJobStatus = existingJobStatus;
        }

        public String getMessage() {
            return Message;
        }

        public void setMessage(String message) {
            Message = message;
        }

        public long getNumberTotalRows() {
            return NumberTotalRows;
        }

        public void setNumberTotalRows(long numberTotalRows) {
            NumberTotalRows = numberTotalRows;
        }

        public long getNumberLoadedRows() {
            return NumberLoadedRows;
        }

        public void setNumberLoadedRows(long numberLoadedRows) {
            NumberLoadedRows = numberLoadedRows;
        }

        public int getNumberFilteredRows() {
            return NumberFilteredRows;
        }

        public void setNumberFilteredRows(int numberFilteredRows) {
            NumberFilteredRows = numberFilteredRows;
        }

        public int getNumberUnselectedRows() {
            return NumberUnselectedRows;
        }

        public void setNumberUnselectedRows(int numberUnselectedRows) {
            NumberUnselectedRows = numberUnselectedRows;
        }

        public long getLoadBytes() {
            return LoadBytes;
        }

        public void setLoadBytes(long loadBytes) {
            LoadBytes = loadBytes;
        }

        public int getLoadTimeMs() {
            return LoadTimeMs;
        }

        public void setLoadTimeMs(int loadTimeMs) {
            LoadTimeMs = loadTimeMs;
        }

        public int getBeginTxnTimeMs() {
            return BeginTxnTimeMs;
        }

        public void setBeginTxnTimeMs(int beginTxnTimeMs) {
            BeginTxnTimeMs = beginTxnTimeMs;
        }

        public int getStreamLoadPutTimeMs() {
            return StreamLoadPutTimeMs;
        }

        public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
            StreamLoadPutTimeMs = streamLoadPutTimeMs;
        }

        public int getReadDataTimeMs() {
            return ReadDataTimeMs;
        }

        public void setReadDataTimeMs(int readDataTimeMs) {
            ReadDataTimeMs = readDataTimeMs;
        }

        public int getWriteDataTimeMs() {
            return WriteDataTimeMs;
        }

        public void setWriteDataTimeMs(int writeDataTimeMs) {
            WriteDataTimeMs = writeDataTimeMs;
        }

        public int getCommitAndPublishTimeMs() {
            return CommitAndPublishTimeMs;
        }

        public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
            CommitAndPublishTimeMs = commitAndPublishTimeMs;
        }

        public String getErrorURL() {
            return ErrorURL;
        }

        public void setErrorURL(String errorURL) {
            ErrorURL = errorURL;
        }

        @Override
        public String toString() {
            return "RespContent{" +
                    "TxnId=" + TxnId +
                    ", Label='" + Label + '\'' +
                    ", Status='" + Status + '\'' +
                    ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
                    ", Message='" + Message + '\'' +
                    ", NumberTotalRows=" + NumberTotalRows +
                    ", NumberLoadedRows=" + NumberLoadedRows +
                    ", NumberFilteredRows=" + NumberFilteredRows +
                    ", NumberUnselectedRows=" + NumberUnselectedRows +
                    ", LoadBytes=" + LoadBytes +
                    ", LoadTimeMs=" + LoadTimeMs +
                    ", BeginTxnTimeMs=" + BeginTxnTimeMs +
                    ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
                    ", ReadDataTimeMs=" + ReadDataTimeMs +
                    ", WriteDataTimeMs=" + WriteDataTimeMs +
                    ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
                    ", ErrorURL='" + ErrorURL + '\'' +
                    '}';
        }
    }

