-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrigger-pipeline.py
executable file
·50 lines (42 loc) · 1.78 KB
/
trigger-pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python3
import inotify.adapters
import requests
import os
import datetime
print("Starting up.")

# Runtime settings, each read from an environment variable with a fallback
# that is used when the variable is unset.
config = dict(
    url=os.environ.get("PIPELINE_URL", "http://example.org"),
    token=os.environ.get("PIPELINE_TOKEN", "glptt-foobar"),
    directory=os.environ.get("WATCH_DIR", "/sedex-interface/inbox"),
)
def _main():
    """Validate the config, watch the configured directory, and trigger on writes.

    Loops over inotify events for ``config["directory"]`` and calls
    ``trigger_pipeline`` with the file name for every event whose type list
    contains ``IN_CLOSE_WRITE``.
    """
    validate_config(config)

    watcher = inotify.adapters.Inotify()
    watcher.add_watch(config["directory"])
    print(f"Watching directory {config['directory']} and will trigger {config['url']} when new files arrive.")

    for _header, event_types, _watch_path, name in watcher.event_gen(yield_nones=False):
        # Skip everything except "closed after writing": we want the file to
        # be completely written before the pipeline is triggered.
        if 'IN_CLOSE_WRITE' not in event_types:
            continue
        trigger_pipeline(name)
def trigger_pipeline(filename='', timeout=30):
    """POST the trigger request to the configured pipeline URL.

    The call is best-effort: any failure is logged to stdout and swallowed so
    that the caller's watch loop keeps running.

    Args:
        filename: Name of the file that caused the trigger; only used in the
            log message.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the caller forever.
    """
    url = config["url"]
    print(f"{datetime.datetime.now()} New file {filename} detected in inbox, triggering pipeline at {url}")

    payload = {
        "token": config["token"],
        "ref": "main",
    }

    try:
        response = requests.post(url, data=payload, timeout=timeout)
    except Exception as e:
        # The request itself failed, so there is no response object to
        # inspect here; touching `response` would raise UnboundLocalError.
        print(f"{datetime.datetime.now()} ERROR: Exception while sending POST request to {url}")
        print(f"Exception is: {e}")
        return

    print(f"{datetime.datetime.now()} Response code: {response.status_code}")
    if not response.ok:
        # Surface the server's explanation for a rejected trigger.
        print(f"Response: {response.content.decode(errors='replace')}")
# Only validates the watch dir, but feel free to add more
def validate_config(config):
    """Terminate the program if the configured watch directory is unusable.

    Args:
        config: Mapping with a ``"directory"`` entry holding the path to watch.

    Raises:
        SystemExit: If the path does not exist or is not a directory. The
            exit message names the offending path.
    """
    directory = config["directory"]
    # os.path.isdir() is already False for paths that do not exist, so a
    # separate exists() check is unnecessary.
    if not os.path.isdir(directory):
        # Raise SystemExit directly: the exit() helper is injected by the
        # `site` module for interactive use and is not always available.
        raise SystemExit(f"{directory} does not exist or isn't a directory")
# Start the watcher only when this file is executed as a script.
if __name__ == "__main__":
    _main()