Skip to content

Commit

Permalink
import audio and video pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmadHAW committed Dec 10, 2024
1 parent 218662b commit bdd3d13
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 56 deletions.
18 changes: 13 additions & 5 deletions backend/src/app/celery/background_jobs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,25 @@ def execute_image_preprocessing_pipeline_(
pipeline.execute(cargo=cargo)


def execute_audio_preprocessing_pipeline_(
    cargo: PipelineCargo,
    is_init: bool = True,
) -> None:
    """Run the audio preprocessing pipeline on the given cargo.

    Args:
        cargo: Pipeline cargo carrying the payload (filename etc.) to process.
        is_init: Forwarded to the pipeline factory; callers in the import
            path pass False — presumably to skip init-only steps, TODO confirm.
    """
    pipeline = prepro.get_audio_pipeline(is_init=is_init)
    logger.debug(
        f"Executing audio Preprocessing Pipeline\n\t{pipeline}\n\t for cargo"
        f" {cargo.ppj_payload.filename}!"
    )
    pipeline.execute(cargo=cargo)


def execute_video_preprocessing_pipeline_(cargo: PipelineCargo) -> None:
pipeline = prepro.get_video_pipeline()
def execute_video_preprocessing_pipeline_(
cargo: PipelineCargo,
is_init: bool = True,
) -> None:
pipeline = prepro.get_video_pipeline(is_init=is_init)
logger.debug(
f"Executing audio Preprocessing Pipeline\n\t{pipeline}\n\t for cargo"
f" {cargo.ppj_payload.filename}!"
Expand Down
14 changes: 10 additions & 4 deletions backend/src/app/celery/background_jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,23 @@ def execute_image_preprocessing_pipeline_task(
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 5, "countdown": 5},
)
def execute_audio_preprocessing_pipeline_task(
    cargo: PipelineCargo,
    is_init: bool = True,
) -> None:
    """Celery task wrapper: delegates to execute_audio_preprocessing_pipeline_.

    Args:
        cargo: Pipeline cargo to preprocess.
        is_init: Passed through to the pipeline factory (False on import path).
    """
    execute_audio_preprocessing_pipeline_(cargo=cargo, is_init=is_init)


@celery_worker.task(
acks_late=True,
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 5, "countdown": 5},
)
def execute_video_preprocessing_pipeline_task(
    cargo: PipelineCargo,
    is_init: bool = True,
) -> None:
    """Celery task wrapper: delegates to execute_video_preprocessing_pipeline_.

    Args:
        cargo: Pipeline cargo to preprocess.
        is_init: Passed through to the pipeline factory (False on import path).
    """
    execute_video_preprocessing_pipeline_(cargo=cargo, is_init=is_init)


