前一段时间做了一个App,出于完整性的考虑,打算做一个完整的平台,包括监控等.所以,现在重点就在做监控方面.
我们对监控系统的初步需求是:
- 能够获取到网络延时较大的接口,包括调用时间,网络延时的大小,调用的接口名称
- 能够显示出现错误的接口,包括接口的名称,传入的参数,调用时间等
为了实现这个监控系统,我们初步的打算是:
通过类Unix系统上提供的Logrotate工具,对服务器端日志进行切割,切割成log_{YY}{MM}{DD}.log的形式,然后通过Flume收集服务器端日志,发送给Storm,然后通过Storm进行数据的统计,然后让Storm通过WebSocket发送给前端WebUI.
然而,在实现时,遇到了几个问题.
首先,Flume的Source中,没有一个支持日志切换的Source.Flume中和我们的需求比较相似的有TailDirSource,它能够根据你配置的文件的过滤规则,实现当文件被修改了之后,发送那些添加的内容.而我们要求的Source,不仅需要具有当有日志追加到日志文件之后,将追加的日志发送出去的功能,还需要具有能够切换读取的日志的功能.比如说,今天是2017年08月04号,那么今天读取的日志就是log_2017_08_04.log,而到了明天,就要读取log_2017_08_05.log这个日志.
为了实现这个功能,我们在TailDirSource的基础上,进行了扩展,加上了切换读取的日志的功能.
另外,通过研究TailDirSource的实现,我们发现它是采用的IO的方式,每隔一秒便按照预先设定的批次尺寸读取特定数目的追加的文本.这样便会有一些问题.比如说,我们设定了读取的间隔是1s,然后每次读取的时候,是读取10w行文本.如果在这一秒中,增加了100w行文本,那么剩下的90w行文本,就得在下一次读取时才能被读取到.这样,随着时间的增长,积累的没有被读到的文本会越来越多.就达不到实时分析的目的了.
意识到这一点之后,我们对这部分的实现进行了改写,通过使用NIO的方式,使得每次有日志追加到日志文件时,触发读取操作,将追加的日志发送出去.
在将日志发送出去的时候,我们本来并没有打算加上Kafka这样一个消息队列,本来打算直接通过Flume的Thrift Sink直接连接到Storm的Thrift Spout,后来查看Flume的文档时,发现Flume的Thrift Sink好像只能发送数据给Flume的Thrift Source,这就很尴尬了.不得已,只好寻找其他的途径.发现Flume中有一个Kafka Sink,Storm中正好也有一个Kafka Spout,就打算这样来做.
后来,在阅读Kafka的日志的时候,发现Kafka中,提供了流处理的功能,是Kafka Stream API.论强大程度的话,我觉得肯定还是比不上Storm的,但是,对于我们这种很简单的用例,也能满足我们的需求了.于是,就将项目结构改成了下面的这种形式:
通过类Unix系统上提供的Logrotate工具,对服务器端日志进行切割,切割成log_{YY}{MM}{DD}.log的形式,然后通过Flume收集服务器端日志,发送给Kafka进行统计,然后通过WebSocket发送给前端WebUI.
这时,一个最大的问题摆在我们面前,就是Kafka并没有提供WebSocket的功能.Google之后,发现了一个方案是通过Reactive Steams + Akka Streams来实现,看了半天没看懂.就只能另避蹊径了.
最终结构图如下:
在上图中,我们可以看到,我们是把Kafka Consumer放到一个WebApplication中了.
这个结构图是我在参考了网络上一位朋友的代码之后,得出来的.在此之前,我一直一位写的跟Kafka相关的代码需要通过Kafka提供的kafka-run-class.sh脚本来运行,后来发现并不是这样.
这部分的具体实现,我会在后面的一篇文章中进行介绍.