云计算课程设计(Prometheus+grafana+Flume+ganglia+mysql+jdk)

嚴華  金牌会员 | 2024-8-19 10:47:36 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 687|帖子 687|积分 2061



一、准备情况

   prometheus下载所在: https://github.com/prometheus/prometheus/releases/download/v2.52.0-rc.1/prometheus-2.52.0-rc.1. windows-amd64.zip
grafana 下 载 地 址 :
  https://dl.grafana.com/enterprise/release/grafana-enterprise-10.4.2.windows-amd64.zip windows_exporter下载所在:
  https://github.com/prometheus-community/windows_exporter/releases/download/v0.25.1/wi ndows_exporter-0.25.1-amd64.msi
flume下载所在:
  https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
jdk(linux)下载所在:
  https://cola-yunos-1305721388.cos.ap-guangzhou.myqcloud.com/20210813/jdk-8u401-linux-x64. tar.gz
mysql下载所在
https://downloads.mysql.com/archives/get/p/23/file/mysql-community-server-5.7.26-1.el7.x86_64.rpm
  https://downloads.mysql.com/archives/get/p/23/file/mysql-community-client-5.7.26-1.el7.x86_64.rpm
  https://downloads.mysql.com/archives/get/p/23/file/mysql-community-common-5.7.26-1.el7.x86_64.rpm
  https://downloads.mysql.com/archives/get/p/23/file/mysql-community-libs-5.7.26-1.el7.x86_64.rpm
驱动包下载所在:  
  https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.16.tar.gz idea
  idea下载所在: 
https://www.jetbrains.com/zh-cn/idea/download/download-thanks.html?platform=windows 
maven下载所在: 
https://dlcdn.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.zip
  二、监督面板搭建

1、解压Prometheus,运行prometheus.exe


2、打开网址


3、配置windows服务器监督:

        双击运行windows_exporter-0.25.1-amd64.msi,在欣赏器进入以下所在:
http://10.225.193.16:9182/metrics,出现以下页面即为运行成功:

4、编辑prometheus.yml(此文件在解压的Prometheus内里)

在文件末段添加以下内容:
  1. - job_name: "windows"
  2.     file_sd_configs:
  3.     - refresh_interval: 15s
  4.       files:
  5.       - ".\\windows.yml"
复制代码

5、在同级目录下新建windows.yml

  1. - targets: ["127.0.0.1:9182"]
  2.   labels:
  3.     instance: 127.0.0.1
  4. serverName: 'windows server'
复制代码
 

6、重启prometheus(就是把黑框框关掉,然后再双击prometheus.exe)

在欣赏器打开:http://127.0.0.1:9090/targets?search=

7、解压grafana,在bin目录打开grafana-server.exe 


8、打开网址

http://127.0.0.1:3000
默认的账号暗码都是admin 

9、添加datasource

 







出现下边页面即为搭建成功

三、Flume日记聚集搭建

1、创建目录

在‘/opt’目录下创建 software和module两个目录,命令如下:
  1. mkdir module software
复制代码

2、利用xftp工具上传文件


3、将JDK和Flume文件解压到‘/opt/module’

  1. tar -zxf jdk-8u371-linux-x64.tar.gz -C /opt/module/
  2. tar -zxf apache-flume-1.6.0-bin.tar.gz -C /opt/module/
复制代码
4、修改安装目录名称

  1. mv jdk1.8.0_371/ jdk
  2. mv apache-flume-1.6.0-bin/ flume
复制代码

5、配置情况

A)JDK情况配置

[root@localhost module]# vi /etc/profile
配置内容如下:
  1. export JAVA_HOME=/opt/module/jdk
  2. export PATH=$PATH:$JAVA_HOME/bin # 将 JAVA 安装目录加入 PATH 路径
复制代码

B)Flume情况配置:

  1. 配置Flume文件
  2. [root@localhost conf]# mv flume-env.sh.template flume-env.sh
复制代码

  1. # 编辑配置文件
  2. [root@localhost conf]# vi flume-env.sh
复制代码

6、刷新情况变量

  1. [root@localhost module]# source /etc/profile
复制代码

四、ganglia监控搭建

1、安装相关依靠包

  1. [root@localhost ~]# yum -y install httpd php
  2. [root@localhost ~]# yum -y install rrdtool perl-rrdtool rrdtool-devel
  3. [root@localhost ~]# yum -y install apr-devel
  4. [root@localhost ~]# yum install -y epel-release
  5. [root@localhost ~]# yum -y install ganglia-gmetad
  6. [root@localhost ~]# yum -y install ganglia-web
  7. [root@localhost ~]# yum install -y ganglia-gmond
