ELK

使用Elasticsearch API接口开发企业级IP反查工具

"ELK"

Posted by y1r0nz on September 30, 2017

前言

目前大部分企业都使用ELK实时日志分析框架(elasticsearch+logstrace+kibana)对日志数据进行收集和分析,ELK良好的拓展性使其更好的适合对日志数据进行深入挖掘。为了更好的使用ELasticSearch接口进行数据收集,必须首先熟悉ElasticSearch的查询语法。Elasticsearch的查询语法使用类似lucence查询语句,常用DSL结构化查询语句对需求数据进行查询,DSL以JSON请求体形式出现。
这里以ip反查为例,假如需求是在指定的时间范围内,根据ip定位到具体的帐号和人名。

GET ES的ip地址/websense_acc*/_search  
{ 
    "query":{
        "bool":{
            "filter":[{
               "range":{
                    "@timestamp":{
                         "gte":starttime,"lte":endtime,"format":"epoch_mills"
                     }
                 }
             },{
                    "query_string":{
                         "analyze_wildcard":"true","query":keyword
                     }
             }]
        }
    } 
}

starttime和endtime为开始结束时间,keyword为查询ip地址关键字
query表示查询类似sql的select
bool表示多条件查询,一般配合must,must not,should使用,相当与sql中and,or语句 filter指的是根据查询条件过滤查询语句 range指定查询范围 query_string是查询字符串关键字,analyze_wildcard参数表示通配符或者前缀是否被分析,默认为false,这里设置为true我的理解是可以根据正则匹配去查询,若为false不会使用通配符,会将查询字符串的各个词分句。
更多ES查询表达式见ESsearch


需求定义

如何根据ip反查精确定位使用电脑的用户账户是比较复杂的是,根据日志数据查询也需要考虑大规模数据量的存储和查询效率,而且ip和使用电脑的用户账户是无法点对点绑定的,是一对多的。同时单个电脑使用dhcp动态解析时候对应的ip地址不可能永远不变,这里就要考虑到可能别的用户使用你要查询的ip地址对应的主机登录之后无法准确定位用户的情况,这种情况只能根据时间段去聚合ip和用户名的对应关系。综上所述,能够达到最好的效果需求如下例所示:
需求其实很简单,如果需要查询的ip对应的主机用户换了马上输出另一个用户并输出该用户使用该ip的时间段。(ES那边根据网关收集的日志里面ip,用户名和时间戳其实都有了。) 比如:
时间段1:用户1 时间段2:用户2 时间段3:用户1 表示时间段2用户2登录过该ip,时间段3用户1又重新登录回该ip对应的主机。


代码实现思路

实现很简单四个函数就行,查询函数、以时间排序函数,时间聚合函数和时间处理函数,当然需要python的Elasticsearch库了,查询函数这里是个坑,由于需要根据时间段返回大量查询结果,查询效率一直是问题,这里初步使用Elasticsearch.helper库中的scan无序查询结果,类似于关系数据库中的游标缓存。再从返回的数据中以时间排序,聚合时间段。后面想用多线程对时间段进行分割,每个线程处理自己规定时间段内的数据,看是否能提高查询效率。


多线程并发ip反查工具开发

未完待续。。。