logstash安装学习

About 8 mintoollogstash

一、简介

1.1 描述

Logstash是用Ruby开发的软件,在配置文件中用{}定义作用域,处理数据的时候主要分3个方面去处理输入、过滤、输出。通过各种渠道去获取数据,在进行加工之后将数据输出的存储的容器。可以作为数据库之间的同步工具,最主要的作用是去获取日志文件结合elasticsearch去进行日志分析。内部主要的处理流程如下:

https://gaoqisen.github.io/GraphBed/202006/20200623130056.png

1.2 官网介绍

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

1.3 实际应用

  1. 数据采集工具(主要用于日志采集):最常用的就是ELK进行日志的收集,将各个服务中的日志采集后放入elasticsearch里面。之后用kibanna进行日志分析。
  2. 同步数据库(mysql/oracle)中的数据到elasticsearch里面进行快速的文档查找。一般mysql建议用阿里巴巴的canal进行数据同步,这种同步是通过binlog日志进行同步的,不会影响到数据库。如果同步周期比较大,也可以使用logstash进行同步,logstash是通过执行sql语句后将返回数据进行同步的。

二、安装

  • 下载安装包,地址: https://www.elastic.co/cn/downloads/logstashopen in new window

  • 重要的就是配置文件,如下完成mysql到elasticserch的数据同步:

    # 输入
    input {
      jdbc {
        jdbc_driver_library => "/home/mysql-connector-java-5.1.10.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
        jdbc_user => "root"
        jdbc_password => "123456"
        # 设置监听间隔 各字段含义(从左至右)分、时、天、月、年,全为*默认含义为每分钟都更新
        schedule => "* * * * *"
        # 查询sql,可以通过更新字段来区分那些是需要更新的
        statement_filepath => "/home/logstash-7.8.0/config/complete.sql"
        # 记录最后的运行时间,注意目录需要创建好
        last_run_metadata_path => "/home/logstash-7.8.0/config/logstash_jdbc_last_run_oracle"
        use_column_value => false
        tracking_column => "update_time"
        # 分页处理数据
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 类型,对象后面输出的类型
        type => "complete_corporate"
      }
    }
    # 过滤
    filter {
      # 用ruby解决相差8小时的时区问题, update_time必须要通过statement_filepath配置的sql可以查询出来
      ruby { 
          code => "event.set('update_time', event.get('update_time').time.localtime + 8*60*60)"
      }
    }
    # 输出
    output {
      if[type] == "complete_corporate"{
        elasticsearch {
          hosts => "localhost:9200"
          user => elastic                                                                                                                                                                                         
          password => elastic
          # 索引名
          index => "complete_corporate_index"
          # 文档名
          document_type => "complete_corporate"
          # 文档ID(主键)
          document_id => "%{body_card_no}" 
        }
      }
      # 将数据输出到控制台
      stdout {
          codec => json_lines
      }
    }
    
  • 操作命令

    // 解压缩
    tar -zxf xxx.tar.gz
    // 指定配置文件启动命令
    ./logstash -f jdbc_sync.conf
    // 后台运行启动命令
    nohup ./logstash -f jdbc_sync.conf &
    

三、配置

3.1 input,数据的来源

  • file: 输入为文件类型,通常可以去读取日志

    input{
      file{
        #path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
        path => [‘pathA’,‘pathB’]
        #表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
        discover_interval => 15
        #排除那些文件,也就是不去读取那些文件
        exclude => [‘fileName1’,‘fileNmae2’]
        #被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
        close_older => 3600
        #在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
        ignore_older => 86400
        #logstash 每隔多 久检查一次被监听文件状态( 是否有更新) , 默认是 1 秒。
        stat_interval => 1
        #sincedb记录数据上一次的读取位置的一个index
        sincedb_path => ’$HOME/. sincedb‘
        #logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
        start_position => ‘beginning’
        #注意:这里需要提醒大家的是,如果你需要每次都从同开始读取文件的话,关设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
      }            
    }
    
  • jdbc: 输入为数据库,同步数据是可以使用

    input{
      jdbc{
        #jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
        jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
        #jdbc class 不同数据库有不同的 class 配置
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        #配置数据库连接 ip 和端口,以及数据库    
        jdbc_connection_string => "jdbc:sqlserver://200.200.0.18:1433;databaseName=test_db"
        #配置数据库用户名
        jdbc_user =>   
        #配置数据库密码
        jdbc_password =>
        #上面这些都不重要,要是这些都看不懂的话,你的老板估计要考虑换人了。重要的是接下来的内容。
        # 定时器 多久执行一次SQL,默认是一分钟
        # schedule => 分 时 天 月 年  
        # schedule => * 22  *  *  * 表示每天22点执行一次
        schedule => "* * * * *"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
        #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
        use_column_value => true
        #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
        tracking_column => create_time
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => true
        #们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
        last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
        #是否将字段名称转小写。
        #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
        #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
        lowercase_column_names => false
        #你的SQL的位置,当然,你的SQL也可以直接写在这里。
        #statement => SELECT * FROM tabeName t WHERE  t.creat_time > :last_sql_value
        statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
        #数据类型,标明你属于那一方势力。单了ES哪里好给你安排不同的山头。
        type => "my_info"
      }
      #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
      #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
    }
    
  • beats: 输入为接收端口

    input {
      beats {
        #接受数据端口
        port => 5044
        #数据类型
        type => "logs"
      }
    }
    
  • redis: 输入为redis

    input{
        redis {
            host =>"192.168.200.21"
            port =>" 6379"
            db =>"6"
            data_type =>"list"
            key="demo"
        }
    }
    

