您现在的位置是:首页 > 数码 > 

python实现tailf或者filebeat功能

2025-07-27 21:22:50
python实现tailf或者filebeat功能 思路历程: 一开始以为小小的tailf命令功能很容易实现可是真到动手写的时候,还是迷糊了好一会。到到一点思路,又掉进代threading.Condition的坑最后是中间的逻辑,有点绕 功能: 就是一个python版本的tailf 有待扩展: 暂时不

python实现tailf或者filebeat功能

思路历程:

  • 一开始以为小小的tailf命令功能很容易实现
  • 可是真到动手写的时候,还是迷糊了好一会。
  • 到到一点思路,又掉进代threading.Condition的坑
  • 最后是中间的逻辑,有点绕

功能:

  • 就是一个python版本的tailf

有待扩展:

  • 暂时不支持CtrlC结束进程
  • 暂时没有实现监控多个文件及发送到kafka或者logstash

直接上代码:

#!/usr/bin/env python
#coding:utf-8
实现思想:
用两个线程:线程1用来读取日志文件,并记录读取位置线程2用发送日条目两个线程之间使用多线程的条件进行控制多个文件时,可以使用pyinotify,真正的可以实现filebeat功能
# from pyinotify import WatchManager,otifier,ProcessEvent,I_MODIFY,I_DELETE
# pyinotify用法略
import os,sys
import threading
import timeclass arError(Exception):pass# 初始化两个全局变量
ret_lines = []
pos = 0# 发送数据线程,可以自己实现发送到kafka/logstash
def Return(cond):with cond:while True:cond.wait()global ret_linesif len(ret_lines) >= 1:for line in ret_lines:print(line.strip())()# 模拟tailf -n参数,当然此脚本是用ar,因此直接指定数字即可
def readFirst(file_path, unit=200, n = 10):while True:ret = []with open(file_path, r) as f:f.seek(0, 2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0:dest_pos = 0else:dest_pos = start_pos - my_offsetf.seek(dest_pos, 0)# print(dest_pos)while True:line = f.readline()if line:ret.append(line)else:breakf.seek(start_pos, 0)# 判断获取日志行数是否满足,不满足加大unit,继续循环if len(ret) > n:breakelif len(ret) <= n and dest_pos == 0:breakelif len(ret) <= n and dest_pos != 0:unit = unitcontinuereturn ret# 实时追踪新日志条目
def readContinue(file_path):start_time = ()global ret_linesret_lines = []while True:with open(file_path,r) as f:global posf.seek(pos,0)while True:line = f.readline()if line:cur_pos = ()pos = cur_posret_lines.append(line)elif not line:breakmiddle_time = ()if middle_time - start_time > 1 or len(ret_lines) >= 5:breakelse:time.sleep(0.)continue# 实现tailf的线程
def Tailf(cond,file_path,n=10):with cond:while True:global posif pos == 0:unit = 200ret = readFirst(file_path,unit, n)global ret_linesif len(ret) > n:ret_lines = ret[-n::]else:ret_lines = retelse:readContinue(file_path)()cond.wait()def main(file_path,n=10):cond = threading.Condition()# 这里有个大坑,t1和t2启动顺序,应该是wait()的一方先启动,类似socker server端# 而notify应该后启动.如果notify先启动,会阻塞,并不会报错,后面的顺序也就没办法继续了t1 = threading.Thread(target=Return,args=(cond,))t1.start()t2 = threading.Thread(target=Tailf,args=(cond,file_path,n))t2.start()# t1.join()# t2.join()time.sleep(1800)  # 发现Ctrlc无法结束此程序,因此加一个超时时间if __name__ == __main__:if len(sys.ar) < 2:raise arError(请指定tailf文件对象)elif len(sys.ar) == 2:if os.path.isfile(sys.ar[1]):main(file_path=sys.ar[1])else:raise arError(文件不存在)elif len(sys.ar) == :try:line = int(sys.ar[1])except Exception as e:raise arError(第一个参数是数字)if not os.path.isfile(sys.ar[2]):raise arError(文件不存在)main(n=line,file_path = sys.ar[2])elif len(sys.ar) > :raise arError(tailf一次只能监控一个文件)

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/shuma/845699.html

相关标签:无
上传时间: 2024-02-05 13:18:32
留言与评论(共有 15 条评论)
本站网友 银丰唐郡
26分钟前 发表
ret.append(line)else
本站网友 出国留学体检
2分钟前 发表
if len(sys.ar) < 2
本站网友 曲阜团购
11分钟前 发表
0)# 判断获取日志行数是否满足
本站网友 满城尽带黄金甲票房
25分钟前 发表
ret = []with open(file_path
本站网友 北新泾二手房
3分钟前 发表
I_DELETE # pyinotify用法略 import os
本站网友 柚子皮的作用
15分钟前 发表
args=(cond
本站网友 军无戏言
14分钟前 发表
2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0
本站网友 花落去
18分钟前 发表
unit = 200ret = readFirst(file_path
本站网友 霍州煤电
4分钟前 发表
global posf.seek(pos
本站网友 长城证券下载
8分钟前 发表
main(file_path=sys.ar[1])else
本站网友 材质球
26分钟前 发表
dest_pos = 0else
本站网友 团购租车
24分钟前 发表
ret_lines = ret[-n
本站网友 赤小豆是红豆吗
7分钟前 发表
2)start_pos = ()global pospos = start_posmy_offset = unit * nif start_pos - my_offset <= 0
本站网友 产科
23分钟前 发表
到到一点思路,又掉进代threading.Condition的坑最后是中间的逻辑,有点绕 功能: 就是一个python版本的tailf 有待扩展: 暂时不支持CtrlC结束进程暂时没有实现监控多个文件及发送到kafka或者logstash 直接上代码: #!/usr/bin/env python #coding