python实现tailf或者filebeat功能
python实现tailf或者filebeat功能
思路历程:
一开始以为小小的tailf命令功能很容易实现可是真到动手写的时候,还是迷糊了好一会。到到一点思路,又掉进代threading.Condition的坑最后是中间的逻辑,有点绕
功能:
就是一个python版本的tailf
有待扩展:
暂时不
python实现tailf或者filebeat功能
思路历程:
- 一开始以为小小的tailf命令功能很容易实现
- 可是真到动手写的时候,还是迷糊了好一会。
- 到到一点思路,又掉进代threading.Condition的坑
- 最后是中间的逻辑,有点绕
功能:
- 就是一个python版本的tailf
有待扩展:
- 暂时不支持CtrlC结束进程
- 暂时没有实现监控多个文件及发送到kafka或者logstash
直接上代码:
#!/usr/bin/env python
#coding:utf-8
实现思想:
用两个线程:线程1用来读取日志文件,并记录读取位置线程2用发送日条目两个线程之间使用多线程的条件进行控制多个文件时,可以使用pyinotify,真正的可以实现filebeat功能
# from pyinotify import WatchManager,otifier,ProcessEvent,I_MODIFY,I_DELETE
# pyinotify用法略
import os,sys
import threading
import timeclass arError(Exception):pass# 初始化两个全局变量
ret_lines = []
pos = 0# 发送数据线程,可以自己实现发送到kafka/logstash
def Return(cond):with cond:while True:cond.wait()global ret_linesif len(ret_lines) >= 1:for line in ret_lines:print(line.strip())()# 模拟tailf -n参数,当然此脚本是用ar,因此直接指定数字即可
def readFirst(file_path, unit=200, n = 10):while True:ret = []with open(file_path, r) as f:f.seek(0, 2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0:dest_pos = 0else:dest_pos = start_pos - my_offsetf.seek(dest_pos, 0)# print(dest_pos)while True:line = f.readline()if line:ret.append(line)else:breakf.seek(start_pos, 0)# 判断获取日志行数是否满足,不满足加大unit,继续循环if len(ret) > n:breakelif len(ret) <= n and dest_pos == 0:breakelif len(ret) <= n and dest_pos != 0:unit = unitcontinuereturn ret# 实时追踪新日志条目
def readContinue(file_path):start_time = ()global ret_linesret_lines = []while True:with open(file_path,r) as f:global posf.seek(pos,0)while True:line = f.readline()if line:cur_pos = ()pos = cur_posret_lines.append(line)elif not line:breakmiddle_time = ()if middle_time - start_time > 1 or len(ret_lines) >= 5:breakelse:time.sleep(0.)continue# 实现tailf的线程
def Tailf(cond,file_path,n=10):with cond:while True:global posif pos == 0:unit = 200ret = readFirst(file_path,unit, n)global ret_linesif len(ret) > n:ret_lines = ret[-n::]else:ret_lines = retelse:readContinue(file_path)()cond.wait()def main(file_path,n=10):cond = threading.Condition()# 这里有个大坑,t1和t2启动顺序,应该是wait()的一方先启动,类似socker server端# 而notify应该后启动.如果notify先启动,会阻塞,并不会报错,后面的顺序也就没办法继续了t1 = threading.Thread(target=Return,args=(cond,))t1.start()t2 = threading.Thread(target=Tailf,args=(cond,file_path,n))t2.start()# t1.join()# t2.join()time.sleep(1800) # 发现Ctrlc无法结束此程序,因此加一个超时时间if __name__ == __main__:if len(sys.ar) < 2:raise arError(请指定tailf文件对象)elif len(sys.ar) == 2:if os.path.isfile(sys.ar[1]):main(file_path=sys.ar[1])else:raise arError(文件不存在)elif len(sys.ar) == :try:line = int(sys.ar[1])except Exception as e:raise arError(第一个参数是数字)if not os.path.isfile(sys.ar[2]):raise arError(文件不存在)main(n=line,file_path = sys.ar[2])elif len(sys.ar) > :raise arError(tailf一次只能监控一个文件)
#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格
上传时间: 2024-02-05 13:18:32
上一篇:中小学计算机网络教室系统部署设计
下一篇:vmware虚拟机与主机共享网络
推荐阅读
留言与评论(共有 15 条评论) |
本站网友 银丰唐郡 | 26分钟前 发表 |
ret.append(line)else | |
本站网友 出国留学体检 | 2分钟前 发表 |
if len(sys.ar) < 2 | |
本站网友 曲阜团购 | 11分钟前 发表 |
0)# 判断获取日志行数是否满足 | |
本站网友 满城尽带黄金甲票房 | 25分钟前 发表 |
ret = []with open(file_path | |
本站网友 北新泾二手房 | 3分钟前 发表 |
I_DELETE # pyinotify用法略 import os | |
本站网友 柚子皮的作用 | 15分钟前 发表 |
args=(cond | |
本站网友 军无戏言 | 14分钟前 发表 |
2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0 | |
本站网友 花落去 | 18分钟前 发表 |
unit = 200ret = readFirst(file_path | |
本站网友 霍州煤电 | 4分钟前 发表 |
global posf.seek(pos | |
本站网友 长城证券下载 | 8分钟前 发表 |
main(file_path=sys.ar[1])else | |
本站网友 材质球 | 26分钟前 发表 |
dest_pos = 0else | |
本站网友 团购租车 | 24分钟前 发表 |
ret_lines = ret[-n | |
本站网友 赤小豆是红豆吗 | 7分钟前 发表 |
2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0 | |
本站网友 产科 | 23分钟前 发表 |
到到一点思路,又掉进代threading.Condition的坑最后是中间的逻辑,有点绕 功能: 就是一个python版本的tailf 有待扩展: 暂时不支持CtrlC结束进程暂时没有实现监控多个文件及发送到kafka或者logstash 直接上代码: #!/usr/bin/env python #coding |