复制代码
2、安装telent

  1. [root@localhost ~]# yum install telnet -y
复制代码

3、修改配置文件

  1. [root@localhost ~]# vi /etc/httpd/conf.d/ganglia.conf
复制代码

  1. [root@localhost ~]# vi /etc/ganglia/gmetad.conf
复制代码

  1. 修改host文件
  2. [root@localhost ~]# vi /etc/hosts
复制代码

  1. [root@localhost ~]# vi /etc/ganglia/gmond.conf
复制代码



五、禁用selinux

  1. [root@localhost ~]# vi /etc/selinux/config
复制代码

1、设置服务自启动

  1. [root@master ~]# systemctl enable httpd && systemctl enable gmetad && systemctl enable gmond
复制代码

2、关闭防火墙

  1. [root@master ~]# systemctl stop firewalld && systemctl disable firewalld
复制代码
3、授权

  1. [root@master ~]# chmod -R 777 /var/lib/ganglia
复制代码

4、重启捏造机

  1. [root@master ~]# reboot
复制代码
5、访问ganglia

http://192.168.229.160/ganglia/

6、修改flume配置文件

[root@master ~]# vi /opt/module/flume/conf/flume-env.sh
添加以下内容:
  1. JAVA_OPTS="-Dflume.monitoring.type=ganglia
  2. -Dflume.monitoring.hosts=192.168.229.160:8649  #(修改IP)
  3. -Xms100m
  4. -Xmx200m"
复制代码

7、创建job文件

在‘/opt/module/flume’目录下创建文件夹‘job’

进入该目录下创建一个名字为‘flume-telnet-logger.conf’文件,并编辑该文件,编辑内容如下:
[root@master job]# vi flume-telnet-logger.conf
  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = netcat
  7. a1.sources.r1.bind = 0.0.0.0
  8. a1.sources.r1.port = 44444
  9. # Describe the sink
  10. a1.sinks.k1.type = logger
  11. # Use a channel which buffers events in memory
  12. a1.channels.c1.type = memory
  13. a1.channels.c1.capacity = 1000
  14. a1.channels.c1.transactionCapacity = 100
  15. # Bind the source and sink to the channel
  16. a1.sources.r1.channels = c1
  17. a1.sinks.k1.channel = c1
复制代码
8、启动Flume服务

  1. [root@master ~]# /opt/module/flume/bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.229.160:8649
复制代码

9、重新开启一个xshell端口,发送数据

  1. [root@master ~]# telnet localhost 44444
复制代码
 

10、查察网页端监控情况

http://192.168.229.160/ganglia/?r=hour&cs=&ce=&c=master&h=master&tab=m&vn=&tz=&hide-hf=false&m=load_one&sh=1&z=small&hc=4&host_regex=&max_graphs=0&s=by+name
 

六、flume日记聚集(MySQL)

1、利用xftp工具上传文件


2、删除mariadb依靠


系统中已经安装了 mariadb-libs-5.5.52-2.el7.x86_64 软件包,需要将其卸载
  1. [root@master ~]# rpm -e mariadb-libs-5.5.65-1.el7.x86_64 --nodeps
复制代码

已删除。
3、安装mysql服务

  1. [root@master software]# rpm -ivh mysql-community-common-5.7.18-1.el7.x86_64.rpm --nodeps
  2. [root@master software]# rpm -ivh mysql-community-libs-5.7.18-1.el7.x86_64.rpm --nodeps
  3. [root@master software]# rpm -ivh mysql-community-client-5.7.18-1.el7.x86_64.rpm --nodeps
  4. [root@master software]# rpm -ivh mysql-community-server-5.7.18-1.el7.x86_64.rpm --nodeps
复制代码

4、启动MySQL服务

  1. [root@master software]# systemctl start mysqld
复制代码

已启动。
5、查找MySQL初始暗码

  1. [root@master software]# grep "password" /var/log/mysqld.log
复制代码

6、登录MySQL

  1. [root@master ~]# mysql -uroot -p
  2. # 输入初始密码:>q.r#A9(f&4<
复制代码

7、设置暗码复杂度

  1. set global validate_password_policy=LOW;
复制代码

  1. set global validate_password_length=4;
复制代码

