分类目录归档:数据挖掘&大数据

数据挖掘&大数据

一种通用的百亿级数据清洗方案

最近两个月,一直在和刀哥两个人重构连尚读书这边的用户行为日志系统。到本周放量完所有的数据到新系统,基本算告一个段落。

目前的业务情况,大概是有新旧两套日志系统数据,日志规模(2018.11),新日志一天产生6000万条日志,旧日志一天产生4000万条日志数据,合计1亿条上报日志,这些日志数据,都是合并上报的,根据统计,大概一条上报日志,会产生50条最终日志数据。所以最终的日志量,超过了50亿条(写这篇博客时,2019.1,已经超过70亿条)。

根据业务特点,预计的高峰期是10个小时(按40000秒计算), 100000000 / 40000 = 2500条/秒。整个系统每秒要完成 2500条上报日志的收集。然后要转化为 2500 * 50 = 125000条/秒的最终结果日志数据。晚上21 ~ 22点的最高峰,处理量还要预留x4,估计会处理10000条的收集日志数据,完成生产500000条/秒最终结果日志数据。

最终结果日志数据文件的单条大小在1 ~ 2 kbyte之间,多数为1 kbyte

最终结果日志数据,需要存储在本地硬盘+大数据的Kafka集群(9台机器)两边做容灾。

如此大规模的数据,不管是收集,清洗,本地存储,还是入库,都是非常有挑战的一项工作,更何况是重构。。。

一、旧的架构情况,使用的机器规模大概是20台机器.

注释:

1、Bear服务是这边之前的同事用Golang + Leveldb写的一个服务,可以用来接收HTTP请求传输的数据到Leveldb,然后再把对应的数据,发送到指定的HTTP 地址(转发数据)或者是发送到指定的Kafka集群,可以起到数据的缓存作用

2、之前得到的同事的反馈,都说这个Bear服务的性能很高,不存在问题

旧架构存在的问题

1、无法追踪单条数据的处理情况,因为数据没有任何的标号,导致要反查或者是顺查数据,基本没法

2、Bear只会把数据存储在Leveldb里面,外层没有日志,再加上没有源码,无法做任何修改(我也不知道为啥没源码啊)

3、之前的同事反馈说Bear的性能很高,不存在性能问题,但是我们实际观察发现,问题最大的就是这个Bear,一个是接收性能不行,另外一个就是数据转发性能不行,不管是接收HTTP数据,还是转发数据到HTTP或者Kafka集群,Bear都给不出来量,所以导致经常出现拥堵的情况,需要不停的多开Bear服务,其性能有严重的问题

4、本地没有数据,无法做数据重放。不管是收集日志,还是最终日志,我们这边完全没任何原始数据,一旦入了Kafka集群后,我们就无能为力了,无法校对数据,无法重放,无法追踪处理流程

5、因为使用Bear做数据缓存,他只支持HTTP请求的数据清洗,导致只能走php-fpm,然后他的转发性能又不行,所以导致整个系统的数据清洗能力极低

二、新的架构情况,使用的机器规模为12台 + 3 组Redis实例

优点

1、不管是收集的原始日志数据还是处理以后的最终日志数据,我们都有存储,随时可以重放

2、每条日志加了不会重复的logid,可以根据logid追踪到任何一条数据的流转过程和处理结果,并且大数据那边可以根据logid去重数据

3、不管是收集端的API日志服务器,还是Redis,还是消费服务器,都是可以任意的横向的扩展的,而且不影响任何第三方,性能可以直线上升

4、日志数据压缩,采用的是pigz的并行压缩,每天凌晨压缩前一天的日志数据文件,在目前的日志规模下,在一个小时内,可以压缩完成所有的日志,对业务和机器负载不会带来任何影响,而且数据的压缩比率,在1:20以上

5、因为采用了脚本主动拉取的模式,所以只要消费服务器足够,理论上任何数据都会在3s内被处理,达到实时效果

6、业务采用了脚本的模式,执行效率相比原来的Nginx + php-fpm的模式,性能提升了几十倍,而且因为不走HTTP协议了,所以内网的带宽使用也成倍的下降,不到原来的一半

7、因为使用了Logstash的批量提交模式,数据入Kafka的性能,提升了几十倍

8、因为启用了Logstash的永久队列,基本上保障了数据的不丢失,经过对比,和Kafka里面统计出来的数据,误差在百万分之几的级别

