Skip to content

Commit

Permalink
Added cloud storage support for task creation, and a video chunk creator
Browse files Browse the repository at this point in the history
  • Loading branch information
Vignesh16879 committed Nov 11, 2024
1 parent 01bf22d commit 4f91723
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 11 deletions.
95 changes: 95 additions & 0 deletions cvat/apps/engine/chunks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
import traceback
import subprocess


def get_video_duration(video_file):
    """Return the duration of *video_file* in seconds, as reported by ffprobe.

    Parameters:
        video_file: path to a video file readable by ffprobe.

    Returns:
        float: duration in seconds.

    Raises:
        RuntimeError: if ffprobe exits with a non-zero status.
        ValueError: if ffprobe's output cannot be parsed as a float.
    """
    result = subprocess.run(
        [
            'ffprobe', '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            video_file,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Surface ffprobe failures explicitly: otherwise float(b'') below
    # raises an unhelpful ValueError with no hint of the real cause.
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {video_file}: "
            f"{result.stderr.decode(errors='replace').strip()}"
        )
    duration = float(result.stdout)
    return duration


class MakeVideoChunks:
    """Splits a task's raw ``.mp4`` video into fixed-duration chunk files.

    Chunks are written as ``<start_second>.mp4`` into the task's
    ``compressed`` directory using ffmpeg stream copy (no re-encoding).
    """

    # Root directory containing per-task folders
    # (``<root>/<task_id>/raw`` and ``<root>/<task_id>/compressed``).
    # NOTE(review): hard-coded developer path carried over from the original
    # code; override per call via the ``data_root`` parameter.
    DATA_ROOT = "/home/vignesh/Desktop/Desktop/IIITD/BTP.02/cvat/data/data"

    @staticmethod
    def make(task_id, chunk_duration=1, data_root=None):
        """Chunk the first ``.mp4`` found under the task's raw directory.

        Parameters:
            task_id: task identifier used to locate the data folders.
            chunk_duration: length of each chunk in whole seconds.
            data_root: optional override of the per-task data root
                directory; defaults to ``MakeVideoChunks.DATA_ROOT``.

        Returns:
            dict: ``{"success", "message", "data", "error"}`` — on failure
            ``success`` is False, ``message`` holds the exception text and
            ``error`` the formatted traceback.
        """
        try:
            root = data_root if data_root is not None else MakeVideoChunks.DATA_ROOT

            raw_video_dir = os.path.join(root, str(task_id), "raw")
            print(f"Raw video directory: {raw_video_dir}")

            # Recursively collect .mp4 files under the raw directory.
            input_files = []
            for dirpath, _dirs, files in os.walk(raw_video_dir):
                for name in files:
                    if name.endswith('.mp4'):
                        input_files.append(os.path.join(dirpath, name))

            if not input_files:
                raise FileNotFoundError(
                    "No .mp4 files found in the specified directory or subdirectories."
                )

            print(f"Input files: {input_files}")
            input_file = input_files[0]  # Use the first .mp4 file found
            output_folder = os.path.join(root, str(task_id), "compressed")
            os.makedirs(output_folder, exist_ok=True)

            print(f"Processing video: {input_file}")
            video_duration = get_video_duration(input_file)
            print(f"Video duration: {video_duration} seconds")

            # One chunk per chunk_duration seconds, starting at 0.
            for start in range(0, int(video_duration), chunk_duration):
                output_file = os.path.join(output_folder, f'{start}.mp4')

                # Remove a stale chunk so ffmpeg does not prompt/refuse.
                if os.path.exists(output_file):
                    print(f"File {output_file} already exists. Removing it.")
                    os.remove(output_file)

                command = [
                    'ffmpeg',
                    '-ss', str(start),          # Start time for the chunk
                    '-i', input_file,           # Input file
                    '-c', 'copy',               # Copy codec, no re-encoding
                    '-t', str(chunk_duration),  # Duration of the chunk
                    output_file,                # Output file path
                ]
                print(' '.join(command))
                result = subprocess.run(command)
                # A silently failed chunk must not yield a success response.
                if result.returncode != 0:
                    raise RuntimeError(
                        f"ffmpeg failed (exit {result.returncode}) for chunk "
                        f"starting at {start}s"
                    )

            return {
                "success": True,
                "message": None,
                "data": None,
                "error": None,
            }
        except Exception as e:
            print(str(e))
            # format_exc() returns the traceback string;
            # print_exc() would return None.
            error = traceback.format_exc()

            return {
                "success": False,
                "message": f"An unexpected error occurred, Error: {e}",
                "data": None,
                "error": error,
            }
58 changes: 56 additions & 2 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from utils.dataset_manifest.core import VideoManifestValidator, is_dataset_manifest
from utils.dataset_manifest.utils import detect_related_images
from .cloud_provider import db_storage_to_storage_instance
from .chunks import MakeVideoChunks

slogger = ServerLogManager(__name__)

Expand Down Expand Up @@ -105,6 +106,7 @@ def _copy_data_from_share_point(
))