8、修改暗码, 这里设置为root

  1. alter user 'root'@'localhost' identified by 'root';
复制代码

9、开启远程访问

  1. GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
复制代码

10、利用navicat工具远程连接MySQL

确保MySQL服务正在运行,利用navicat举行远程访问:

连接成功
七、创建msyql source jar包

1、安装maven,在windows本机解压后打开conf目录下的setting.conf,添加阿里云镜像源


2、打开idea,新建一个maven项目


项目具体信息,如果本机没有jdk需要安装jdk



3、导入依靠

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <modelVersion>4.0.0</modelVersion>
  6.     <groupId>org.example</groupId>
  7.     <artifactId>mysql_sourcde</artifactId>
  8.     <version>1.0-SNAPSHOT</version>
  9.     <properties>
  10.         <maven.compiler.source>8</maven.compiler.source>
  11.         <maven.compiler.target>8</maven.compiler.target>
  12.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  13.     </properties>
  14.     <dependencies>
  15.         <dependency>
  16.             <groupId>org.apache.flume</groupId>
  17.             <artifactId>flume-ng-core</artifactId>
  18.             <version>1.6.0</version>
  19.         </dependency>
  20.         <dependency>
  21.             <groupId>mysql</groupId>
  22.             <artifactId>mysql-connector-java</artifactId>
  23.             <version>5.1.16</version>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>log4j</groupId>
  27.             <artifactId>log4j</artifactId>
  28.             <version>1.2.17</version>
  29.         </dependency>
  30.         <dependency>
  31.             <groupId>org.slf4j</groupId>
  32.             <artifactId>slf4j-api</artifactId>
  33.             <version>1.7.12</version>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.slf4j</groupId>
  37.             <artifactId>slf4j-log4j12</artifactId>
  38.             <version>1.7.12</version>
  39.         </dependency>
  40.     </dependencies>
  41. </project>
复制代码
刷新项目

4、创建目录



5、创建资源文件夹



6、创建两个资源文件jdbc.properties和log4j.properties



7、添加配置

jdbc.properties配置文件如下:
  1. dbDriver=com.mysql.jdbc.Driver
  2. dbUrl=jdbc:mysql://192.168.229.160:3306/mysqlsource? useUnicode=true&characterEncoding=utf-8
  3. dbUser=root
  4. dbPassword=root
复制代码
log4j.properties配置文件如下:
  1. #--------console-----------
  2. log4j.rootLogger=info,myconsole,myfile log4j.appender.myconsole=org.apache.log4j.ConsoleAppender log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout #log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
  3. #log4j.rootLogger=error,myfile
  4. log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender log4j.appender.myfile.File=/tmp/flume.log log4j.appender.myfile.layout=org.apache.log4j.PatternLayout log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
复制代码
8、右键java目录,创建包


9、在包内新建两个类



SQLSource代码:
  1. package com.nuit.source;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.EventDeliveryException;
  5. import org.apache.flume.PollableSource;
  6. import org.apache.flume.conf.Configurable;
  7. import org.apache.flume.event.SimpleEvent;
  8. import org.apache.flume.source.AbstractSource;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import java.text.ParseException;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. public class SQLSource extends AbstractSource implements Configurable, PollableSource {
  16.     //打印日志
  17.     private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
  18.     //定义sqlHelper
  19.     private SQLSourceHelper sqlSourceHelper;
  20.     @Override
  21.     public long getBackOffSleepIncrement() {
  22.         return 0;
  23.     }
  24.     @Override
  25.     public long getMaxBackOffSleepInterval() {
  26.         return 0;
  27.     }
  28.     @Override
  29.     public void configure(Context context) {
  30.         try {
  31.             //初始化
  32.             sqlSourceHelper = new SQLSourceHelper(context);
  33.         } catch (ParseException e) {
  34.             e.printStackTrace();
  35.         }
  36.     }
  37.     @Override
  38.     public Status process() throws EventDeliveryException {
  39.         try {
  40.             //查询数据表
  41.             List<List<Object>> result = sqlSourceHelper.executeQuery();
  42.             //存放event的集合
  43.             List<Event> events = new ArrayList<>();
  44.             //存放event头集合
  45.             HashMap<String, String> header = new HashMap<>();
  46.             //如果有返回数据,则将数据封装为event
  47.             if (!result.isEmpty()) {
  48.                 List<String> allRows = sqlSourceHelper.getAllRows(result);
  49.                 Event event = null;
  50.                 for (String row : allRows) {
  51.                     event = new SimpleEvent();
  52.                     event.setBody(row.getBytes());
  53.                     event.setHeaders(header);
  54.                     events.add(event);
  55.                 }
  56.                 //将event写入channel
  57.                 this.getChannelProcessor().processEventBatch(events);
  58.                 //更新数据表中的offset信息
  59.                 sqlSourceHelper.updateOffset2DB(result.size());
  60.             }
  61.             //等待时长
  62.             Thread.sleep(sqlSourceHelper.getRunQueryDelay());
  63.             return Status.READY;
  64.         } catch (InterruptedException e) {
  65.             LOG.error("Error procesing row", e);
  66.             return Status.BACKOFF;
  67.         }
  68.     }
  69.     @Override
  70.     public synchronized void stop() {
  71.         LOG.info("Stopping sql source {} ...", getName());
  72.         try {
  73.             //关闭资源
  74.             sqlSourceHelper.close();
  75.         } finally {
  76.             super.stop();
  77.         }
  78.     }
  79. }