3.2 filter,数据的处理

  • date: 处理时间格式

    filter {
      date {
       # 解析名为logdate的字段以设置Logstash时间戳
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
      }
    }
    
  • grok: 将非结构化事件数据分析到字段中。 这个工具非常适用于系统日志

    filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
    }
    
  • dissect: 使用分隔符将非结构化事件数据提取到字段中。 解剖过滤器不使用正则表达式,速度非常快。

    filter {
      dissect {
        mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
      }
    }
    
  • mutate: 使用最频繁的操作,可以对字段进行各种操作,比如重命名、删除、替换、更新等

    filter{
        mutate{
            convert => {"age" => "integer"}
        }
    }
    

3.3 output,数据的输出

  • elasticsearch:将数据导入到elasticsearch

    # 输出
    output {
      if[type] == "complete_corporate"{
        elasticsearch {
          hosts => "localhost:9200"
          user => elastic                                                                                                                                                                                         
          password => elastic
          # 索引名 
          index => "complete_corporate_index"
          # 文档名
          document_type => "complete_corporate"
          # 文档ID(主键)
          document_id => "%{body_card_no}" 
        }
      }
      stdout {
          codec => json_lines
      }
    }
    
  • jdbc:将数据导入的数据库

    output {
        jdbc {
            driver_jar_path => "/path/mysql-connector-java-5.1.40.jar"
            driver_class => "com.mysql.jdbc.Driver"
            connection_string => "jdbc:mysql://sss:8840/testcase"
            username => "root"
            password => "123456"
            statement => ["INSERT INTO test ( val, name_val, level_val, source_name, version ) VALUES (?,?,?,?,?)","code","name","level","source_name","current_version"]
        }
        stdout {}
    }
    
  • redis:将数据导入到redis

    output {
        redis {
            host =>"192.168.200.21"
            port =>" 6379"
            db =>"6"
            data_type =>"list"
            key="demo"
        }
    }
    
  • file:将数据生成文件

    output{
        if  [type] != "file" { 
           file{
                  path => "/home/app/logbak/%{+YYYY.MM.dd}-file.txt"
                  # 设置根据原始数据格式保存,不会带Json格式
                  codec => line {format => "%{[collectValue]}"}
           }
        }
    }
    
  • json: 将字段内容为json格式的数据进行解析

    filter {
      json {
        source => "message"     #要解析的字段名
        target => "msg_json"    #解析后的存储字段,默认和message同级别
      }
    }   
    
  • geoip: 根据ip地址提供对应的地域信息,比如经纬度、城市名等

    filter {
      geoip {
        source => "clientip"
      }
    }
    
  • ruby: 最灵活的插件,可以以ruby语言来随心所欲的修改Logstash Event对象

    filter{
        ruby{
            code => 'size = event.get("message").size;
                    event.set("message_size",size)'
        }
    }
    
    
    ruby {
            code => "event.set('@read_timestamp',event.get('@timestamp'))"
    }
    

四、遇到的问题

4.1 时区问题

Elasticsearch 内部,对时间类型字段,是统一采用 UTC 时间,存成 long 长整形数据的!对日志统一采用 UTC 时间存储,是国际安全/运维界的一个通识——欧美公司的服务器普遍广泛分布在多个时区里。使用ruby过滤就可以了。

4.2 和filebeat的对比选择

filebeat是一个轻量级的日志收集器,如果每台服务器上面都安装一个logstash的话会造成性能的浪费。常用的做法就是通过filebeat将日志收集起来发送给logstash,之后logstash将数据同步给elasticsearch或者数据库。

五、参考

  • 输入(input): https://yq.aliyun.com/articles/152043?utm_content=m_27192
  • 过滤(filter): https://blog.csdn.net/wfs1994/article/details/80862952
  • 输出(output): https://www.cnblogs.com/niutao/p/10909461.html
Last update:
Contributors: gaoqisen