for path in filtered_server_files:
slogger.glob.info(f"Copying file: {path}")
if server_dir is None:
source_path = os.path.join(settings.SHARE_ROOT, os.path.normpath(path))
else:
Expand Down Expand Up @@ -449,8 +451,10 @@ def _download_data_from_cloud_storage(
files: List[str],
upload_dir: str,
):
slogger.glob.info(f"Downloading data from cloud storage: {files}")
cloud_storage_instance = db_storage_to_storage_instance(db_storage)
cloud_storage_instance.bulk_download_to_dir(files, upload_dir)
slogger.glob.info(f"Downloaded data to {upload_dir}")

def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
return lambda frame_id: start_frame + frame_id * frame_step
Expand Down Expand Up @@ -559,6 +563,7 @@ def _create_thread(
slogger.glob.info("create task #{}".format(db_task.id))

job_file_mapping = _validate_job_file_mapping(db_task, data)
slogger.glob.info(f"Job file mapping: {job_file_mapping}")

db_data = db_task.data
upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
Expand Down Expand Up @@ -700,24 +705,29 @@ def _update_status(msg: str) -> None:

# count and validate uploaded files
media = _count_files(data)
slogger.glob.info(f"Media: {media}")
media, task_mode = _validate_data(media, manifest_files)
is_media_sorted = False

if is_data_in_cloud:
# first we need to filter files and keep only supported ones
slogger.glob.info(f"Data in cloud")
if any([v for k, v in media.items() if k != 'image']) and db_data.storage_method == models.StorageMethodChoice.CACHE:
slogger.glob.info(f"Storage method: {db_data.storage_method}")
# FUTURE-FIXME: This is a temporary workaround for creating tasks
# with unsupported cloud storage data (video, archive, pdf) when use_cache is enabled
db_data.storage_method = models.StorageMethodChoice.FILE_SYSTEM
_update_status("The 'use cache' option is ignored")
# _update_status("The 'use cache' option is ignored")

if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
slogger.glob.info(f"Storage method: {db_data.storage_method}")
filtered_data = []
for files in (i for i in media.values() if i):
filtered_data.extend(files)
media_to_download = filtered_data

if media['image']:
if 'image' in media and media['image']:
slogger.glob.info(f"Image in media")
start_frame = db_data.start_frame
stop_frame = len(filtered_data) - 1
if data['stop_frame'] is not None:
Expand All @@ -726,40 +736,62 @@ def _update_status(msg: str) -> None:
step = db_data.get_frame_step()
if start_frame or step != 1 or stop_frame != len(filtered_data) - 1:
media_to_download = filtered_data[start_frame : stop_frame + 1: step]

slogger.glob.info(f"Downloading data from cloud storage: {media_to_download}")
_download_data_from_cloud_storage(db_data.cloud_storage, media_to_download, upload_dir)
del media_to_download
del filtered_data
is_data_in_cloud = False
db_data.storage = models.StorageChoice.LOCAL
slogger.glob.info(f"DB Data Storage: {db_data.storage}")
else:
manifest = ImageManifestManager(db_data.get_manifest_path())

if job_file_mapping is not None and task_mode != 'annotation':
raise ValidationError("job_file_mapping can't be used with sequence-based data like videos")

slogger.glob.info(f"Data: {data}")
if data['server_files']:
if db_data.storage == models.StorageChoice.LOCAL and not db_data.cloud_storage:
# this means that the data has not been downloaded from the storage to the host
slogger.glob.info(f"Copying data from share point")
_copy_data_from_share_point(
(data['server_files'] + [manifest_file]) if manifest_file else data['server_files'],
upload_dir, data.get('server_files_path'), data.get('server_files_exclude'))
manifest_root = upload_dir
slogger.glob.info(f"Manifest Root: {manifest_root}")
elif is_data_in_cloud:
# we should sort media before sorting in the extractor because the manifest structure should match to the sorted media
if job_file_mapping is not None:
slogger.glob.info(f"Job file mapping")
filtered_files = []
for f in itertools.chain.from_iterable(job_file_mapping):
if f not in data['server_files']:
raise ValidationError(f"Job mapping file {f} is not specified in input files")
filtered_files.append(f)
data['server_files'] = filtered_files
sorted_media = list(itertools.chain.from_iterable(job_file_mapping))
else:
slogger.glob.info(f"Sorting media")
sorted_media = sort(media['image'], data['sorting_method'])
media['image'] = sorted_media

# Add logic to handle audio files from cloud storage
if db_data.storage == models.StorageChoice.CLOUD_STORAGE:
slogger.glob.info(f"Downloading data from cloud storage: {data['server_files']}")
_download_data_from_cloud_storage(db_data.cloud_storage, data['server_files'], upload_dir)

is_media_sorted = True

if manifest_file:
# Define task manifest content based on cloud storage manifest content and uploaded files
slogger.glob.info(f"Creating task manifest based on cloud storage manifest content and uploaded files")
_create_task_manifest_based_on_cloud_storage_manifest(
sorted_media, cloud_storage_manifest_prefix,
cloud_storage_manifest, manifest)
else: # without manifest file but with use_cache option
# Define task manifest content based on list with uploaded files
slogger.glob.info(f"Creating task manifest from cloud data: {db_data.cloud_storage, sorted_media, manifest}")
_create_task_manifest_from_cloud_data(db_data.cloud_storage, sorted_media, manifest)

av_scan_paths(upload_dir)
Expand All @@ -770,6 +802,7 @@ def _update_status(msg: str) -> None:
# If upload from server_files image and directories
# need to update images list by all found images in directories
if (data['server_files']) and len(media['directory']) and len(media['image']):
slogger.glob.info(f"Updating images list by all found images in directories: {media['directory']}")
media['image'].extend(
[os.path.relpath(image, upload_dir) for image in
MEDIA_TYPES['directory']['extractor'](
Expand Down Expand Up @@ -1264,3 +1297,24 @@ def process_results(img_meta: list[tuple[str, int, tuple[int, int]]]):

slogger.glob.info("Found frames {} for Data #{}".format(db_data.size, db_data.id))
_save_task_to_db(db_task, job_file_mapping=job_file_mapping)

if MEDIA_TYPE == "video":
# Video Chunks overwrites
slogger.glob.info(f"Creating video chunks")
job_id_string = job.id
match = re.search(r'task-(\d+)', job_id_string)

if match:
task_id = match.group(1) # Extracted '106'
response = MakeVideoChunks.make(task_id)
slogger.glob.info(response)
else:
response = {
"success" : False,
"message" : "No match found."
}
slogger.glob.error(response)

# f = open( '/home/vignesh/Desktop/Desktop/IIITD/BTP.02/cvat/cvat/apps/engine/chunks.txt', 'w' )
# f.write( 'dict = ' + repr(response) + '\n' )
# f.close()
18 changes: 9 additions & 9 deletions cvat/apps/iam/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ def get_resource(self):
def check_access(self) -> PermissionResult:
    """Query the authorization service and return its decision.

    Posts ``self.payload`` to ``self.url`` and interprets the ``result``
    field of the JSON response, which may be either a dict with an
    ``allow`` flag (and optional ``reasons``) or a bare boolean.

    Returns:
        PermissionResult: the allow/deny decision plus optional reasons.

    Raises:
        ValueError: if the service response has an unexpected shape.
    """
    with make_requests_session() as session:
        response = session.post(self.url, json=self.payload)
        output = response.json()['result']

        # SECURITY: the decision must come from the authorization
        # service — never default to allow. The previous revision
        # hard-coded ``allow = True`` and bypassed the check entirely.
        allow = False
        reasons = []
        if isinstance(output, dict):
            allow = output['allow']
            reasons = output.get('reasons', [])
        elif isinstance(output, bool):
            allow = output
        else:
            raise ValueError("Unexpected response format")

        return PermissionResult(allow=allow, reasons=reasons)

Expand Down

0 comments on commit 4f91723

Please sign in to comment.