9、在入Logstash时,采用了失败就扔Redis队列再用另外的消费脚本入可用的其他Logstash的方式,保障说即使本地的Logstash宕机,数据依然可以正常的入Kafka集群

10、消费脚本进程,采用了supervisor来做管理,新、旧版消费日志脚本,单机各开100个脚本,主要 supervisor 需要采用 3.3以上的版本,3.3以下的版本,对于管理几百个进程,会有问题

11、各段都可以方便的知道性能瓶颈,不管是Filebeat的收集端,Redis队列端,还是Logstash端,都提供了良好的监控数据,可以即时了解系统的工作状态。

三、重构完成后,新旧系统的服务器机器负载对比

旧系统单机

新系统

消费服务器的负载
Redis负载

目前,单日清洗的数据量,已经超过70亿条,按现在的机器负载预估,这5台机器,预计处理清洗150亿条日志数据,没有任何问题。

优化Filebeat+Logstash的性能

最近一直在重构新公司的日志传输系统,因为业务量较大,目前的单日日志条数已经超过了30亿条,而且业务有明显的高峰期。所以日志收集传输系统,必须要满足明显的波峰性能要求。方案还是以Filebeat + Logstash为主,Logstash直接入kafka, Filebeat从磁盘读取文本文件(json格式)。

优化完成后,单filebeat + 单logstash可以处理 30000条/秒的日志. 单filebeat + 多logstash可以处理 40000条/秒的日志.

日志大小为1.2kbyte.

环境信息:

2台服务器, 32core, 200GB内存, SSD硬盘, 千兆网络内网互联

Filebeat 6.5.2

/usr/bin/filebeat -c /data/filebeat.yml

Logstash 6.5.2

/usr/share/logstash/bin/logstash -f /etc/logstash/logstash-file.conf --path.settings /etc/logstash --path.data /data/tmp/logstash/data --path.logs /data/tmp/logstash/logs -w 8

优化后的配置文件信息如下:

filebeat.yml

关键参数:

scan_frequency

harvester_buffer_size

queue.mem.*

filebeat.inputs:
- type: log

  # Change to true to enable this input configuration.
  enabled: true
  encoding: utf-8

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/tmp/data/testlogstash3.log
    #- c:\programdata\elasticsearch\logs\*

# Filebeat以多快的频率去prospector指定的目录下面检测文件更新比如是否有新增文件如果设置为0s则Filebeat会尽可能快地感知更新占用的CPU会变高。默认是10s。
  scan_frequency: 1s
  # 如果设置为true, Filebeat从文件尾开始监控文件新增内容把新增的每一行文件作为一个事件依次发送而不是从文件开始处重新发送所有内容。
  tail_files: false
  harvester_buffer_size: 104857600

# backoff选项指定Filebeat如何积极地抓取新文件进行更新。默认1s. backoff选项定义Filebeat在达到EOF之后再次检查文件之间等待的时间
  backoff: 1s
# 在达到EOF之后再次检查文件之前Filebeat等待的最长时间
  max_backoff: 10s

  close_inactive: 6h
  clean_inactive: 72h
  ignore_older: 70h
  close_timeout: 6h
  fields:
    log_name: newdata