复制代码
SQLSourceHelper代码:
  1. package com.nuit.source;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.conf.ConfigurationException;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.io.IOException;
  7. import java.sql.*;
  8. import java.text.ParseException;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. import java.util.Properties;
  12. public class SQLSourceHelper {
  13.     private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
  14.     private int runQueryDelay, //两次查询的时间间隔
  15.             startFrom,            //开始id
  16.             currentIndex,                //当前id
  17.             recordSixe = 0,      //每次查询返回结果的条数
  18.             maxRow;                //每次查询的最大条数
  19.     private String table,       //要操作的表
  20.             columnsToSelect,     //用户传入的查询的列
  21.             customQuery,          //用户传入的查询语句
  22.             query,                 //构建的查询语句
  23.             defaultCharsetResultSet;//编码集
  24.     //上下文,用来获取配置文件
  25.     private Context context;
  26.     //为定义的变量赋值(默认值),可在flume任务的配置文件中修改
  27.     private static final int DEFAULT_QUERY_DELAY = 10000;
  28.     private static final int DEFAULT_START_VALUE = 0;
  29.     private static final int DEFAULT_MAX_ROWS = 2000;
  30.     private static final String DEFAULT_COLUMNS_SELECT = "*";
  31.     private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
  32.     private static Connection conn = null;
  33.     private static PreparedStatement ps = null;
  34.     private static String connectionURL, connectionUserName, connectionPassword;
  35.     //加载静态资源
  36.     static {
  37.         Properties p = new Properties();
  38.         try {
  39.             p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
  40.             connectionURL = p.getProperty("dbUrl");
  41.             connectionUserName = p.getProperty("dbUser");
  42.             connectionPassword = p.getProperty("dbPassword");
  43.             Class.forName(p.getProperty("dbDriver"));
  44.         } catch (IOException | ClassNotFoundException e) {
  45.             LOG.error(e.toString());
  46.         }
  47.     }
  48.     //获取JDBC连接
  49.     private static Connection InitConnection(String url, String user, String pw) {
  50.         try {
  51.             Connection conn = DriverManager.getConnection(url, user, pw);
  52.             if (conn == null)
  53.                 throw new SQLException();
  54.             return conn;
  55.         } catch (SQLException e) {
  56.             e.printStackTrace();
  57.         }
  58.         return null;
  59.     }
  60.     //构造方法
  61.     SQLSourceHelper(Context context) throws ParseException {
  62.         //初始化上下文
  63.         this.context = context;
  64.         //有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
  65.         this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
  66.         this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
  67.         this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
  68.         this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
  69.         //无默认值参数:获取flume任务配置文件中的参数
  70.         this.table = context.getString("table");
  71.         this.customQuery = context.getString("custom.query");
  72.         connectionURL = context.getString("connection.url");
  73.         connectionUserName = context.getString("connection.user");
  74.         connectionPassword = context.getString("connection.password");
  75.         conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
  76.         //校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
  77.         checkMandatoryProperties();
  78.         //获取当前的id
  79.         currentIndex = getStatusDBIndex(startFrom);
  80.         //构建查询语句
  81.         query = buildQuery();
  82.     }
  83.     //校验相应的配置信息(表,查询语句以及数据库连接的参数)
  84.     private void checkMandatoryProperties() {
  85.         if (table == null) {
  86.             throw new ConfigurationException("property table not set");
  87.         }
  88.         if (connectionURL == null) {
  89.             throw new ConfigurationException("connection.url property not set");
  90.         }
  91.         if (connectionUserName == null) {
  92.             throw new ConfigurationException("connection.user property not set");
  93.         }
  94.         if (connectionPassword == null) {
  95.             throw new ConfigurationException("connection.password property not set");
  96.         }
  97.     }
  98.     //构建sql语句
  99.     private String buildQuery() {
  100.         String sql = "";
  101.         //获取当前id
  102.         currentIndex = getStatusDBIndex(startFrom);
  103.         LOG.info(currentIndex + "");
  104.         if (customQuery == null) {
  105.             sql = "SELECT " + columnsToSelect + " FROM " + table;
  106.         } else {
  107.             sql = customQuery;
  108.         }
  109.         StringBuilder execSql = new StringBuilder(sql);
  110.         //以id作为offset
  111.         if (!sql.contains("where")) {
  112.             execSql.append(" where ");
  113.             execSql.append("id").append(">").append(currentIndex);
  114.             return execSql.toString();
  115.         } else {
  116.             int length = execSql.toString().length();
  117.             return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
  118.         }
  119.     }
  120.     //执行查询
  121.     List<List<Object>> executeQuery() {
  122.         try {
  123.             //每次执行查询时都要重新生成sql,因为id不同
  124.             customQuery = buildQuery();
  125.             //存放结果的集合
  126.             List<List<Object>> results = new ArrayList<>();
  127.             if (ps == null) {
  128.                 //
  129.                 ps = conn.prepareStatement(customQuery);
  130.             }
  131.             ResultSet result = ps.executeQuery(customQuery);
  132.             while (result.next()) {
  133.                 //存放一条数据的集合(多个列)
  134.                 List<Object> row = new ArrayList<>();
  135.                 //将返回结果放入集合
  136.                 for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
  137.                     row.add(result.getObject(i));
  138.                 }
  139.                 results.add(row);
  140.             }
  141.             LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
  142.             return results;
  143.         } catch (SQLException e) {
  144.             LOG.error(e.toString());
  145.             // 重新连接
  146.             conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
  147.         }
  148.         return null;
  149.     }
  150.     //将结果集转化为字符串,每一条数据是一个list集合,将每一个小的list集合转化为字符串
  151.     List<String> getAllRows(List<List<Object>> queryResult) {
  152.         List<String> allRows = new ArrayList<>();
  153.         if (queryResult == null || queryResult.isEmpty())
  154.             return allRows;
  155.         StringBuilder row = new StringBuilder();
  156.         for (List<Object> rawRow : queryResult) {
  157.             Object value = null;
  158.             for (Object aRawRow : rawRow) {
  159.                 value = aRawRow;
  160.                 if (value == null) {
  161.                     row.append(",");
  162.                 } else {
  163.                     row.append(aRawRow.toString()).append(",");
  164.                 }
  165.             }
  166.             allRows.add(row.toString());
  167.             row = new StringBuilder();
  168.         }
  169.         return allRows;
  170.     }
  171.     //更新offset元数据状态,每次返回结果集后调用。必须记录每次查询的offset值,为程序中断续跑数据时使用,以id为offset
  172.     void updateOffset2DB(int size) {
  173.         //以source_tab做为KEY,如果不存在则插入,存在则更新(每个源表对应一条记录)
  174.         String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
  175.                 + this.table
  176.                 + "','" + (recordSixe += size)
  177.                 + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
  178.         LOG.info("updateStatus Sql:" + sql);
  179.         execSql(sql);
  180.     }
  181.     //执行sql语句
  182.     private void execSql(String sql) {
  183.         try {
  184.             ps = conn.prepareStatement(sql);
  185.             LOG.info("exec::" + sql);
  186.             ps.execute();
  187.         } catch (SQLException e) {
  188.             e.printStackTrace();
  189.         }
  190.     }
  191.     //获取当前id的offset
  192.     private Integer getStatusDBIndex(int startFrom) {
  193.         //从flume_meta表中查询出当前的id是多少
  194.         String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
  195.         if (dbIndex != null) {
  196.             return Integer.parseInt(dbIndex);
  197.         }
  198.         //如果没有数据,则说明是第一次查询或者数据表中还没有存入数据,返回最初传入的值
  199.         return startFrom;
  200.     }
  201.     //查询一条数据的执行语句(当前id)
  202.     private String queryOne(String sql) {
  203.         ResultSet result = null;
  204.         try {
  205.             ps = conn.prepareStatement(sql);
  206.             result = ps.executeQuery();
  207.             while (result.next()) {
  208.                 return result.getString(1);
  209.             }
  210.         } catch (SQLException e) {
  211.             e.printStackTrace();
  212.         }
  213.         return null;
  214.     }
  215.     //关闭相关资源
  216.     void close() {
  217.         try {
  218.             ps.close();
  219.             conn.close();
  220.         } catch (SQLException e) {
  221.             e.printStackTrace();
  222.         }
  223.     }
  224.     int getCurrentIndex() {
  225.         return currentIndex;
  226.     }
  227.     void setCurrentIndex(int newValue) {
  228.         currentIndex = newValue;
  229.     }
  230.     int getRunQueryDelay() {
  231.         return runQueryDelay;
  232.     }
  233.     String getQuery() {
  234.         return query;
  235.     }
  236.     String getConnectionURL() {
  237.         return connectionURL;
  238.     }
  239.     private boolean isCustomQuerySet() {
  240.         return (customQuery != null);
  241.     }
  242.     Context getContext() {
  243.         return context;
  244.     }
  245.     public String getConnectionUserName() {
  246.         return connectionUserName;
  247.     }
  248.     public String getConnectionPassword() {
  249.         return connectionPassword;
  250.     }
  251.     String getDefaultCharsetResultSet() {
  252.         return defaultCharsetResultSet;
  253.     }
  254. }
