一、准备情况
prometheus下载所在: https://github.com/prometheus/prometheus/releases/download/v2.52.0-rc.1/prometheus-2.52.0-rc.1. windows-amd64.zip
grafana 下 载 地 址 :
https://dl.grafana.com/enterprise/release/grafana-enterprise-10.4.2.windows-amd64.zip windows_exporter下载所在:
https://github.com/prometheus-community/windows_exporter/releases/download/v0.25.1/wi ndows_exporter-0.25.1-amd64.msi
flume下载所在:
https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
jdk(linux)下载所在:
https://cola-yunos-1305721388.cos.ap-guangzhou.myqcloud.com/20210813/jdk-8u401-linux-x64. tar.gz
mysql下载所在
https://downloads.mysql.com/archives/get/p/23/file/mysql-community-server-5.7.26-1.el7.x86_64.rpm
https://downloads.mysql.com/archives/get/p/23/file/mysql-community-client-5.7.26-1.el7.x86_64.rpm
https://downloads.mysql.com/archives/get/p/23/file/mysql-community-common-5.7.26-1.el7.x86_64.rpm
https://downloads.mysql.com/archives/get/p/23/file/mysql-community-libs-5.7.26-1.el7.x86_64.rpm
驱动包下载所在:
https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.16.tar.gz idea
idea下载所在:
https://www.jetbrains.com/zh-cn/idea/download/download-thanks.html?platform=windows
maven下载所在:
https://dlcdn.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.zip
二、监督面板搭建
1、解压Prometheus,运行prometheus.exe
2、打开网址
3、配置windows服务器监督:
双击运行windows_exporter-0.25.1-amd64.msi,在欣赏器进入以下所在:
http://10.225.193.16:9182/metrics,出现以下页面即为运行成功:
4、编辑prometheus.yml(此文件在解压的Prometheus内里)
在文件末段添加以下内容:
- - job_name: "windows"
- file_sd_configs:
- - refresh_interval: 15s
- files:
- - ".\\windows.yml"
复制代码
5、在同级目录下新建windows.yml
- - targets: ["127.0.0.1:9182"]
- labels:
- instance: 127.0.0.1
- serverName: 'windows server'
复制代码
6、重启prometheus(就是把黑框框关掉,然后再双击prometheus.exe)
在欣赏器打开:http://127.0.0.1:9090/targets?search=
7、解压grafana,在bin目录打开grafana-server.exe
8、打开网址
http://127.0.0.1:3000
默认的账号暗码都是admin
9、添加datasource
出现下边页面即为搭建成功
三、Flume日记聚集搭建
1、创建目录
在‘/opt’目录下创建 software和module两个目录,命令如下:
2、利用xftp工具上传文件
3、将JDK和Flume文件解压到‘/opt/module’
- tar -zxf jdk-8u371-linux-x64.tar.gz -C /opt/module/
- tar -zxf apache-flume-1.6.0-bin.tar.gz -C /opt/module/
复制代码 4、修改安装目录名称
- mv jdk1.8.0_371/ jdk
- mv apache-flume-1.6.0-bin/ flume
复制代码
5、配置情况
A)JDK情况配置
[root@localhost module]# vi /etc/profile
配置内容如下:
- export JAVA_HOME=/opt/module/jdk
- export PATH=$PATH:$JAVA_HOME/bin # 将 JAVA 安装目录加入 PATH 路径
复制代码
B)Flume情况配置:
- 配置Flume文件
- [root@localhost conf]# mv flume-env.sh.template flume-env.sh
复制代码
- # 编辑配置文件
- [root@localhost conf]# vi flume-env.sh
复制代码
6、刷新情况变量
- [root@localhost module]# source /etc/profile
复制代码
四、ganglia监控搭建
1、安装相关依靠包
- [root@localhost ~]# yum -y install httpd php
- [root@localhost ~]# yum -y install rrdtool perl-rrdtool rrdtool-devel
- [root@localhost ~]# yum -y install apr-devel
- [root@localhost ~]# yum install -y epel-release
- [root@localhost ~]# yum -y install ganglia-gmetad
- [root@localhost ~]# yum -y install ganglia-web
- [root@localhost ~]# yum install -y ganglia-gmond
复制代码 2、安装telent
- [root@localhost ~]# yum install telnet -y
复制代码
3、修改配置文件
- [root@localhost ~]# vi /etc/httpd/conf.d/ganglia.conf
复制代码
- [root@localhost ~]# vi /etc/ganglia/gmetad.conf
复制代码
- 修改host文件
- [root@localhost ~]# vi /etc/hosts
复制代码
- [root@localhost ~]# vi /etc/ganglia/gmond.conf
复制代码
五、禁用selinux
- [root@localhost ~]# vi /etc/selinux/config
复制代码
1、设置服务自启动
- [root@master ~]# systemctl enable httpd && systemctl enable gmetad && systemctl enable gmond
复制代码
2、关闭防火墙
- [root@master ~]# systemctl stop firewalld && systemctl disable firewalld
复制代码 3、授权
- [root@master ~]# chmod -R 777 /var/lib/ganglia
复制代码
4、重启捏造机
5、访问ganglia
http://192.168.229.160/ganglia/
6、修改flume配置文件
[root@master ~]# vi /opt/module/flume/conf/flume-env.sh
添加以下内容:
- JAVA_OPTS="-Dflume.monitoring.type=ganglia
- -Dflume.monitoring.hosts=192.168.229.160:8649 #(修改IP)
- -Xms100m
- -Xmx200m"
复制代码
7、创建job文件
在‘/opt/module/flume’目录下创建文件夹‘job’
进入该目录下创建一个名字为‘flume-telnet-logger.conf’文件,并编辑该文件,编辑内容如下:
[root@master job]# vi flume-telnet-logger.conf
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = 0.0.0.0
- a1.sources.r1.port = 44444
- # Describe the sink
- a1.sinks.k1.type = logger
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
复制代码 8、启动Flume服务
- [root@master ~]# /opt/module/flume/bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.229.160:8649
复制代码
9、重新开启一个xshell端口,发送数据
- [root@master ~]# telnet localhost 44444
复制代码
10、查察网页端监控情况
http://192.168.229.160/ganglia/?r=hour&cs=&ce=&c=master&h=master&tab=m&vn=&tz=&hide-hf=false&m=load_one&sh=1&z=small&hc=4&host_regex=&max_graphs=0&s=by+name
六、flume日记聚集(MySQL)
1、利用xftp工具上传文件
2、删除mariadb依靠
系统中已经安装了 mariadb-libs-5.5.52-2.el7.x86_64 软件包,需要将其卸载
- [root@master ~]# rpm -e mariadb-libs-5.5.65-1.el7.x86_64 --nodeps
复制代码
已删除。
3、安装mysql服务
- [root@master software]# rpm -ivh mysql-community-common-5.7.18-1.el7.x86_64.rpm --nodeps
- [root@master software]# rpm -ivh mysql-community-libs-5.7.18-1.el7.x86_64.rpm --nodeps
- [root@master software]# rpm -ivh mysql-community-client-5.7.18-1.el7.x86_64.rpm --nodeps
- [root@master software]# rpm -ivh mysql-community-server-5.7.18-1.el7.x86_64.rpm --nodeps
复制代码
4、启动MySQL服务
- [root@master software]# systemctl start mysqld
复制代码
已启动。
5、查找MySQL初始暗码
- [root@master software]# grep "password" /var/log/mysqld.log
复制代码
6、登录MySQL
- [root@master ~]# mysql -uroot -p
- # 输入初始密码:>q.r#A9(f&4<
复制代码
7、设置暗码复杂度
- set global validate_password_policy=LOW;
复制代码
- set global validate_password_length=4;
复制代码
8、修改暗码, 这里设置为root
- alter user 'root'@'localhost' identified by 'root';
复制代码
9、开启远程访问
- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;
复制代码
10、利用navicat工具远程连接MySQL
确保MySQL服务正在运行,利用navicat举行远程访问:
连接成功
七、创建msyql source jar包
1、安装maven,在windows本机解压后打开conf目录下的setting.conf,添加阿里云镜像源
2、打开idea,新建一个maven项目
项目具体信息,如果本机没有jdk需要安装jdk
3、导入依靠
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.example</groupId>
- <artifactId>mysql_sourcde</artifactId>
- <version>1.0-SNAPSHOT</version>
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.6.0</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.16</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.12</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.12</version>
- </dependency>
- </dependencies>
- </project>
复制代码 刷新项目
4、创建目录
5、创建资源文件夹
6、创建两个资源文件jdbc.properties和log4j.properties
7、添加配置
jdbc.properties配置文件如下:
- dbDriver=com.mysql.jdbc.Driver
- dbUrl=jdbc:mysql://192.168.229.160:3306/mysqlsource? useUnicode=true&characterEncoding=utf-8
- dbUser=root
- dbPassword=root
复制代码 log4j.properties配置文件如下:
- #--------console-----------
- log4j.rootLogger=info,myconsole,myfile log4j.appender.myconsole=org.apache.log4j.ConsoleAppender log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout #log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
- #log4j.rootLogger=error,myfile
- log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender log4j.appender.myfile.File=/tmp/flume.log log4j.appender.myfile.layout=org.apache.log4j.PatternLayout log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
复制代码 8、右键java目录,创建包
9、在包内新建两个类
SQLSource代码:
SQLSourceHelper代码:
- package com.nuit.source;
- import org.apache.flume.Context;
- import org.apache.flume.conf.ConfigurationException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.sql.*;
- import java.text.ParseException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- public class SQLSourceHelper {
- private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
- private int runQueryDelay, //两次查询的时间间隔
- startFrom, //开始id
- currentIndex, //当前id
- recordSixe = 0, //每次查询返回结果的条数
- maxRow; //每次查询的最大条数
- private String table, //要操作的表
- columnsToSelect, //用户传入的查询的列
- customQuery, //用户传入的查询语句
- query, //构建的查询语句
- defaultCharsetResultSet;//编码集
- //上下文,用来获取配置文件
- private Context context;
- //为定义的变量赋值(默认值),可在flume任务的配置文件中修改
- private static final int DEFAULT_QUERY_DELAY = 10000;
- private static final int DEFAULT_START_VALUE = 0;
- private static final int DEFAULT_MAX_ROWS = 2000;
- private static final String DEFAULT_COLUMNS_SELECT = "*";
- private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
- private static Connection conn = null;
- private static PreparedStatement ps = null;
- private static String connectionURL, connectionUserName, connectionPassword;
- //加载静态资源
- static {
- Properties p = new Properties();
- try {
- p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
- connectionURL = p.getProperty("dbUrl");
- connectionUserName = p.getProperty("dbUser");
- connectionPassword = p.getProperty("dbPassword");
- Class.forName(p.getProperty("dbDriver"));
- } catch (IOException | ClassNotFoundException e) {
- LOG.error(e.toString());
- }
- }
- //获取JDBC连接
- private static Connection InitConnection(String url, String user, String pw) {
- try {
- Connection conn = DriverManager.getConnection(url, user, pw);
- if (conn == null)
- throw new SQLException();
- return conn;
- } catch (SQLException e) {
- e.printStackTrace();
- }
- return null;
- }
- //构造方法
- SQLSourceHelper(Context context) throws ParseException {
- //初始化上下文
- this.context = context;
- //有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
- this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
- this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
- this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
- this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
- //无默认值参数:获取flume任务配置文件中的参数
- this.table = context.getString("table");
- this.customQuery = context.getString("custom.query");
- connectionURL = context.getString("connection.url");
- connectionUserName = context.getString("connection.user");
- connectionPassword = context.getString("connection.password");
- conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
- //校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
- checkMandatoryProperties();
- //获取当前的id
- currentIndex = getStatusDBIndex(startFrom);
- //构建查询语句
- query = buildQuery();
- }
- //校验相应的配置信息(表,查询语句以及数据库连接的参数)
- private void checkMandatoryProperties() {
- if (table == null) {
- throw new ConfigurationException("property table not set");
- }
- if (connectionURL == null) {
- throw new ConfigurationException("connection.url property not set");
- }
- if (connectionUserName == null) {
- throw new ConfigurationException("connection.user property not set");
- }
- if (connectionPassword == null) {
- throw new ConfigurationException("connection.password property not set");
- }
- }
- //构建sql语句
- private String buildQuery() {
- String sql = "";
- //获取当前id
- currentIndex = getStatusDBIndex(startFrom);
- LOG.info(currentIndex + "");
- if (customQuery == null) {
- sql = "SELECT " + columnsToSelect + " FROM " + table;
- } else {
- sql = customQuery;
- }
- StringBuilder execSql = new StringBuilder(sql);
- //以id作为offset
- if (!sql.contains("where")) {
- execSql.append(" where ");
- execSql.append("id").append(">").append(currentIndex);
- return execSql.toString();
- } else {
- int length = execSql.toString().length();
- return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
- }
- }
- //执行查询
- List<List<Object>> executeQuery() {
- try {
- //每次执行查询时都要重新生成sql,因为id不同
- customQuery = buildQuery();
- //存放结果的集合
- List<List<Object>> results = new ArrayList<>();
- if (ps == null) {
- //
- ps = conn.prepareStatement(customQuery);
- }
- ResultSet result = ps.executeQuery(customQuery);
- while (result.next()) {
- //存放一条数据的集合(多个列)
- List<Object> row = new ArrayList<>();
- //将返回结果放入集合
- for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
- row.add(result.getObject(i));
- }
- results.add(row);
- }
- LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
- return results;
- } catch (SQLException e) {
- LOG.error(e.toString());
- // 重新连接
- conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
- }
- return null;
- }
- //将结果集转化为字符串,每一条数据是一个list集合,将每一个小的list集合转化为字符串
- List<String> getAllRows(List<List<Object>> queryResult) {
- List<String> allRows = new ArrayList<>();
- if (queryResult == null || queryResult.isEmpty())
- return allRows;
- StringBuilder row = new StringBuilder();
- for (List<Object> rawRow : queryResult) {
- Object value = null;
- for (Object aRawRow : rawRow) {
- value = aRawRow;
- if (value == null) {
- row.append(",");
- } else {
- row.append(aRawRow.toString()).append(",");
- }
- }
- allRows.add(row.toString());
- row = new StringBuilder();
- }
- return allRows;
- }
- //更新offset元数据状态,每次返回结果集后调用。必须记录每次查询的offset值,为程序中断续跑数据时使用,以id为offset
- void updateOffset2DB(int size) {
- //以source_tab做为KEY,如果不存在则插入,存在则更新(每个源表对应一条记录)
- String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
- + this.table
- + "','" + (recordSixe += size)
- + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
- LOG.info("updateStatus Sql:" + sql);
- execSql(sql);
- }
- //执行sql语句
- private void execSql(String sql) {
- try {
- ps = conn.prepareStatement(sql);
- LOG.info("exec::" + sql);
- ps.execute();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- //获取当前id的offset
- private Integer getStatusDBIndex(int startFrom) {
- //从flume_meta表中查询出当前的id是多少
- String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
- if (dbIndex != null) {
- return Integer.parseInt(dbIndex);
- }
- //如果没有数据,则说明是第一次查询或者数据表中还没有存入数据,返回最初传入的值
- return startFrom;
- }
- //查询一条数据的执行语句(当前id)
- private String queryOne(String sql) {
- ResultSet result = null;
- try {
- ps = conn.prepareStatement(sql);
- result = ps.executeQuery();
- while (result.next()) {
- return result.getString(1);
- }
- } catch (SQLException e) {
- e.printStackTrace();
- }
- return null;
- }
- //关闭相关资源
- void close() {
- try {
- ps.close();
- conn.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- int getCurrentIndex() {
- return currentIndex;
- }
- void setCurrentIndex(int newValue) {
- currentIndex = newValue;
- }
- int getRunQueryDelay() {
- return runQueryDelay;
- }
- String getQuery() {
- return query;
- }
- String getConnectionURL() {
- return connectionURL;
- }
- private boolean isCustomQuerySet() {
- return (customQuery != null);
- }
- Context getContext() {
- return context;
- }
- public String getConnectionUserName() {
- return connectionUserName;
- }
- public String getConnectionPassword() {
- return connectionPassword;
- }
- String getDefaultCharsetResultSet() {
- return defaultCharsetResultSet;
- }
- }
复制代码 10、打jar包
打包结果:
11、打包成功后,target目录会出现一个jar包
12、打开jar包的位置,并利用xftp工具上传到服务器
13、添加jar包依靠
- # 解压mysql-connector-java-5.1.16.tar.gz
- [root@master software]# tar -zxf mysql-connector-java-5.1.16.tar.gz -C /opt/module/
- # 复制‘mysql-connector-java-5.1.16/mysql-connector-java-5.1.16-bin.jar’、‘mysql_source-1.0-SNAPSHOT.jar’两个压缩包到‘/opt/module/flume/lib/‘目录下
- [root@master software]# cp -p mysql_source-1.0-SNAPSHOT.jar /opt/module/flume/lib/
- [root@master module]# cp -p mysql-connector-java-5.1.16/mysql-connector-java-5.1.16-bin.jar /opt/module/flume/lib/
复制代码 14、创建job文件
[root@master job]# pwd
/opt/module/flume/job
[root@master job]# vi mysql.conf
编辑内容如下:
- # Name the components on this agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = com.nuit.source.SQLSource
- a1.sources.r1.connection.url = jdbc:mysql://192.168.229.160:3306/mysqlsource
- a1.sources.r1.connection.user = root
- a1.sources.r1.connection.password = root
- a1.sources.r1.table = student
- a1.sources.r1.columns.to.select = *
- #a1.sources.r1.incremental.column.name = id
- #a1.sources.r1.incremental.value = 0
- a1.sources.r1.run.query.delay=5000
- # Describe the sink
- a1.sinks.k1.type = logger
- # Describe the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
复制代码 八、测试结果
1、创建一个名为’mysqlsource‘数据库
2、选择刚刚创建的数据库
3、创建数据表
- CREATE TABLE `student` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(255) NOT NULL,
- PRIMARY KEY (`id`)
- );
- CREATE TABLE `flume_meta` (
- `source_tab` varchar(255) NOT NULL,
- `currentIndex` varchar(255) NOT NULL,
- PRIMARY KEY (`source_tab`)
- );
复制代码
4、向表中插入数据
- insert into student values(1,'zhangsan');
- insert into student values(2,'lisi');
- insert into student values(3,'wangwu');
- insert into student values(4,'zhaoliu');
复制代码
5、查察表的数据
6、退出MySQL,启动flume服务
- [root@master flume]# bin/flume-ng agent --conf conf/ --name a1 \
- > --conf-file job/mysql.conf -Dflume.root.logger=INFO,console
复制代码
发现我们插入数据的日记,至此搭建完成!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |