forked from MHarbi/bagedit
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbag2mp4.py
executable file
·56 lines (49 loc) · 2.29 KB
/
bag2mp4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python3
import sys
import rosbag
import os
import argparse
from contextlib import ExitStack
from tqdm import tqdm
from sensor_msgs.msg import CompressedImage
import subprocess
def save_mp4(bag, topics, output_dir, write_frames=False, framerate=0):
"""Extract a h264 file from a rosbag.
"""
topic: str
filenames = [os.path.join(output_dir, "%s.264" % topic[1:].replace("/", ".")) for topic in topics]
with ExitStack() as stack:
files = {
topic: stack.enter_context(open(filename, 'wb'))
for topic, filename in zip(topics, filenames)
}
msg: CompressedImage
for topic, msg, t in tqdm(bag.read_messages(topics=topics), desc="Extracting images"):
files[topic].write(msg.data)
for filename in filenames:
mp4_filename = filename.replace(".264", ".mp4")
frame_dir = os.path.join(output_dir, filename.replace(".264", ""))
if write_frames:
os.makedirs(frame_dir, exist_ok=True)
ret = subprocess.call(
['ffmpeg', '-i', filename, '-c', 'copy', mp4_filename])
if ret == 0:
os.remove(filename)
if write_frames:
ret = subprocess.call(
['ffmpeg', '-i', mp4_filename] + (['-vf', f'fps={framerate}'] if framerate > 0 else []) + [os.path.join(frame_dir, 'frame-%06d.png')]
)
if ret != 0:
print(f"ffmpeg failed on file {filename} with error code {ret}", file=sys.stderr)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
parser.add_argument("bag_file", help="Input ROS bag.")
parser.add_argument("output_dir", help="Output directory.")
parser.add_argument("image_topic", nargs="+", help="Image topic(s).")
parser.add_argument("--write_frames", action="store_true", help="Whether to output each frame as an image file; use --framerate to determine number of frames")
parser.add_argument("--framerate", default=0, type=int, help="Framerate to write individual frames (0 = write every frame)")
args = parser.parse_args()
bag = rosbag.Bag(args.bag_file, "r")
os.makedirs(args.output_dir, exist_ok=True)
save_mp4(bag, args.image_topic, args.output_dir, args.write_frames, args.framerate)
bag.close()