@celery_worker.task(
Expand Down
24 changes: 24 additions & 0 deletions backend/src/app/core/data/import_/import_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def _import_project(
sdoc_name, mime_type
)
sdoc_doctype = get_doc_type(mime_type)
logger.info(f"Sdoc doctype {sdoc_doctype}")
assert sdoc_doctype, "Expected Doctype to be not None."

# move raw sdocs
Expand Down Expand Up @@ -875,8 +876,10 @@ def _import_project(

# 4. init import pipelines
from app.celery.background_jobs.tasks import (
execute_audio_preprocessing_pipeline_task,
execute_image_preprocessing_pipeline_task,
execute_text_preprocessing_pipeline_task,
execute_video_preprocessing_pipeline_task,
)

assert isinstance(
Expand All @@ -897,6 +900,27 @@ def _import_project(
for cargo in cargos[DocType.image]
]
tasks.extend(image_tasks)

# 6. init audio pipelines
assert isinstance(
execute_audio_preprocessing_pipeline_task, Task
), "Not a Celery Task"
audio_tasks = [
execute_audio_preprocessing_pipeline_task.s(cargo, is_init=False)
for cargo in cargos[DocType.audio]
]
tasks.extend(audio_tasks)

# 7. init video pipelines
assert isinstance(
execute_video_preprocessing_pipeline_task, Task
), "Not a Celery Task"
video_tasks = [
execute_video_preprocessing_pipeline_task.s(cargo, is_init=False)
for cargo in cargos[DocType.video]
]
tasks.extend(video_tasks)

crud_prepro_job.update(
db=db,
uuid=ppj.id,
Expand Down
6 changes: 4 additions & 2 deletions backend/src/app/preprocessing/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def build_image_pipeline(


@lru_cache(maxsize=1)
def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
def build_audio_pipeline(is_init: bool = True) -> PreprocessingPipeline:
# we need to import the steps here to avoid loading models at startup
# in the api worker!
from app.preprocessing.pipeline.steps.audio.convert_to_pcm import convert_to_pcm
Expand Down Expand Up @@ -481,7 +481,9 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:


@lru_cache(maxsize=1)
def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
def build_video_pipeline(
is_init: bool = True,
) -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.audio.convert_to_pcm import convert_to_pcm
from app.preprocessing.pipeline.steps.audio.create_ffmpeg_probe_audio_metadata import (
create_ffmpeg_probe_audio_metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

def create_ffmpeg_probe_audio_metadata(cargo: PipelineCargo) -> PipelineCargo:
    """Fill missing expected metadata on the cargo's PreProAudioDoc via ffmpeg.

    The ffmpeg probe is executed lazily and at most once: only if at least
    one key from EXPECTED_METADATA is missing, and the probe result is then
    reused for any further missing keys instead of re-probing per key.
    """
    ppad: PreProAudioDoc = cargo.data["ppad"]
    ffmpeg_probe = None
    for metadata_key in EXPECTED_METADATA:
        if metadata_key not in ppad.metadata:
            if ffmpeg_probe is None:
                # probe once, on the first missing key only
                ffmpeg_probe = ffmpeg.probe(ppad.filepath)
            # copy all format-level fields reported by ffmpeg
            for k, v in ffmpeg_probe["format"].items():
                ppad.metadata[k] = v
    return cargo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,37 @@

def generate_automatic_transcription(cargo: PipelineCargo) -> PipelineCargo:
    """Generate word-level transcriptions for the cargo's PreProAudioDoc.

    If "word_level_transcriptions" is already present in the doc's metadata
    (the project-import path pre-populates it), transcription is skipped
    entirely. Otherwise the uncompressed audio is sent to Whisper and the
    resulting word-level segments are stored both on the doc object and,
    JSON-serialized, in its metadata.

    Raises:
        ValueError: If the uncompressed audio file path is missing, i.e. the
            'convert_to_pcm' step has not run yet.
    """
    ppad: PreProAudioDoc = cargo.data["ppad"]
    if "word_level_transcriptions" not in ppad.metadata:
        logger.debug(f"Generating automatic transcription for {ppad.filename} ...")
        if ppad.uncompressed_audio_filepath is None:
            raise ValueError(
                f"Uncompressed audio filepath for {ppad.filename} is None. "
                "Please run the 'convert_to_pcm' step first!"
            )

        # Create Whisper Input
        whisper_input = WhisperFilePathInput(
            uncompressed_audio_fp=os.path.basename(
                str(ppad.uncompressed_audio_filepath)
            ),
            project_id=ppad.project_id,
        )
        transcription: WhisperTranscriptionOutput = rms.whisper_transcribe(
            whisper_input
        )

        # Create Wordlevel Transcriptions
        for segment in transcription.segments:
            for word in segment.words:
                wlt = WordLevelTranscription(
                    text=word.text,
                    start_ms=word.start_ms,
                    end_ms=word.end_ms,
                )
                ppad.word_level_transcriptions.append(wlt)

        # persist the word-level transcriptions as JSON metadata as well
        wlt = list(map(lambda wlt: wlt.model_dump(), ppad.word_level_transcriptions))
        ppad.metadata["word_level_transcriptions"] = json.dumps(wlt)
    else:
        logger.info("Import word level transcriptions")
    return cargo
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from loguru import logger
from sqlalchemy.orm import Session

from app.core.data.crud.document_tag import crud_document_tag
from app.core.data.orm.source_document import SourceDocumentORM
from app.core.data.repo.repo_service import RepoService
from app.core.db.sql_service import SQLService
from app.preprocessing.pipeline.model.preprodoc_base import PreProDocBase

repo: RepoService = RepoService()
sql: SQLService = SQLService()


def persist_tags(
    db: Session, sdoc_db_obj: SourceDocumentORM, ppd: PreProDocBase
) -> None:
    """Link the tags collected on the preprocessing doc to its SourceDocument.

    Args:
        db: Open SQLAlchemy session.
        sdoc_db_obj: The persisted SourceDocument ORM row to tag.
        ppd: Preprocessing doc (any doctype) whose `tags` ids get linked.
    """
    logger.info(f"Persisting SourceDocument Tags for {ppd.filename}...")
    tags = ppd.tags
    # idiomatic emptiness check instead of len(tags) > 0
    if tags:
        crud_document_tag.link_multiple_document_tags(
            db=db,
            sdoc_ids=[sdoc_db_obj.id],
            tag_ids=tags,
        )
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from app.core.data.crud.annotation_document import crud_adoc
from app.core.data.crud.code import crud_code
from app.core.data.crud.document_tag import crud_document_tag
from app.core.data.crud.project import crud_project
from app.core.data.crud.source_document import crud_sdoc
from app.core.data.crud.source_document_link import crud_sdoc_link
Expand All @@ -30,6 +29,7 @@
from app.preprocessing.pipeline.model.text.autospan import AutoSpan
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
from app.preprocessing.pipeline.steps.common.persist_sdoc_data import persist_sdoc_data
from app.preprocessing.pipeline.steps.common.persist_tags import persist_tags
from app.util.color import get_next_color

repo: RepoService = RepoService()
Expand Down Expand Up @@ -88,19 +88,6 @@ def _persist_sdoc_metadata(
crud_sdoc_meta.create_multi(db=db, create_dtos=metadata_create_dtos)


def _persist_tags(
    db: Session, sdoc_db_obj: SourceDocumentORM, pptd: PreProTextDoc
) -> None:
    """Link the tags collected on the text preprocessing doc to its SourceDocument.

    Args:
        db: Open SQLAlchemy session.
        sdoc_db_obj: The persisted SourceDocument ORM row to tag.
        pptd: Text preprocessing doc whose `tags` ids get linked.
    """
    logger.info(f"Persisting SourceDocument Tags for {pptd.filename}...")
    tags = pptd.tags
    # idiomatic emptiness check instead of len(tags) > 0
    if tags:
        crud_document_tag.link_multiple_document_tags(
            db=db,
            sdoc_ids=[sdoc_db_obj.id],
            tag_ids=tags,
        )


def _persist_sdoc_links(
db: Session, sdoc_db_obj: SourceDocumentORM, pptd: PreProTextDoc
) -> None:
Expand Down Expand Up @@ -209,7 +196,7 @@ def write_pptd_to_database(cargo: PipelineCargo) -> PipelineCargo:
_persist_sdoc_metadata(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)

# persist Tags
_persist_tags(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)
persist_tags(db=db, sdoc_db_obj=sdoc_db_obj, ppd=pptd)

# persist SourceDocument Links
_persist_sdoc_links(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from loguru import logger

from app.preprocessing.pipeline.model.audio.preproaudiodoc import PreProAudioDoc
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.video.preprovideodoc import PreProVideoDoc

POSSIBLE_METADATA = [
"word_level_transcriptions",
"language",
]


def create_ppad_from_video(cargo: PipelineCargo) -> PipelineCargo:
ppvd: PreProVideoDoc = cargo.data["ppvd"]
Expand All @@ -14,6 +21,10 @@ def create_ppad_from_video(cargo: PipelineCargo) -> PipelineCargo:
project_id=ppvd.project_id,
mime_type="audio/mpeg",
)
for metadata_key in POSSIBLE_METADATA:
if metadata_key in ppvd.metadata:
logger.info(f"Passing {metadata_key} from video metadata to audio metadata")
ppad.metadata[metadata_key] = ppvd.metadata[metadata_key]

cargo.data["ppad"] = ppad
return cargo
13 changes: 13 additions & 0 deletions backend/src/app/preprocessing/pipeline/steps/video/create_ppvd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from loguru import logger

from app.core.data.repo.repo_service import RepoService
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.video.preprovideodoc import PreProVideoDoc
Expand All @@ -11,12 +13,23 @@ def create_ppvd(cargo: PipelineCargo) -> PipelineCargo:
)
if not filepath.exists():
raise FileNotFoundError(f"File {filepath} not found in repository!")
additional_parameters = dict()
if "metadata" in cargo.data:
additional_parameters["metadata"] = cargo.data["metadata"]
if "sdoc_link" in cargo.data:
additional_parameters["sdoc_link_create_dtos"] = cargo.data["sdoc_link"]
if "tags" in cargo.data:
additional_parameters["tags"] = cargo.data["tags"]
logger.info(
f"Adding additional parameters to the create PPVD with {additional_parameters}"
)

ppvd = PreProVideoDoc(
filename=cargo.ppj_payload.filename,
project_id=cargo.ppj_payload.project_id,
mime_type=cargo.ppj_payload.mime_type,
filepath=filepath,
**additional_parameters,
)

cargo.data["ppvd"] = ppvd
Expand Down
14 changes: 10 additions & 4 deletions backend/src/app/preprocessing/preprocessing_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,24 @@ def get_image_pipeline(self, is_init: bool) -> PreprocessingPipeline:
self._pipelines[DocType.image] = pipeline
return self._pipelines[DocType.image]

def get_audio_pipeline(
    self,
    is_init: bool = True,
) -> PreprocessingPipeline:
    """Return the audio preprocessing pipeline, building and caching it lazily.

    Args:
        is_init: Forwarded to the pipeline builder; only takes effect on the
            first call, since the built pipeline is memoized per doctype.
    """
    from app.preprocessing.pipeline import build_audio_pipeline

    if DocType.audio not in self._pipelines:
        pipeline = build_audio_pipeline(is_init=is_init)
        self._pipelines[DocType.audio] = pipeline
    return self._pipelines[DocType.audio]

def get_video_pipeline(
    self,
    is_init: bool = True,
) -> PreprocessingPipeline:
    """Return the video preprocessing pipeline, building and caching it lazily.

    Args:
        is_init: Forwarded to the pipeline builder; only takes effect on the
            first call, since the built pipeline is memoized per doctype.
    """
    from app.preprocessing.pipeline import build_video_pipeline

    if DocType.video not in self._pipelines:
        pipeline = build_video_pipeline(is_init=is_init)
        self._pipelines[DocType.video] = pipeline
    return self._pipelines[DocType.video]

0 comments on commit bdd3d13

Please sign in to comment.