flume怎么采集远程服务器上的日志
log4jrootLogger=INFO,A1,R
# ConsoleAppender out
log4jappenderA1= org apachelog4jConsoleAppender
log4jappenderA1layout= org apachelog4jPatternLayout
log4jappenderA1layoutConversionPattern=%d{ yyyy/MM/ ddHH:mm:ss}%-5p%-10C {1} %m%n
# File out
//日志Appender修改为flume提供的Log4jAppender
log4jappenderR= org apache flumeclientslog4jappenderLog4jAppender
log4jappenderRFile=${ catalinahome}/logs/ ultraIDCPServerlog
//日志需要发送到的端口号,该端口要有ARVO类型的source在监听
log4jappenderRPort =44444
//日志需要发送到的主机ip,该主机运行着ARVO类型的source
log4jappenderRHostname = localhost
log4jappenderRMaxFileSize=102400KB
# log4jappenderRMaxBackupIndex=5
log4jappenderRlayout= org apachelog4jPatternLayout
log4jappenderRlayoutConversionPattern=%d{ yyyy/MM/ ddHH\: mm\: ss}%-5p%-10C {1} %m%n
log4jappenderRencoding=UTF-8
log4jloggercomultrapowerultracollectorwebserviceMessageIntercommunionInterfaceImpl=INFO, webservice
log4jappenderwebservice= org apachelog4jFileAppender
log4jappenderwebserviceFile=${ catalinahome}/logs/logsMsgIntercommunionInterfacelog
log4jappenderwebservicelayout= org apachelog4jPatternLayout
log4jappenderwebservicelayoutConversionPattern=%d{ yyyy/MM/ ddHH\: mm\: ss}%-5p[%t]%l%X-%m%n
log4jappenderwebserviceencoding=UTF-8
注:Log4jAppender继承自AppenderSkeleton,没有日志文件达到特定大小,转换到新的文件的功能
113 flume agent配置
agent1sources = source1
agent1sinks = sink1
agent1channels = channel1
# Describe/configure source1
agent1sourcessource1type = avro
agent1sourcessource1bind = 1921680141
agent1sourcessource1port = 44444
# Describe sink1
agent1sinkssink1type = FILE_ROLL
agent1sinkssink1sinkdirectory = /home/yubojie/flume/apache-flume-120/flume-out
# Use a channel which buffers events in memory
agent1channelschannel1type = memory
agent1channelschannel1capacity = 1000
agent1channelschannel1transactionCapactiy = 100
# Bind the source and sink to the channel
agent1sourcessource1channels = channel1
agent1sinkssink1channel = channel1
注:生成的文件的规则为每隔固定时间间隔生成一个新的文件,文件里面保存该时间段agent接收到的信息
12 分析
1 使用简便,工作量小。
2 用户应用程序使用log4j作为日志记录jar包,而且项目中使用的jar包要在log4j-1215版本以上,
3 应用系统必须将flume所需jar包引入到项目中。如下所示为所有必须jar包:可能会存在jar冲突,影响应用运行
4 能够提供可靠的数据传输,使用flume log4jAppender采集日志可以不在客户机上启动进程,而只通过修改logapppender直接把日志信息发送到采集机(参见图一),此种情况可以保证采集机接受到数据之后的数据可靠性,但是客户机与采集机连接失败时候数据会丢失。改进方案是在客户机上启动一个agent,这样可以保证客户机和采集机不能连通时,当能连通是日志也被采集上来,不会发送数据的丢失(参见图二),为了可靠性,需在客户机上启动进程
13 日志代码
Loginfo(“this message has DEBUG in it”);
14 采集到的数据样例
this message has DEBUG in it
this message has DEBUG in it
2 Exec source(放弃)
The problem with ExecSource and other asynchronous sources is that thesource can not guarantee that if there is a failure to put the event into theChannel the client knows about it In such cases, the data will be lost As afor instance, one of the most commonly requested features is thetail -F [file]-like use casewhere an application writes to a log file on disk and Flume tails the file,sending each line as an event While this is possible, there’s an obviousproblem; what happens if the channel fills up and Flume can’t send an eventFlume has no way of indicating to the application writing the log file that itneeds to retain the log or that the event hasn’t been sent, for some reason Ifthis doesn’t make sense, you need only know this: Your application can neverguarantee data has been received when using a unidirectional asynchronousinterface such as ExecSource! As an extension of this warning - and to becompletely clear - there is absolutely zero guarantee of event delivery whenusing this source You have been warned
注:即使是agent内部的可靠性都不能保证
21 使用说明
211 flume agent配置
# The configuration file needs to define the sources,
# the channels and the sinks
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# exampleconf: A single-node Flume configuration
# Name the components on this agent
agent1sources = source1
agent1sinks = sink1
agent1channels = channel1
# Describe/configure source1
#agent1sourcessource1type = avro
agent1sourcessource1type = exec
agent1sourcessource1command = tail -f /home/yubojie/logs/ultraIDCPServerlog
#agent1sourcessource1bind = 1921680146
#agent1sourcessource1port = 44444
agent1sourcessource1interceptors = a
agent1sourcessource1interceptorsatype = orgapacheflumeinterceptorHostInterceptor$Builder
agent1sourcessource1interceptorsapreserveExisting = false
agent1sourcessource1interceptorsahostHeader = hostname
# Describe sink1
#agent1sinkssink1type = FILE_ROLL
#agent1sinkssink1sinkdirectory = /home/yubojie/flume/apache-flume-120/flume-out
agent1sinkssink1type = hdfs
agent1sinkssink1hdfspath = hdfs://localhost:9000/user/
agent1sinkssink1hdfsfileType = DataStream
# Use a channel which buffers events in memory
agent1channelschannel1type = memory
agent1channelschannel1capacity = 1000
agent1channelschannel1transactionCapactiy = 100
# Bind the source and sink to the channel
agent1sourcessource1channels = channel1
agent1sinkssink1channel = channel1
22 分析
1 tail方式采集日志需要宿主主机能够执行tail命令,应该是只有linux系统可以执行,不支持window系统日志采集
2 EXEC采用异步方式采集,会发生日志丢失,即使在节点内的数据也不能保证数据的完整
3 tail方式采集需要宿主操作系统支持tail命令,即原始的windows操作系统不支持tail命令采集
23 采集到的数据样例
2012/10/26 02:36:34 INFO LogTest this message has DEBUG 中文 in it
2012/10/26 02:40:12 INFO LogTest this message has DEBUG 中文 in it
24 日志代码
Loginfo(“this message has DEBUG 中文 in it”);
3 Syslog
Passing messages using syslogprotocol doesn't work well for longer messages The syslog appender forLog4j is hardcoded to linewrap around 1024 characters in order to comply withthe RFC I got a sample program logging to syslog, picking it up with asyslogUdp source, with a JSON layout (to avoid new-lines in stack traces) onlyto find that anything but the smallest stack trace line-wrapped anyway Ican't see a way to reliably reconstruct the stack trace once it is wrapped andsent through the flume chain(注:内容不确定是否12版本)
Syslog TCP需要指定eventsize,默认为2500
Syslog UDP为不可靠传输,数据传输过程中可能出现丢失数据的情况。
1syslog 警报兼容性
Suricata可以通过sylog发出警报,这是中央日志收集,合规性和向SIEM报告的一个非常方便的功能。有关设置的说明可以在yaml文件中找到,您可以在其中配置您想要的警报(和其他)日志类型。
但是,有不同的syslog守护进程,并且可能存在SIEM期望的syslog格式以及Suricata发送的syslog格式的解析问题。Suricata的syslog格式依赖于Suricata传感器上运行的syslog守护程序,但它发送的格式通常不是SIEM期望的格式,也无法正确解析。
11 通用的syslog守护进程
syslogd - 记录系统消息
syslog-ng - 记录系统消息,但也支持TCP,TLS和其他增强的企业功能
rsyslogd - 记录系统消息,但也支持TCP,TLS,多线程和其他增强功能
klogd - 记录内核消息
sysklogd - 基本上是一堆 syslogd和klogd
如果Suricata传感器发送的syslog格式与SIEM或syslog收集器所期望的格式不兼容,则需要解决此问题。您可以在SIEM上执行此操作如果它能够配置为解释消息,或者通过在Suricata传感器本身上配置syslog守护程序以SIEM可以解析的格式发送。后者可以通过将模板应用于syslog配置文件来完成。
12 找到你正在使用的syslog守护进程
有很多方法可以找出你正在使用的syslog守护进程,但这里有一种方法:
cd /etc/initd
ls | grep syslog
您应该看到一个带有syslog字样的文件,例如“syslog”,“rsyslogd”等。显然,如果名称是“rsyslogd”,您可以相当自信地运行rsyslogd。如果不确定或文件名只是“syslog”,请查看该文件。例如,如果是“rsyslogd”,请运行:
less rsyslogd
在顶部你应该看到一个看起来像这样的注释行:
# rsyslog Starts rsyslogd/rklogd
找到这些文件并查看它们,以便为您提供有关正在运行的syslog守护程序的线索。另请查看您运行“less”的文件的 start() 部分,看看哪些二进制文件已启动,因为这也可以为您提供线索。
13 示例
下面是一个示例,其中Suricata传感器以rsyslogd格式发送系统日志消息,但SIEM期望并以sysklogd格式解析它们。在syslog配置文件中(通常在/ etc中使用rsyslogconf或syslogconf等文件名),首先添加模板:
$template sysklogd, "<%PRI%>%syslogtag:1:32%%msg:::sp-if-no-1st-sp%%msg%"
然后将其发送到syslog服务器并应用模板:
useralert@1087524:514;sysklogd
当然这只是一个例子,它可能会在您的环境中有所不同,具体取决于您使用的syslog守护程序和SIEM,但希望这会指向正确的方向。
作为一个BSD衍生的操作系统,OSX继承了很多BSD的特性,包括POSIX系统调用、一些BSD扩展(内核队列)以及BSD的强制访问控制。
苹果新增的内容:“沙盒”机制。替换了原本系统配置的/etc目录。标准的UNIX syslog被AppleSystem Log增强了。还有FSEvents新技术。
尽管XNU的绝对的核心是Mach,但XNU向用户态展现出来的主要接口是BSD层。
sysctl(8)访问内核内部状态的标准方法。直接查询内核变量的值,获得重要的运行时诊断信息。也可以设置可写变量的值。
内核组件也可以在运行会注册额外的sysctl变量值,甚至增加整个名称空间。
sysctl变量的范围很广,包括从一些简单的调试变量到其他可控制整个子系统行为的读写变量。ps和netstat都依赖于sysctl获得PID和socket系列,当然也可以通过其他方式获得。
kqueue是BSD中使用的内核事件通知机制。一个kqueue指的是一个描述符,这个描述符会阻塞等待知道一个特定类型和种类的事件发生。用户态的进程(或内核)可以等待这个描述符,因为kqueue提供了一种用于一个或多个进程同步的简单高效的方法。
kqueue和对应的 kevent(表示事件的数据结构) 构成了内核异步I/O的基础。
审计是OSX中一个自包含的子系统。主要的用户态组件是auditd,由launchd根据需要而启动的后台服务进程。这个后台服务进程不负责实际的审计日志记录,审计日志记录是由内核本身直接完成的。然后这个后台服务进程能控制内核。
如果启用了审计,那么XNU中编号从350到359的系统调用都被分配用于启用和控制审计。
添加了对象级别的安全性,限制特定进程针对具体文件或资源的访问权限。可以控制一个给定的应用程序不允许访问用户的私有数据或某网站。
从内核的角度看,在各种系统调用的实现中插入了对MAC的调用,每一个系统调用都必须先通过MAC的验证,然后才能真正处理来自用户态的请求。
MAC是OSX的隔离机制和iOS的entitlement机制的基础。
UNIX从传统上都依赖密码文件 /etc/passwd 和保存密码的散列文件 /etc/shadow 。在OSX单用户(以及iOS)中,使用 /etc/masterpasswd 作为shadow文件,其他情况都放弃了这些密码文件,使用自己的 目录服务 。
目录服务的维护的内容不仅仅是用户和用户组,还保存了系统配置和很多其他方面的信息。
OSX还抛弃了大部分其他配置文件,这些配置文件传统上在UNIX中是当做系统“注册表”使用的。
为了维护系统配置,OSX和IOS使用了一个特殊的守护程序—— configd(8) ,这个守护程序可以加载额外的可加载的bundle(插件),这个bundle位于 /System/Library/SystemConfiguration 下,包含IP和IPv6配置、日志以及其他bundle。
可通过工具 scutil 来流量和查询系统配置。
OSX也继承了传统UNIX的系统日志功能。OSX104引入新的日志模型,ASL,提供了更多特性,例如过滤和搜索。
ASL采用模块化设置,同时提供了以下四种日志接口:
ASL日志收集在 /var/log/asl 目录中。 aslmanager(8) 命令负责管理这些日志,launchd自动运行这条命令。
ASL日志采用的是二进制格式,而不是syslog采用的文本文件。日志文件变小了,但不像syslog那么对grep友好。OSX包含了syslog()命令用于显示和查看日志,并提供了搜索和过滤的功能。
OSX有非常强大的支持脚本的能力。通过osascript(1)命令和友好的Automator应用程序可以访问AppleScript
FSEvents是文件系统的通知API,应用程序可以简单快速地响应文件添加、修改和删除事件。
OSX提供了一个系统级的通知机制,这是分布式IPC的一种形式。
通知机制核心部分在于notifyd(8)守护进程,在系统引导时启动,这是Darwin的通知服务器。
Entitlement可以将一些表现行为良好的应用程序单独从监禁中释放出来。
场景描述:一个Web应用,前端设置了8个具有相同配置的Tomcat服务器,跑在Nginx反向代理后。每个Tomcat服务器运行在一个虚拟机上,要求能对Tomcat服务器的访问日志汇总存储并提供一定的分析能力。
需要的开源软件:Logstash和Elasticsearch。通过在各个虚拟机上安装Logstash收集Tomcat的日志数据,并存储在Elasticsearch中达到日志集中收集和分析的目的。
过程有两个步骤:
一、配置Tomcat的日志存储格式。编辑Tomcat目录下serverxml,填写如下内容
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<!-- Access log processes all example
Documentation at: /docs/config/valvehtml
Note: The pattern used is equivalent to using pattern="common" -->
<Valve className="orgapachecatalinavalvesAccessLogValve" directory="logs" prefix="localhost_access_log" suffix="txt" pattern="%h %l %u %t "%r" %s %b %D "%{Referer}i" "%{User-Agent}i"" />
</Host>
directory表示访问日志存储在Tomcat的logs目录中。
prefix表示日志文件名以localhost_access_log开头。
suffix表示日志文件名以txt截尾。
pattern="%h %l %u %t "%r" %s %b %D "%{Referer}i" "%{User-Agent}i""
pattern这一句非常重要,用于表示日志的存储格式,一般为了方便会使用common或者combined,这两种自定义格式已经可以记录足够详细的信息,我这里使用了自定义方式。在这个pattern描述里:
%h表示访问来源的主机名或者IP地址;
%l表示客户端的标示,通常是 -;
%u表示得到了授权的访问者标示,通常都是 -;
%t表示日志事件的发生时间戳,用 [ 和 ] 括起来的;
"%r"表示用双引号"括起来的访问命令和链接,比如“GET /resource/logopng”;
%s表示HTTP状态码,如200和404等;
%b是服务器返回的数据量,以字节为单位;
%D表示服务器的响应时间,可以用于分析页面的执行效率;
"%{Referer}i"表示用两个双引号括起来的网址,用于告诉服务器这个访问请求是从哪个页面链接过来的;
"%{User-Agent}i"表示用双引号括起来的浏览器的HTTP代理信息,可以得到客户端使用了什么浏览器内核。
二、配置Logstash
1、在每个虚拟机上传logstash安装文件,安装logstash,以222版本为例
rpm -ivh logstash-222-1noarchrpm
2、创建Logstash的工作目录
mkdir /root/logstash_work_dir;mkdir /root/logstash_work_dir/config;mkdir /root/logstash_work_dir/logs;mkdir /root/logstash_work_dir/pid
其中/root/logstash_work_dir是工作目录,config目录用于存储Logstash的配置文件,logs目录用于存储Logstash的日志数据,pid目录用于存储Logstash的pid文件。
3、设置Logstash的运行脚本,修改/etc/initd/logstash中,替换其中的代码如下
LS_WORK_DIR=/root/logstash_work_dir
name=logstash
LS_USER=root
LS_GROUP=root
LS_HOME=/var/lib/logstash
LS_HEAP_SIZE="1g"
pidfile=${LS_WORK_DIR}/pid/$namepid
LS_LOG_DIR=${LS_WORK_DIR}/logs
LS_LOG_FILE=${LS_WORK_DIR}/logs/$namelog
LS_CONF_DIR=${LS_WORK_DIR}/config/root_tomcatconf
LS_OPEN_FILES=16384
LS_NICE=19
LS_OPTS=""
LS_USER和LS_GROUP指定了Logstash进程运行时的用户名和组,我这里使用了root,也可以使用其他权限更低的一般用户和组。
LS_CONF_DIR=${LS_WORK_DIR}/config/root_tomcatconf这一句最重要,指定了Logstash服务运行时的配置文件路径。
4、在/root/logstash_work_dir/config/目录中编写Logstash的配置文件root_tomcatconf,这是本系统最重要的文件。
input {
file {
path => "/root/tomcat/logs/localhost_access_logtxt"
sincedb_path => "/root/logstash_work_dir/config/sincedb_apache_access_logtxt"
type => "apache_access_log"
add_field => {"tomcatip" => "101281861"}
}
}
filter{
if [type] == "apache_access_log" {
grok{
match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(:%{WORD:verb} %{URIPATHPARAM:request}(: HTTP/%{NUMBER:httpversion})|-)\" %{NUMBER:response} (:%{NUMBER:bytes}|-) %{NUMBER:responsetime} \"(:%{URI:referrer}|-)\" %{QS:agent}" }
}
date{
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => ["writetime"]
}
mutate {
convert => {
"response" => "integer"
"bytes" => "integer"
"responsetime" => "integer"
}
}
}
}
output {
if [type] == "apache_access_log" {
elasticsearch {
hosts => ["101281874:9200","101281875:9200","101281877:9200"]
index => "logstash-apacheaccesslog-%{+YYYYMMdd}"
}
}
}
Logstash的配置文件包括input、filter和output三部分。
input部分,使用了file插件。path指定了Logstash扫描的文件,每当有文件变化时,Logstash会读取文件尾部新增的数据;sincedb_path用于存储上一次文件读取的位置信息,如果这个文件不存在,则会从日志文件首部获取所有数据;type用于对这个配置插件做标识,当一个配置文件中有多个数据收集任务时尤其有用;add_field用于标识本机的ip地址,当数据存储在Elasticsearch后,用于区分来自哪一个Tomcat服务器。
filter插件,使用了grok、date和mutate三个插件。
grok插件用于解析Tomcat的访问日志,logstash自带了COMBINEDAPACHELOG等多个配置模式,但由于我使用了自定义的Tomcat日志配置,这里也自己编写;
date部分用于从日志中提取时间戳信息;
mutate中用convert将response、byte和responsetime三个解析得到的字符串转化为整数integer类型,这个步骤对于后续的分析比较重要,因为这样可以在Elasticsearch中做数值比较运算。
output插件,使用了elasticsearch插件,其中hosts指定了Elasticsearch集群的地址,本例子中指定了三个实例;index指定了数据存储在Elasticsearch中的索引名字,以logstash作为开头是因为Logstash自带的针对ELasticsearch的mapping映射中,对于所有的字符串类型都附带设置了一个raw不做解析的设置,这样便于在Elasticsearch中做底层的文本检索。
5、设置chkconfig的启动命令
chkconfig --add logstash
6、启动Logstash服务
service logstash start
0条评论