Skip to content

Commit

Permalink
changing audio video and image pipeline, so transcript is not saved s…
Browse files Browse the repository at this point in the history
…eparately.
  • Loading branch information
AhmadHAW committed Dec 3, 2024
1 parent 326003c commit f54b588
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 143 deletions.
127 changes: 103 additions & 24 deletions backend/src/app/preprocessing/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
def build_text_pipeline(
is_init: bool = True,
) -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.common.detect_content_language import (
detect_content_language,
)
from app.preprocessing.pipeline.steps.common.remove_erroneous_sdoc import (
remove_erroneous_or_unfinished_sdocs,
)
Expand All @@ -27,9 +30,6 @@ def build_text_pipeline(
from app.preprocessing.pipeline.steps.text.create_pptd import (
create_pptd,
)
from app.preprocessing.pipeline.steps.text.detect_content_language import (
detect_content_language,
)
from app.preprocessing.pipeline.steps.text.extract_content_in_html_from_raw_text_docs import (
extract_content_in_html_from_raw_text_docs,
)
Expand Down Expand Up @@ -210,7 +210,7 @@ def build_image_pipeline(
run_object_detection,
)
from app.preprocessing.pipeline.steps.image.store_metadata_to_database import (
store_metadata_to_database,
store_metadata_and_data_to_database,
)
from app.preprocessing.pipeline.steps.image.write_ppid_to_database import (
write_ppid_to_database,
Expand Down Expand Up @@ -277,7 +277,7 @@ def build_image_pipeline(

pipeline.register_step(
func=create_pptd_from_caption,
required_data=["ppid", "sdoc_id"],
required_data=["ppid"],
)

# run caption through spacy and add to elasticsearch to make it searchable
Expand Down Expand Up @@ -307,7 +307,7 @@ def build_image_pipeline(
)

pipeline.register_step(
func=store_metadata_to_database,
func=store_metadata_and_data_to_database,
required_data=[
"pptd",
"ppid",
Expand Down Expand Up @@ -348,11 +348,14 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
generate_webp_thumbnail_for_audio,
)
from app.preprocessing.pipeline.steps.audio.store_metadata_to_database import (
store_metadata_to_database,
store_metadata_and_data_to_database,
)
from app.preprocessing.pipeline.steps.audio.write_ppad_to_database import (
write_ppad_to_database,
)
from app.preprocessing.pipeline.steps.common.detect_content_language import (
detect_content_language,
)
from app.preprocessing.pipeline.steps.common.remove_erroneous_sdoc import (
remove_erroneous_or_unfinished_sdocs,
)
Expand All @@ -362,9 +365,6 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.common.update_sdoc_status_to_finish import (
update_sdoc_status_to_finish,
)
from app.preprocessing.pipeline.steps.text.detect_content_language import (
detect_content_language,
)
from app.preprocessing.pipeline.steps.text.generate_keywords import (
generate_keywords,
)
Expand Down Expand Up @@ -458,7 +458,7 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
)

pipeline.register_step(
func=store_metadata_to_database,
func=store_metadata_and_data_to_database,
required_data=[
"pptd",
"ppad",
Expand All @@ -482,6 +482,19 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:

@lru_cache(maxsize=1)
def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.audio.convert_to_pcm import convert_to_pcm
from app.preprocessing.pipeline.steps.audio.create_ffmpeg_probe_audio_metadata import (
create_ffmpeg_probe_audio_metadata,
)
from app.preprocessing.pipeline.steps.audio.create_pptd_from_transcription import (
create_pptd_from_transcription,
)
from app.preprocessing.pipeline.steps.audio.generate_automatic_transcription import (
generate_automatic_transcription,
)
from app.preprocessing.pipeline.steps.common.detect_content_language import (
detect_content_language,
)
from app.preprocessing.pipeline.steps.common.remove_erroneous_sdoc import (
remove_erroneous_or_unfinished_sdocs,
)
Expand All @@ -491,8 +504,20 @@ def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.common.update_sdoc_status_to_finish import (
update_sdoc_status_to_finish,
)
from app.preprocessing.pipeline.steps.video.add_word_level_transcriptions_to_ppvd_metadata import (
add_word_level_transcriptions_to_ppvd_metadata,
from app.preprocessing.pipeline.steps.text.generate_keywords import (
generate_keywords,
)
from app.preprocessing.pipeline.steps.text.generate_sentence_annotations import (
generate_sentence_annotations,
)
from app.preprocessing.pipeline.steps.text.generate_word_frequencies import (
generate_word_frequncies,
)
from app.preprocessing.pipeline.steps.text.run_spacy_pipeline import (
run_spacy_pipeline,
)
from app.preprocessing.pipeline.steps.text.store_document_in_elasticsearch import (
store_document_in_elasticsearch,
)
from app.preprocessing.pipeline.steps.video.create_and_store_audio_stream_file import (
create_and_store_audio_stream_file,
Expand All @@ -507,11 +532,13 @@ def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
from app.preprocessing.pipeline.steps.video.generate_webp_thumbnail_for_video import (
generate_webp_thumbnail_for_video,
)
from app.preprocessing.pipeline.steps.video.store_metadata_to_database import (
store_metadata_and_data_to_database,
)
from app.preprocessing.pipeline.steps.video.write_ppvd_to_database import (
write_ppvd_to_database,
)

audio_pipeline = build_audio_pipeline()
pipeline = PreprocessingPipeline(doc_type=DocType.video)

pipeline.register_step(
Expand Down Expand Up @@ -539,14 +566,71 @@ def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
required_data=["ppvd"],
)

pipeline.join_pipeline(
pipeline=audio_pipeline,
skip_steps_with_name=["create_ppad"],
pipeline.register_step(
func=write_ppvd_to_database,
required_data=["ppvd"],
)

pipeline.register_step(
func=create_ffmpeg_probe_audio_metadata,
required_data=["ppad"],
)

pipeline.register_step(
func=convert_to_pcm,
required_data=["ppad"],
)

pipeline.register_step(
func=generate_automatic_transcription,
required_data=["ppad"],
)

# instead create pptd before and now add it as metadata
pipeline.register_step(
func=create_pptd_from_transcription,
required_data=["ppad"],
)

pipeline.register_step(
func=detect_content_language,
required_data=["pptd"],
)

# run caption through spacy and add to elasticsearch to make it searchable
pipeline.register_step(
func=run_spacy_pipeline,
required_data=["pptd"],
)

pipeline.register_step(
func=add_word_level_transcriptions_to_ppvd_metadata,
required_data=["ppvd", "ppad"],
func=generate_word_frequncies,
required_data=["pptd"],
)

pipeline.register_step(
func=generate_keywords,
required_data=["pptd"],
)

pipeline.register_step(
func=generate_sentence_annotations,
required_data=["pptd"],
)

pipeline.register_step(
func=store_document_in_elasticsearch,
required_data=["pptd", "sdoc_id"],
)

pipeline.register_step(
func=store_metadata_and_data_to_database,
required_data=[
"pptd",
"ppad",
"ppvd",
"sdoc_id",
],
)

pipeline.register_step(
Expand All @@ -556,11 +640,6 @@ def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
func=remove_erroneous_or_unfinished_sdocs,
)

pipeline.register_step(
func=write_ppvd_to_database,
required_data=["ppvd"],
)

pipeline.register_step(
func=resolve_sdoc_links,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os

from loguru import logger
Expand Down Expand Up @@ -42,4 +43,6 @@ def generate_automatic_transcription(cargo: PipelineCargo) -> PipelineCargo:
)
ppad.word_level_transcriptions.append(wlt)

wlt = list(map(lambda wlt: wlt.model_dump(), ppad.word_level_transcriptions))
ppad.metadata["word_level_transcriptions"] = json.dumps(wlt)
return cargo
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import json

from loguru import logger
from sqlalchemy.orm import Session

Expand All @@ -16,6 +14,7 @@
from app.preprocessing.pipeline.model.audio.preproaudiodoc import PreProAudioDoc
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
from app.preprocessing.pipeline.steps.common.persist_sdoc_data import persist_sdoc_data

repo: RepoService = RepoService()
sql: SQLService = SQLService()
Expand All @@ -31,10 +30,8 @@ def _persist_sdoc_metadata(
sdoc_id = sdoc_db_obj.id
sdoc = SourceDocumentRead.model_validate(sdoc_db_obj)
ppad.metadata["url"] = str(RepoService().get_sdoc_url(sdoc=sdoc))
wlt = list(map(lambda wlt: wlt.model_dump(), ppad.word_level_transcriptions))
ppad.metadata["word_level_transcriptions"] = json.dumps(wlt)
ppad.metadata["language"] = pptd.metadata["language"]
ppad.metadata["transcription_keywords"] = pptd.keywords
ppad.metadata["transcription_keywords"] = pptd.metadata["keywords"]

project_metadata = [
ProjectMetadataRead.model_validate(pm)
Expand Down Expand Up @@ -68,7 +65,7 @@ def _persist_sdoc_metadata(
crud_sdoc_meta.create_multi(db=db, create_dtos=metadata_create_dtos)


def store_metadata_to_database(cargo: PipelineCargo) -> PipelineCargo:
def store_metadata_and_data_to_database(cargo: PipelineCargo) -> PipelineCargo:
ppad: PreProAudioDoc = cargo.data["ppad"]
pptd: PreProTextDoc = cargo.data["pptd"]
audio_sdoc_id: int = cargo.data["sdoc_id"]
Expand All @@ -80,6 +77,9 @@ def store_metadata_to_database(cargo: PipelineCargo) -> PipelineCargo:
# persist SourceDocument Metadata
_persist_sdoc_metadata(db=db, sdoc_db_obj=sdoc_db_obj, ppad=ppad, pptd=pptd)

# persist SourceDocument Data
persist_sdoc_data(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)

except Exception as e:
logger.error(
f"Error while persisting SourceDocument Metadata "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from sqlalchemy.orm import Session

from app.core.data.crud.source_document_data import crud_sdoc_data
from app.core.data.dto.source_document_data import SourceDocumentDataCreate
from app.core.data.orm.source_document import SourceDocumentORM
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc


def persist_sdoc_data(
    db: Session, sdoc_db_obj: SourceDocumentORM, pptd: PreProTextDoc
) -> None:
    """Persist the preprocessed document content to the database.

    Builds a ``SourceDocumentDataCreate`` DTO from the preprocessed text
    document (raw text, html, token character offsets and sentence spans)
    and stores it via the sdoc-data CRUD layer, keyed by the source
    document's id.
    """
    # Hoist the offset containers once instead of re-reading the attributes
    # in each comprehension.
    offsets = pptd.token_character_offsets
    sentences = pptd.sentences
    create_dto = SourceDocumentDataCreate(
        id=sdoc_db_obj.id,
        content=pptd.text,
        html=pptd.html,
        # token_character_offsets holds (start, end) pairs; split them into
        # two parallel columns as the DTO expects.
        token_starts=[start for start, _ in offsets],
        token_ends=[end for _, end in offsets],
        sentence_starts=[sent.start for sent in sentences],
        sentence_ends=[sent.end for sent in sentences],
    )
    crud_sdoc_data.create(db=db, create_dto=create_dto)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from app.preprocessing.pipeline.model.image.preproimagedoc import PreProImageDoc
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
from app.preprocessing.pipeline.steps.common.persist_sdoc_data import persist_sdoc_data

repo: RepoService = RepoService()
sql: SQLService = SQLService()
Expand All @@ -28,7 +29,7 @@ def _persist_sdoc_metadata(
sdoc_id = sdoc_db_obj.id
sdoc = SourceDocumentRead.model_validate(sdoc_db_obj)
ppid.metadata["url"] = str(RepoService().get_sdoc_url(sdoc=sdoc))
ppid.metadata["keywords"] = pptd.keywords
ppid.metadata["keywords"] = pptd.metadata["keywords"]

project_metadata = [
ProjectMetadataRead.model_validate(pm)
Expand All @@ -41,7 +42,6 @@ def _persist_sdoc_metadata(
metadata_create_dtos = []
for project_metadata_key, project_metadata in project_metadata_map.items():
if project_metadata_key in ppid.metadata.keys():
logger.info(f"test {project_metadata_key}")
metadata_create_dtos.append(
SourceDocumentMetadataCreate.with_metatype(
value=ppid.metadata[project_metadata_key],
Expand All @@ -62,7 +62,7 @@ def _persist_sdoc_metadata(
crud_sdoc_meta.create_multi(db=db, create_dtos=metadata_create_dtos)


def store_metadata_to_database(cargo: PipelineCargo) -> PipelineCargo:
def store_metadata_and_data_to_database(cargo: PipelineCargo) -> PipelineCargo:
ppid: PreProImageDoc = cargo.data["ppid"]
pptd: PreProTextDoc = cargo.data["pptd"]
image_sdoc_id: int = cargo.data["sdoc_id"]
Expand All @@ -74,6 +74,9 @@ def store_metadata_to_database(cargo: PipelineCargo) -> PipelineCargo:
# persist SourceDocument Metadata
_persist_sdoc_metadata(db=db, sdoc_db_obj=sdoc_db_obj, ppid=ppid, pptd=pptd)

# persist SourceDocument Data
persist_sdoc_data(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)

except Exception as e:
logger.error(
f"Error while persisting SourceDocument Metadata "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

def generate_keywords(cargo: PipelineCargo) -> PipelineCargo:
pptd: PreProTextDoc = cargo.data["pptd"]
if "keywords" in pptd.metadata:
pptd.keywords = pptd.metadata["keywords"] # type: ignore
else:
if "keywords" not in pptd.metadata:
out = pptd.spacy_pipeline_output
if out is None:
logger.error(
Expand Down Expand Up @@ -69,6 +67,6 @@ def generate_keywords(cargo: PipelineCargo) -> PipelineCargo:
# if any of the words is not in the pos dict, we skip the keyword
pass

pptd.keywords = keywords
pptd.metadata["keywords"] = keywords

return cargo
Loading

0 comments on commit f54b588

Please sign in to comment.