复制代码
10、打jar包


打包结果:

11、打包成功后,target目录会出现一个jar包


12、打开jar包的位置,并利用xftp工具上传到服务器



13、添加jar包依靠

  1. # 解压mysql-connector-java-5.1.16.tar.gz
  2. [root@master software]# tar -zxf mysql-connector-java-5.1.16.tar.gz -C /opt/module/
  3. # 复制‘mysql-connector-java-5.1.16/mysql-connector-java-5.1.16-bin.jar’、‘mysql_source-1.0-SNAPSHOT.jar’两个压缩包到‘/opt/module/flume/lib/‘目录下
  4. [root@master software]# cp -p mysql_source-1.0-SNAPSHOT.jar /opt/module/flume/lib/
  5. [root@master module]# cp -p mysql-connector-java-5.1.16/mysql-connector-java-5.1.16-bin.jar /opt/module/flume/lib/
复制代码
14、创建job文件

[root@master job]# pwd
/opt/module/flume/job
[root@master job]# vi mysql.conf
编辑内容如下:
  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = com.nuit.source.SQLSource  
  7. a1.sources.r1.connection.url = jdbc:mysql://192.168.229.160:3306/mysqlsource
  8. a1.sources.r1.connection.user = root
  9. a1.sources.r1.connection.password = root
  10. a1.sources.r1.table = student
  11. a1.sources.r1.columns.to.select = *
  12. #a1.sources.r1.incremental.column.name = id
  13. #a1.sources.r1.incremental.value = 0
  14. a1.sources.r1.run.query.delay=5000
  15. # Describe the sink
  16. a1.sinks.k1.type = logger
  17. # Describe the channel
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000
  20. a1.channels.c1.transactionCapacity = 100
  21. # Bind the source and sink to the channel
  22. a1.sources.r1.channels = c1
  23. a1.sinks.k1.channel = c1
复制代码
八、测试结果

1、创建一个名为’mysqlsource‘数据库


2、选择刚刚创建的数据库


3、创建数据表

  1. CREATE TABLE `student` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT,
  3. `name` varchar(255) NOT NULL,
  4. PRIMARY KEY (`id`)
  5. );
  6. CREATE TABLE `flume_meta` (
  7. `source_tab` varchar(255) NOT NULL,
  8. `currentIndex` varchar(255) NOT NULL,
  9. PRIMARY KEY (`source_tab`)
  10. );
复制代码

4、向表中插入数据

  1. insert into student values(1,'zhangsan');
  2. insert into student values(2,'lisi');
  3. insert into student values(3,'wangwu');
  4. insert into student values(4,'zhaoliu');
复制代码

5、查察表的数据


6、退出MySQL,启动flume服务

  1. [root@master flume]# bin/flume-ng agent --conf conf/ --name a1 \
  2. > --conf-file job/mysql.conf -Dflume.root.logger=INFO,console
复制代码

发现我们插入数据的日记,至此搭建完成!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

嚴華

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表