-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdashboard_videoplayer.py
70 lines (60 loc) · 2.44 KB
/
dashboard_videoplayer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import base64
import threading

import cv2
import dash_bootstrap_components as dbc
import numpy as np
from dash import Dash, html, dcc, Output, Input
from dash.exceptions import PreventUpdate
class Video:
    """Read frames from a video source in the background and publish them.

    Two daemon threads are started by the constructor:

    * ``read_capture`` keeps pulling frames from ``self.capture`` into
      ``self.frame``.
    * ``update_frame`` runs ``video_processing`` on the latest frame,
      JPEG-encodes the result and stores it, base64-encoded, in
      ``self.output_frame``.

    Subclasses can override ``frame_processing`` to transform frames before
    they are encoded.

    Attributes:
        ret: Success flag of the most recent read (``None`` before any read).
        urlRtsp: Source handed to ``cv2.VideoCapture``.
        frame: Last successfully read frame; starts as a black 512x512 image.
        capture: The active ``cv2.VideoCapture`` handle.
        processed_frame: Result of the last ``frame_processing`` call.
        output_frame: Base64 text of the last encoded JPEG, or ``None`` until
            the first frame has been encoded.
    """

    # Consecutive failed reads tolerated before the capture is reopened.
    MAX_READ_ERRORS = 10

    def __init__(self, **kwargs):
        """Open the capture and start the reader and encoder threads.

        Args:
            **kwargs: Passed on to ``super().__init__`` unchanged.
        """
        super().__init__(**kwargs)
        self.ret = None
        # NOTE(review): "VIDEO_PATH" looks like a placeholder to be replaced
        # with a real file path or stream URL - confirm.
        self.urlRtsp = "VIDEO_PATH"
        self.frame = np.zeros((512, 512, 3), dtype="uint8")
        self.capture = cv2.VideoCapture(self.urlRtsp)
        self.processed_frame = None
        self.output_frame = None
        self.th_processing_frame = threading.Thread(target=self.update_frame, daemon=True)
        self.th_read_frame = threading.Thread(target=self.read_capture, daemon=True)
        self.th_read_frame.start()
        self.th_processing_frame.start()

    def update_frame(self):
        """Loop forever: process the latest frame and publish it as base64 JPEG.

        Iterations where no processed frame is available, or where JPEG
        encoding reports failure, leave ``self.output_frame`` untouched so the
        previously published image stays available.
        """
        while True:
            self.video_processing()
            # Take one local reference so the None check and the encode see
            # the same object even if another thread rebinds the attribute.
            frame = self.processed_frame
            if frame is None:
                continue
            ok, encoded_frame = cv2.imencode('.jpg', frame.copy())
            if ok:
                self.output_frame = base64.b64encode(encoded_frame).decode('utf-8')

    def read_capture(self):
        """Loop forever reading frames, reopening the source after repeated failures."""
        # The counter lives outside the loop so failures accumulate across
        # iterations; resetting it per iteration would make the reconnect
        # threshold unreachable.
        consecutive_errors = 0
        while True:
            consecutive_errors = self._read_once(consecutive_errors)

    def _read_once(self, consecutive_errors):
        """Attempt a single read and return the updated failure count.

        On success the new frame replaces ``self.frame`` and the count resets
        to 0. On failure the previous frame is kept (so consumers never see
        ``None``) and the count is incremented; once it reaches
        ``MAX_READ_ERRORS`` the capture is reopened and the count resets.
        If the read itself raises, the capture is reopened immediately.

        Args:
            consecutive_errors: Number of failed reads seen in a row so far.

        Returns:
            The number of consecutive failed reads after this attempt.
        """
        try:
            ret, frame = self.capture.read()
        except Exception:
            # A read that raises means the handle is unusable; replace the
            # capture (not the frame) right away.
            self.ret = False
            self._reopen_capture()
            return 0

        self.ret = ret
        if ret and frame is not None:
            self.frame = frame
            return 0

        consecutive_errors += 1
        if consecutive_errors >= self.MAX_READ_ERRORS:
            self._reopen_capture()
            return 0
        return consecutive_errors

    def _reopen_capture(self):
        """Release the current capture handle and open a fresh one."""
        self.capture.release()
        self.capture = cv2.VideoCapture(self.urlRtsp)

    def frame_processing(self):
        """Return the frame to publish; the base implementation returns ``self.frame`` as is."""
        return self.frame

    def video_processing(self):
        """Store the result of ``frame_processing`` in ``self.processed_frame``."""
        self.processed_frame = self.frame_processing()
class Dashboard:
    """Dash application that displays the frames published by a ``Video``.

    The layout holds one ``html.Img`` and one ``dcc.Interval``. Each interval
    tick triggers a callback that copies the latest base64 JPEG from
    ``self.video_player.output_frame`` into the image's ``src`` as a data URI.

    Attributes:
        app: The ``Dash`` application instance.
        video_player: The ``Video`` instance supplying encoded frames.
        output_frame: Last data URI sent to the browser, or ``None`` if no
            frame has been sent yet.
    """

    def __init__(self, **kwargs):
        """Create the Dash app, start the video source and register the callback.

        Args:
            **kwargs: Passed on to ``super().__init__`` unchanged.
        """
        super().__init__(**kwargs)
        self.app = Dash(__name__, external_stylesheets=[dbc.themes.DARKLY])
        self.video_player = Video()
        self.output_frame = None
        self.app.layout = html.Div([
            html.Img(
                id='video_player',
                # Dash style dictionaries use camelCased CSS property names,
                # hence 'marginTop' rather than 'margin-top'.
                style={
                    'width': '920px',
                    'height': '800px',
                    'margin': '10px',
                    'marginTop': '50px',
                },
            ),
            dcc.Interval(
                id='videoframe_fps',
                # Interval is expressed in milliseconds: 1000/60 ms per tick.
                interval=1000 / 60,
                n_intervals=0,
            ),
        ])

        @self.app.callback(
            [Output(component_id='video_player', component_property='src')],
            [Input('videoframe_fps', 'n_intervals')]
        )
        def update_frame(interval):
            """Return the newest frame as a data URI for the image element."""
            encoded = self.video_player.output_frame
            if encoded is None:
                # Nothing has been encoded yet; skip the update rather than
                # sending the text "None" as image data.
                raise PreventUpdate
            # 'image/jpeg' is the registered MIME type for JPEG data.
            self.output_frame = 'data:image/jpeg;base64,{}'.format(encoded)
            return [self.output_frame]
if __name__ == '__main__':
    # Script entry point: build the dashboard, then start its Dash server
    # with debug enabled.
    dashboard = Dashboard()
    dashboard.app.run_server(debug=True)