-
Notifications
You must be signed in to change notification settings - Fork 0
/
ext_trigger.py
247 lines (216 loc) · 9.21 KB
/
ext_trigger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#! /usr/bin/python3
#========================================================================
#
# Poll a list of cameras and if they report motion detected,
# user the ZMTrigger socket interface to trigger recording.
#
#========================================================================
#
# Don't forget to activate the trigger port on ZoneMinder before
# starting this app. See https://wiki.zoneminder.com/ZMTrigger
#
# To start this script at boot enter at the command line:
# sudo crontab -e
#
# Select your editor is prompted and add to the end of the file:
# @reboot /path/to/you/script/ext_trigger.py
#
import requests
import time
import socket
import logging
from logging.handlers import RotatingFileHandler
#========================================================================
# Control values
#
log_file = r'/var/log/ext_trigger.log'
# If running this app on a different machine to the ZoneMinder app then
# specify the IPv4 address of ZoneMinder
# zm_ipaddress = 'X.X.X.X'
zm_ipaddress = '127.0.0.1'
zm_port = 6802
#========================================================================
# Populate this array of dictionaries with you camera data
# You can customise the motion_query string per camera if you have
# a mixed instalation.
#
# Make sure the zoneminder_id matches the associated camera
#
Cameras = [
{
'name': 'A Cam',
'ipaddr': 'X.X.X.X',
'zoneminder_id': 1,
'motion_query': 'http://{ip}/api.cgi?cmd=GetMdState&user={user}&password={passwd}',
'username': 'your_username',
'password': 'your_password'
},
{
'name': 'B Cam',
'ipaddr': 'X.X.X.X',
'zoneminder_id': 2,
'motion_query': 'http://{ip}/api.cgi?cmd=GetMdState&user={user}&password={passwd}',
'username': 'your_username',
'password': 'your_password'
}
]
Initial_Boot_Delay = 60 # Units: seconds Allows ZoneMinder to fully boot
Trigger_Time = 20 # Units: seconds
Inter_Camera_Delay = 0.5 # Units: seconds Specifies the delay between checking cameras.
# Total polling interval will be this x No. of cameras.
#========================================================================
# Setup logging
#
logger = logging.getLogger('ext_trigger_log')
handler = RotatingFileHandler(log_file, maxBytes=20000, backupCount=5)
logger.addHandler(handler)
handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s '
'[in %(pathname)s:%(lineno)d]'))
logger.setLevel(logging.INFO) # DEBUG INFO WARNING ERROR
#========================================================================
# Query a camera to se if motion has been detected
# Returns:
# 0 No motion reported
# 1 Motion reported
# 2 Error getting data
#
def check_cam_for_motion( Camera ):
headers = {'Accept': 'application/json'}
url = Camera['motion_query'].format( ip=Camera['ipaddr'], user=Camera['username'], passwd=Camera['password'] )
try:
resp = requests.get(url, headers=headers)
except Exception as error:
logger.error('Error checking motion state. Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], error
))
return 2
#=======================================================================
# For a Reolink camera the response to the query is JSON in the format:
#
# Good response:
# [{'cmd': 'GetMdState', 'code': 0, 'value': {'state': 0}}]
# Failure response (example):
# [{'cmd': 'GetMdState', 'code': 1, 'error': {'detail': 'invalid user', 'rspCode': -27}}]
#
# Check your cameras documentation for the response it gives and then
# customise this section for your system.
#
# This code is deliberatly verbose to make customisation quicker & easier.
#
logger.debug("Response to motion query: {}".format(resp.json()))
data = resp.json()
data = data[0]
logger.debug("Data from motion query: {}".format(data))
if 'cmd' not in data:
# The query response was not understood, no cmd element was returned
logger.error('Error the response to the motion query was not understood (cmd attr). Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], data
))
return 2
if data['cmd'] != 'GetMdState':
# The query response was not understood, the command answered does not match the expected one
logger.error('Error the response to the motion query was not understood (cmd value). Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], data
))
return 2
if 'code' not in data:
# The query response was not understood, the camera didn't respond in the expected format
logger.error('Error the response to the motion query was not understood (code attr). Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], data
))
return 2
if data['code'] == 1:
# The query command did not execute, the camera wouldn't execute the command. This is often caused by bad credentials
logger.error('Error the response to the motion query was not executed. Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], data[0]['error']
))
return 2
if 'value' not in data:
# The query response was not understood, the camera didn't respond in the expected format
logger.error('Error the response to the motion query was not understood. Camera {} IPv4 {} Username {} Response {}'.format(
Camera['name'], Camera['ipaddr'], Camera['username'], data
))
return 2
# The query command executed, test if motion was detected value->state = 0 or 1 (on Reolink)
if data['value']['state'] == 1:
return 1
return 0
#========================================================================
# Open a socket to the ZoneMinder machine and send the trigger record
# command.
#
# Returns:
# 0 Command sent
# 1 Error while sending command.
#
def send_trigger( cam ):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((zm_ipaddress, zm_port))
except Exception as error:
logger.error("Can't open socket to ZoneMinder address {} port {} respose {}".format(zm_ipaddress, zm_port, error))
return 1
id_bytes = bytearray(str(cam['zoneminder_id']), 'utf8')
Time = bytearray(str(Trigger_Time), 'utf8')
cmd = id_bytes + b'|on+' + Time + b'|1|External Motion|Ext Python trigger|'
logger.debug("Sending trigger for {} id {} command {}".format(cam['name'], cam['zoneminder_id'], cmd))
try:
s.sendall(cmd)
except Exception as error:
logger.error("Can't transfer command to ZoneMinder address {} port {} respose {}".format(zm_ipaddress, zm_port, error))
return 1
try:
reply = s.recv(1024) # The server has received the command
except Exception as error:
logger.error("No response after trigger command address {} port {} respose {}".format(zm_ipaddress, zm_port, error))
return 1
logger.debug("Reply {}".format(reply))
return 0
#=============================================================
# Main function
#
def main():
logger.info('External camera trigger app starting.')
time.sleep(Initial_Boot_Delay)
logger.info('Checking configuration.')
Checked_Cameras = []
App_Running = True
#=============================================================
# Initial parameter check,
# Check all cameras are accessable and respond as expected
# to a query
for cam in Cameras:
if check_cam_for_motion(cam) != 2:
Checked_Cameras.append(cam)
logger.debug("Camera checked ok: {}".format(cam['name']))
else:
logger.error("Camera failed test check: {}".format(cam['name']))
if len(Checked_Cameras) == 0:
App_Running = False
logger.info('External camera trigger app exiting, no cameras to check.')
#=============================================================
# Initial parameter check,
# Check that the ZoneMinder machine accepts a trigger signal
#
trigger_rnt = send_trigger( Checked_Cameras[0] )
if trigger_rnt != 0:
logger.error("Failed to send trigger to ZoneMinder: {}".format( zm_ipaddress ))
App_Running = False
#=============================================================
# Run continuous checking
while App_Running:
for cam in Checked_Cameras:
rtn = check_cam_for_motion(cam)
if rtn == 2:
logger.warning("Camera failed motion poll: {}".format(cam['name']))
else:
if rtn == 1:
logger.debug("Motion reported: {}".format(cam['name']))
trigger_rnt = send_trigger( cam )
if trigger_rnt != 0:
logger.warning("Failed to send trigger to ZoneMinder: {}".format( zm_ipaddress ))
time.sleep( Inter_Camera_Delay )
return
if __name__ == '__main__':
main()