- type: log

  # Change to true to enable this input configuration.
  enabled: true
  encoding: utf-8

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/topicdata/*.hequandata

# Filebeat以多快的频率去prospector指定的目录下面检测文件更新比如是否有新增文件如果设置为0s则Filebeat会尽可能快地感知更新占用的CPU会变高。默认是10s。
  scan_frequency: 1s
  # 如果设置为true, Filebeat从文件尾开始监控文件新增内容把新增的每一行文件作为一个事件依次发送而不是从文件开始处重新发送所有内容。
  tail_files: false

# backoff选项指定Filebeat如何积极地抓取新文件进行更新。默认1s. backoff选项定义Filebeat在达到EOF之后再次检查文件之间等待的时间
  backoff: 1s
# 在达到EOF之后再次检查文件之前Filebeat等待的最长时间
  max_backoff: 10s

  close_inactive: 6h
  clean_inactive: 72h
  ignore_older: 70h
  close_timeout: 6h
  fields:
    log_name: olddata



#============================= Filebeat modules ===============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#================================ General =====================================
filebeat.registry_file: /data/tmp/registry-logstash3.index

max_procs: 8

path.home: /etc/filebeat
path.config: /etc/filebeat
path.data: /data/filebeat/data
path.logs: /data/filebeat/logs

queue.mem.events: 409600
queue.mem.flush.min_events: 1024
queue.mem.flush.timeout: 2s


#================================ Outputs =====================================
output.logstash:
  enable: true
  hosts: ['10.28.3.8:5046','10.28.3.8:5047','10.28.3.8:5048','10.28.3.8:5049']
  loadbalance: true
  worker: 2
  bulk_max_size: 50000
  compression_level: 0


#================================ Logging =====================================

# Minimum log level. One of debug, info, warning, or error. The default log level is info
logging.level: info
logging.to_files: true
logging.files:
  path: /data/tmp
  name: filebeat-logstash3.log
  keepfiles: 7
  rotateeverybytes: 10485760
  interval: 24h
  permissions: 0644

logstash.conf配置文件

input {
    beats {
        port => 5046
    }
}

filter {
    grok {
        match => {
            "message" => "(?<kafkaname>\S+?)##########(?<kafkadata>.*)"
        }
        overwrite => ["message"]
    }
}

output {
    if[fields][log_name] == "newdata" or [fields][log_name] == "olddata" {
        file {
            path => "/data/tmp/logstash/logstash-kafkadata5046.log"
                codec => line {
                    format => "%{+YYYY-MM-dd HH:mm:ss}##########%{kafkadata}"
                }
        }
    }
}

logstash.yml配置文件

pipeline.workers: 15
pipeline.output.workers: 15
pipeline.batch.size: 5000
pipeline.batch.delay: 10

优化后的结果(单Filebeat + 单Logstash):

单条文本大小为 1.2kbyte

优化后的结果(单Filebeat + 4个Logstash):

单条文本大小为 1.2kbyte大小.

MySQL数据仓库——InfoBright的源码安装及使用

最近在搞后台数据分析,涉及大量的数据,他们使用的数据仓库是InfoBright(简称ib),ib提供社区版本(ICE)和商业版本(IEE)。两者区别较大。不过对于即时性要求不是特别严格的需要,社区版本勉强够用了。
两者之间的区别,参考博文: Infobright分享<1>:发展现状和ICE-IEE间区别
本文主要是指导ib的安装及使用(如果涉及boost及其他基础软件版本过低,请自行升级安装)

首先从InfoBright官网下载源码,最新的版本是:infobright-4.0.7-0-src-ice.tar.gz
编译安装

tar zxvf infobright-4.0.7-0-src-ice.tar.gz
cd infobright-4.0.7
make  PREFIX=/usr/local/infobright  EDITION=community release
make PREFIX=/usr/local/infobright   EDITION=community install-release
mkdir /usr/local/infobright/conf  /usr/local/infobright/data /usr/local/infobright/logs
chown -R mysql.mysql /usr/local/infobright/data /usr/local/infobright/logs
cp src/build/pkgmt/my-ib.cnf /usr/local/infobright/conf/my-ib.cnf
/usr/local/infobright/bin/mysql_install_db --basedir=/usr/local/mysql --datadir=/usr/local/infobright/data --user=mysql

修改my-ib.cnf

basedir = /usr/local/infobright
datadir = /usr/local/infobright/data
log-error = /usr/local/infobright/logs/bh.err

启动ib实例

cd /usr/local/infobright
bin/mysqld_safe --defaults-file=conf/my-ib.cnf --user=mysql > /dev/null 2>&1 &

初始化ib实例的密码

/usr/local/infobright/bin/mysqladmin -u root password "123456"

因为使用的ICE社区版本,所以只能使用IB loader导入数据(其实就是只支持csv文件手工导入数据)。
注意:create table t () engine=brighthouse xxx;
创建表时,表的引擎要使用【brighthouse】,这样才会使用到ib仓库的特性(因为infobright-4.0.7带得有myisam、memory等mysql存储引擎,如果不指定,有可能使用到其他引擎)
示例如下,假设csv数据文件为data.csv

bin/mysql -u root -p123456
load data infile "/root/data.csv" into table t_data fields terminated by ',' enclosed by '"' escaped by '\' lines terminated by '\n';

数据导入成功后,就可以在ib中对数据进行sum、avg、group by等数据挖掘操作了。

本文参考了下列文章:
http://www.mysqlsky.com/201109/infobright-data-load-error
http://www.itpuk.net/?p=14
http://www.mysqlsky.com/201110/infobrigh-now-diff