-
Notifications
You must be signed in to change notification settings - Fork 41
/
tail.py
75 lines (63 loc) · 2.58 KB
/
tail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/env python
#coding=utf-8
# https://github.com/kasun/python-tail/blob/master/tail.py
'''
Python-Tail - Unix tail follow implementation in Python.
python-tail can be used to monitor changes to a file.
Example:
import tail
# Create a tail instance
t = tail.Tail('file-to-be-followed')
# Register a callback function to be called when a new line is found in the followed file.
# If no callback function is registerd, new lines would be printed to standard out.
t.register_callback(callback_function)
# Follow the file with 5 seconds as sleep time between iterations.
# If sleep time is not provided 1 second is used as the default time.
t.follow(s=5) '''
# Author - Kasun Herath <kasunh01 at gmail.com>
# Source - https://github.com/kasun/python-tail
import os
import sys
import time
class Tail(object):
''' Represents a tail command. '''
def __init__(self, tailed_file):
''' Initiate a Tail instance.
Check for file validity, assigns callback function to standard out.
Arguments:
tailed_file - File to be followed. '''
self.check_file_validity(tailed_file)
self.tailed_file = tailed_file
self.callback = sys.stdout.write
def follow(self, s=1):
''' Do a tail follow. If a callback function is registered it is called with every new line.
Else printed to standard out.
Arguments:
s - Number of seconds to wait between each iteration; Defaults to 1. '''
with open(self.tailed_file) as file_:
# Go to the end of file
file_.seek(0,2)
while True:
curr_position = file_.tell()
line = file_.readline()
if not line:
file_.seek(curr_position)
time.sleep(s)
else:
self.callback(line)
def register_callback(self, func):
''' Overrides default callback function to provided function. '''
self.callback = func
def check_file_validity(self, file_):
''' Check whether the a given file exists, readable and is a file '''
if not os.access(file_, os.F_OK):
raise TailError("File '%s' does not exist" % (file_))
if not os.access(file_, os.R_OK):
raise TailError("File '%s' not readable" % (file_))
if os.path.isdir(file_):
raise TailError("File '%s' is a directory" % (file_))
class TailError(Exception):
def __init__(self, msg):
self.message = msg
def __str__(self):
return self.message