
Several ways to write Kafka data into Doris with Flink, and how they compare.

When building a real-time data warehouse with Flink + Doris, the first problem to solve is how to ingest the real-time Kafka stream. The notes below are based on the official Doris documentation and code, plus lessons learned in our own project, including some details that are easy to trip over.

The Routine Load method

If your Doris is 2.1 or later and no complex data transformation is needed, we recommend Doris's built-in Routine Load; in practice it is easy to use and performs well.
Ingesting real-time Kafka data

Doris can continuously consume data from a Kafka topic via the Routine Load import method. After a Routine Load job is submitted, Doris keeps the job running, continuously generating load tasks that consume messages from the specified topic on the Kafka cluster.
Routine Load is a streaming load job with Exactly-Once semantics, guaranteeing that data is neither lost nor duplicated. The example below shows how to pull Kafka data in JSON format.
First, create the table to load into:
CREATE TABLE testdb.test_routineload_tbl(
    user_id            BIGINT       NOT NULL COMMENT "user id",
    name               VARCHAR(20)         COMMENT "name",
    age                INT                   COMMENT "age"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
If the incoming data has non-duplicated primary keys, you can use the Unique model instead, so that rows can be updated or deleted.

Create the Routine Load job
In Doris, create the load job with the CREATE ROUTINE LOAD statement:
CREATE ROUTINE LOAD testdb.example_routine_load_json ON test_routineload_tbl
COLUMNS(user_id,name,age)
PROPERTIES(
    "format"="json",
    "max_error_number" = "999999999999",
   "strip_outer_array"="true",
    "jsonpaths"="[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-json",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Pitfalls and details

max_error_number: the maximum number of error rows allowed within the sampling window. It must be greater than or equal to 0; the default is 0, i.e. no error rows are tolerated. The sampling window is max_batch_rows * 10. If the number of error rows inside the window exceeds max_error_number, the routine job is paused and manual intervention is needed to check the data quality; the problem rows can be inspected via the ErrorLogUrls field of SHOW ROUTINE LOAD. Rows filtered out by the WHERE clause do not count as error rows.

strip_outer_array: when the load format is JSON, strip_outer_array = true means the JSON data is presented as an array and every element in it is treated as one row; the default is false. Kafka JSON payloads are often wrapped in an outermost array (square brackets []), in which case you can set "strip_outer_array" = "true" to consume the topic in array mode. For example, the following data is parsed into two rows: [{"user_id":1,"name":"Emily","age":25},{"user_id":2,"name":"Benjamin","age":35}]. Our company's Kafka needed "strip_outer_array"="true"; adjust it to your actual data.

Some Kafka topics contain dirty data; you can tolerate it with max_error_number, or use a script like the following to monitor the job and resume it automatically:
import json

import pymysql      # MySQL-protocol client, used to talk to the Doris FE
import requests


# Open the database connection (fill in your Doris FE connection info)
db = pymysql.connect(host="host", user="user",
                     password="passwd", db="database", port=port)

# Get a cursor
cur = db.cursor()

# Query the state of all routine load jobs
sql = "show routine load"
cur.execute(sql)
results = cur.fetchall()

# Column positions vary between Doris versions, so look fields up by name
col = {desc[0]: i for i, desc in enumerate(cur.description)}

for row in results:
    name = row[col["Name"]]
    state = row[col["State"]]
    if state != 'RUNNING':
        err_log_urls = row[col["ErrorLogUrls"]]
        reason_state_changed = row[col["ReasonOfStateChanged"]]
        msg = ("doris routine load job abnormal:\n name=%s \n state=%s \n "
               "reason_state_changed=%s \n err_log_urls=%s \n"
               "about to auto-resume, please check the error details"
               % (name, state, reason_state_changed, err_log_urls))
        payload_message = {
            "msg_type": "text",
            "content": {
                "text": msg
            }
        }
        url = 'lark alert webhook url'
        s = json.dumps(payload_message)
        r = requests.post(url, data=s)
        # Resume the paused job after sending the alert
        cur.execute("resume routine load for " + name)

cur.close()
db.close()
Check the result with SHOW ROUTINE LOAD:
mysql> SHOW ROUTINE LOAD FOR testdb.example_routine_load
I did not append \G as the official docs do; with \G it threw an error for me.

If the loaded data has problems, use the command above and check these two fields:
ErrorLogUrls: the address where the filtered, quality-failing rows can be inspected.
OtherMsg: other error messages.
For details see: Routine Load - Apache Doris
The Flink Doris Connector method

The Flink Doris Connector lets Flink operate on (read, insert, update, delete) the data stored in Doris.
This approach suits Doris versions below 2.1, or cases where the Kafka data needs complex filtering or transformation. Routine Load can do simple filtering too, as below, but it never felt customizable enough:
CREATE ROUTINE LOAD demo.kafka_job04 ON routine_test04
      COLUMNS TERMINATED BY ",",
      WHERE id >= 3
      FROM KAFKA
      (
            "kafka_broker_list" = "10.16.10.6:9092",
            "kafka_topic" = "routineLoad04",
            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
Since our company's project still runs the rather old Flink 1.11, adjust according to the official docs if you are on a newer Flink version: Flink Doris Connector - Apache Doris
Complete example

The pom dependencies are as follows:
<properties>
      <scala.version>2.12.10</scala.version>
      <scala.binary.version>2.12</scala.binary.version>
      <java.version>1.8</java.version>
      <flink.version>1.12.2</flink.version>
      <fastjson.version>1.2.83</fastjson.version>
      <hadoop.version>2.8.3</hadoop.version>
      <scope.mode>compile</scope.mode>
    </properties>
    <dependencies>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
      </dependency>
      <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.3.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-10.0</version>
      </dependency>
      <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.0.3</version>
      </dependency>
      <!--After adding the following two dependencies, Flink's log will appear-->
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
      </dependency>
      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
      </dependency>
      
      <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
      </dependency>
      <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
      </dependency>
    </dependencies>
FlinkKafka2Doris:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkKafka2Doris {
    //kafka address
    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
    //kafka groupName
    private static final String groupName = "test_flink_doris_group";
    //kafka topicName
    private static final String topicName = "test_flink_doris";
    //doris ip port
    private static final String hostPort = "xxx:8030";
    //doris dbName
    private static final String dbName = "test1";
    //doris tbName
    private static final String tbName = "doris_test_source_2";
    //doris userName
    private static final String userName = "root";
    //doris password
    private static final String password = "";
    //doris columns
    private static final String columns = "name,age,price,sale";
    //json format
    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

    public static void main(String[] args) throws Exception {

      Properties props = new Properties();
      props.put("bootstrap.servers", bootstrapServer);
      props.put("group.id", groupName);
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("auto.offset.reset", "earliest");
      props.put("max.poll.records", "10000");

      StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      blinkStreamEnv.enableCheckpointing(10000);
      blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

      FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                new SimpleStringSchema(),
                props);

      DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);

//filter or transform the data here if needed (see the sketch after this class)

      DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

      dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));

      blinkStreamEnv.execute("flink kafka to doris");

    }
}
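The comment marked "filter or transform the data here" is where this approach pays off compared with Routine Load. Below is a minimal sketch of what such a step could look like, assuming every Kafka message is a single JSON object; it reuses the fastjson dependency already declared in the pom, and the field names (age, internal_field) and the threshold are invented purely for illustration:

// requires: org.apache.flink.streaming.api.datastream.DataStream,
//           com.alibaba.fastjson.JSON, com.alibaba.fastjson.JSONObject
DataStream<String> cleaned = dataStreamSource
        // drop messages that are not parseable JSON objects
        .filter(value -> {
            try {
                return JSON.parseObject(value) != null;
            } catch (Exception e) {
                return false;
            }
        })
        // keep only rows whose (hypothetical) age field is >= 18
        .filter(value -> JSON.parseObject(value).getIntValue("age") >= 18)
        // re-serialize after dropping a (hypothetical) field we do not want in Doris
        .map(value -> {
            JSONObject obj = JSON.parseObject(value);
            obj.remove("internal_field");
            return obj.toJSONString();
        });

// sink the cleaned stream instead of the raw one
cleaned.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

Heavier work such as dimension-table joins or windowed aggregation would go in the same place; that freedom is the main reason to choose this method over Routine Load.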
DorisStreamLoad:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.Serializable;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;


/**
* doris streamLoad
*/

public class DorisStreamLoad implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);

    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    private String hostPort;
    private String db;
    private String tbl;
    private String user;
    private String passwd;
    private String loadUrlStr;
    private String authEncoding;


    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
      this.hostPort = hostPort;
      this.db = db;
      this.tbl = tbl;
      this.user = user;
      this.passwd = passwd;
      this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
      this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
      URL url = new URL(urlStr);
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setInstanceFollowRedirects(false);
      conn.setRequestMethod("PUT");
      conn.setRequestProperty("Authorization", "Basic " + authEncoding);
      conn.addRequestProperty("Expect", "100-continue");
      conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
      conn.addRequestProperty("label", label);
      conn.addRequestProperty("max_filter_ratio", "0");
      conn.addRequestProperty("strict_mode", "true");
      conn.addRequestProperty("columns", columns);
      conn.addRequestProperty("format", "json");
      conn.addRequestProperty("jsonpaths", jsonformat);
      conn.addRequestProperty("strip_outer_array", "true");
      conn.setDoOutput(true);
      conn.setDoInput(true);

      return conn;
    }

    public static class LoadResponse {
      public int status;
      public String respMsg;
      public String respContent;

      public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
      }

      @Override
      public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
      }
    }

    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
      Calendar calendar = Calendar.getInstance();
      String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

      HttpURLConnection feConn = null;
      HttpURLConnection beConn = null;
      try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to be
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes());
            bos.close();

            // get respond
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());

      } catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
      } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
      }
    }

}
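Before wiring DorisStreamLoad into the Flink job, it can be worth calling it directly once to confirm Stream Load reaches Doris. A throwaway sketch, reusing the connection parameters from FlinkKafka2Doris; the sample row values are made up:

public class DorisStreamLoadSmokeTest {
    public static void main(String[] args) {
        DorisStreamLoad load = new DorisStreamLoad("xxx:8030", "test1", "doris_test_source_2", "root", "");
        // one JSON array containing a single made-up row, matching columns name,age,price,sale
        String data = "[{\"name\":\"test\",\"age\":18,\"price\":1,\"sale\":1}]";
        DorisStreamLoad.LoadResponse resp = load.loadBatch(
                data,
                "name,age,price,sale",
                "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]");
        System.out.println(resp);   // check Status and NumberLoadedRows in the response content
    }
}

If the printed response reports Success (or Publish Timeout) and the loaded row count matches, the connection parameters are good and the same class can be used from the Flink sink.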
DorisSink:
import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Custom doris sink
*/
public class DorisSink extends RichSinkFunction<String> {


    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
      this.dorisStreamLoad = dorisStreamLoad;
      this.columns = columns;
      this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
    }


    /**
   * Determine whether StreamLoad is successful
   *
   * @param respContent streamLoad returns the entity
   * @return
   */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
      if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
            return true;
      } else {
            return false;
      }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
      DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
      if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load failed: {}", loadResponse);
            }
      } else {
            log.error("Stream Load Request failed:{}", loadResponse);
      }
    }
}
RespContent:
import java.io.Serializable;

/**
* Entity returned by streamLoad
*/
public class RespContent implements Serializable {

    private static final long serialVersionUID = 1L;


    /**
   * Transaction ID of the load. The user usually does not need to care about it.
   */
    private int TxnId;
    /**
   * Import Label. Specified by the user or automatically generated by the system.
   */
    private String Label;
    /**
   * Import complete status.
   * "Success": Indicates that the import was successful.
   * "Publish Timeout": This status also indicates that the import has been completed, but the data may be visible with a delay, and there is no need to retry.
   * "Label Already Exists": The Label is duplicated, and the Label needs to be replaced.
   */
    private String Status;
    /**
   * The status of the import job corresponding to the existing Label.
   * This field will only be displayed when the Status is "Label Already Exists".
   * The user can know the status of the import job corresponding to the existing Label through this status.
   * "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful.
   */
    private String ExistingJobStatus;
    /**
   * Import error information
   */
    private String Message;
    /**
   * Import the total number of processed rows
   */
    private long NumberTotalRows;
    /**
   * The number of rows successfully imported.
   */
    private long NumberLoadedRows;
    /**
   * Number of rows with unqualified data quality.
   */
    private int NumberFilteredRows;
    /**
   * The number of rows filtered by the where condition
   */
    private int NumberUnselectedRows;
    /**
   * Number of bytes imported.
   */
    private long LoadBytes;
    /**
   * Import completion time. The unit is milliseconds.
   */
    private int LoadTimeMs;
    /**
   * The time it takes to request Fe to start a transaction, in milliseconds
   */
    private int BeginTxnTimeMs;
    /**
   * The time it takes to request Fe to obtain the import data execution plan, in milliseconds
   */
    private int StreamLoadPutTimeMs;
    /**
   * The time spent reading data, in milliseconds
   */
    private int ReadDataTimeMs;
    /**
   * The time spent writing data, in milliseconds.
   */
    private int WriteDataTimeMs;
    /**
   * The time taken to submit and publish the transaction request to Fe, in milliseconds
   */
    private int CommitAndPublishTimeMs;
    /**
   * If there is a data quality problem, check the specific error line by visiting this URL
   */
    private String ErrorURL;

    public int getTxnId() {
      return TxnId;
    }

    public void setTxnId(int txnId) {
      TxnId = txnId;
    }

    public String getLabel() {
      return Label;
    }

    public void setLabel(String label) {
      Label = label;
    }

    public String getStatus() {
      return Status;
    }

    public void setStatus(String status) {
      Status = status;
    }

    public String getExistingJobStatus() {
      return ExistingJobStatus;
    }

    public void setExistingJobStatus(String existingJobStatus) {
      ExistingJobStatus = existingJobStatus;
    }

    public String getMessage() {
      return Message;
    }

    public void setMessage(String message) {
      Message = message;
    }

    public long getNumberTotalRows() {
      return NumberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
      NumberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
      return NumberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
      NumberLoadedRows = numberLoadedRows;
    }

    public int getNumberFilteredRows() {
      return NumberFilteredRows;
    }

    public void setNumberFilteredRows(int numberFilteredRows) {
      NumberFilteredRows = numberFilteredRows;
    }

    public int getNumberUnselectedRows() {
      return NumberUnselectedRows;
    }

    public void setNumberUnselectedRows(int numberUnselectedRows) {
      NumberUnselectedRows = numberUnselectedRows;
    }

    public long getLoadBytes() {
      return LoadBytes;
    }

    public void setLoadBytes(long loadBytes) {
      LoadBytes = loadBytes;
    }

    public int getLoadTimeMs() {
      return LoadTimeMs;
    }

    public void setLoadTimeMs(int loadTimeMs) {
      LoadTimeMs = loadTimeMs;
    }

    public int getBeginTxnTimeMs() {
      return BeginTxnTimeMs;
    }

    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
      BeginTxnTimeMs = beginTxnTimeMs;
    }

    public int getStreamLoadPutTimeMs() {
      return StreamLoadPutTimeMs;
    }

    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
      StreamLoadPutTimeMs = streamLoadPutTimeMs;
    }

    public int getReadDataTimeMs() {
      return ReadDataTimeMs;
    }

    public void setReadDataTimeMs(int readDataTimeMs) {
      ReadDataTimeMs = readDataTimeMs;
    }

    public int getWriteDataTimeMs() {
      return WriteDataTimeMs;
    }

    public void setWriteDataTimeMs(int writeDataTimeMs) {
      WriteDataTimeMs = writeDataTimeMs;
    }

    public int getCommitAndPublishTimeMs() {
      return CommitAndPublishTimeMs;
    }

    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
      CommitAndPublishTimeMs = commitAndPublishTimeMs;
    }

    public String getErrorURL() {
      return ErrorURL;
    }

    public void setErrorURL(String errorURL) {
      ErrorURL = errorURL;
    }

    @Override
    public String toString() {
      return "RespContent{" +
                "TxnId=" + TxnId +
                ", Label='" + Label + '\'' +
                ", Status='" + Status + '\'' +
                ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
                ", Message='" + Message + '\'' +
                ", NumberTotalRows=" + NumberTotalRows +
                ", NumberLoadedRows=" + NumberLoadedRows +
                ", NumberFilteredRows=" + NumberFilteredRows +
                ", NumberUnselectedRows=" + NumberUnselectedRows +
                ", LoadBytes=" + LoadBytes +
                ", LoadTimeMs=" + LoadTimeMs +
                ", BeginTxnTimeMs=" + BeginTxnTimeMs +
                ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
                ", ReadDataTimeMs=" + ReadDataTimeMs +
                ", WriteDataTimeMs=" + WriteDataTimeMs +
                ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
                ", ErrorURL='" + ErrorURL + '\'' +
                '}';
    }
}
