diff --git a/README.md b/README.md index c335ddc..0116b7c 100644 --- a/README.md +++ b/README.md @@ -161,11 +161,21 @@ Before applying tf-idf, articles containing any of the specified keywords in the From the rest of articles, to choose the most relevant ones, you can specify one of the following criteria in [config.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/config.json): -- Threshold for the tf-idf score value -- Maximum number of selected articles with the top scores +- Percentage of selected articles with the top scores +- Maximum number of selected articles with the top scores +- Threshold for the value of cosine similarity between the embeddings of list of keywords and each article. + ```commandline -"article_selector": + "article_selector": + { + "type": "percentage", + "value": "30" + }, + + OR + + "article_selector": { "type": "threshold", "value": "0.02" @@ -186,14 +196,18 @@ python3 scripts/3_select_final_articles.py --input_dir "output/output_timestampe ``` ### 5. Generate output -As the final step of the pipeline, the text of the selected articles is saved in a .csv file, which can be used for manual labeling. The user has the option to choose whether the text should be divided into paragraphs. +As the final step of the pipeline, the text of the selected articles is saved in a .csv file, which can be used for manual labeling. The user has the option to choose whether the text should be divided into paragraphs or a segmentation of the text. This feature can be set in [config.py](https://github.com/UtrechtUniversity/historical-news-sentiment/blob/main/config.json). 
```commandline "output_unit": "paragraph" OR -"output_unit": "text" +"output_unit": "full_text" + +OR +"output_unit": "segmented_text" +"sentences_per_segment": 10 ``` ```commandline diff --git a/config.json b/config.json index 399d141..84f6a9a 100644 --- a/config.json +++ b/config.json @@ -10,8 +10,9 @@ ], "article_selector": { - "type": "threshold", - "value": "0.02" + "type": "percentage", + "value": "30" }, - "output_unit": "paragraph" + "output_unit": "segmented_text", + "sentences_per_segment": 10 } diff --git a/interest/article_final_selection/article_selector.py b/interest/article_final_selection/article_selector.py index cd734cf..1add0a2 100644 --- a/interest/article_final_selection/article_selector.py +++ b/interest/article_final_selection/article_selector.py @@ -45,4 +45,10 @@ def select_articles(self) -> List[int]: num_articles = int(self.config["value"]) selected_indices.extend(sorted_indices[:num_articles]) + elif self.config["type"] == "percentage": + percentage = float(self.config["value"]) + num_articles = int(len(self.similarity_scores) * + (percentage / 100.0)) + selected_indices.extend(sorted_indices[:num_articles]) + return selected_indices diff --git a/interest/article_final_selection/process_article.py b/interest/article_final_selection/process_article.py index 198c185..aafbcfe 100644 --- a/interest/article_final_selection/process_article.py +++ b/interest/article_final_selection/process_article.py @@ -8,7 +8,7 @@ text_cleaner = TextCleaner() -def clean(text: str) -> str: +def clean(text: Union[str, List[str]]) -> str: """ Clean the input text using TextCleaner. 
@@ -47,13 +47,13 @@ def __init__(self, gzip_file_path: str, article_id: int): self._body: Union[str, list, None] = '' self.selected: bool = False - def read_article_from_gzip(self, in_paragraph: bool = False) -> ( - Tuple)[Union[str, None], Union[str, list, None]]: + def read_article_from_gzip(self) -> ( + Tuple)[Union[str, None], Union[List[str], None]]: """ Read article content from a gzip file. Returns: - Tuple[Union[str, None], Union[str, None]]: A tuple containing + Tuple[Union[str, None], Union[list, None]]: A tuple containing the title and body of the article. """ try: @@ -63,7 +63,7 @@ def read_article_from_gzip(self, in_paragraph: bool = False) -> ( article = articles.get(str(self._article_id), {}) title = article.get('title', {}) body = article.get('body', {}) - return title, body if in_paragraph else " ".join(body) + return title, body except Exception as e: # pylint: disable=broad-except logging.error("Error reading article %s from %s: %s", str(self._article_id), self._file_path, e) @@ -88,6 +88,5 @@ def process_article(self, clean_keywords: List[str]) -> str: if title_with_keyword: self.selected = True return "" - if isinstance(self._body, str): - return clean(self._body) - return "" + + return clean(self._body) diff --git a/interest/output_generator/text_formater.py b/interest/output_generator/text_formater.py new file mode 100644 index 0000000..42ee92b --- /dev/null +++ b/interest/output_generator/text_formater.py @@ -0,0 +1,117 @@ +""" This module defines a TextFormatter class for formatting text based on +specified output units. """ +from typing import List, Union +import logging +from interest.settings import SPACY_MODEL +from interest.utils import load_spacy_model + +PARAGRAPH_FORMATTER = 'paragraph' +FULLTEXT_FORMATTER = 'full_text' +SEGMENTED_TEXT_FORMATTER = 'segmented_text' + + +class TextFormatter: + # pylint: disable=R0903 + """Class for formatting text based on specified output units. 
""" + + def __init__(self, output_unit: str, sentences_per_segment: int, + spacy_model=SPACY_MODEL): # : Union[str, Language] + """ + Initializes the TextFormatter object. + + Args: + output_unit (str): The type of output unit ('paragraph', + 'full_text', 'segmented_text'). + sentences_per_segment (int): Number of sentences per + segment when output_unit is 'segmented_text'. + spacy_model (Union[str, Language], optional): Spacy model + or model name used for text processing. Defaults to the global + SPACY_MODEL value. + """ + self.nlp = ( + load_spacy_model(spacy_model) + if isinstance(spacy_model, str) + else spacy_model + ) + self.sentences_per_segment = sentences_per_segment + self.formatter = output_unit + self.is_fulltext = self._is_fulltext() + self.texts: List[str] = [] + + def format_output(self, texts: Union[None, List[str]]) -> ( + Union)[str, List[str], None]: + """ + Formats input texts based on the specified output unit. + + Args: + texts (List[str]): List of input texts to be formatted. + + Returns: + Union[str, List[List[str]]]: Formatted output text based on the + selected output_unit. For 'full_text', returns a single string. + For 'paragraph' and 'segmented_text', returns a list of segmented + text lists. + + Raises: + ValueError: If input 'texts' is not a list of strings. + ValueError: If an unsupported formatter type is specified. 
+ """ + try: + if (not isinstance(texts, list) or (texts is None) or + not all(isinstance(text, str) for text in texts)): + raise ValueError("Input 'texts' must be a list of strings.") + + self.texts = texts + + if self.formatter == PARAGRAPH_FORMATTER: + return self._format_paragraph() + if self.formatter == FULLTEXT_FORMATTER: + return self._format_fulltext() + if self.formatter == SEGMENTED_TEXT_FORMATTER: + return self._format_segmented_text() + + except ValueError as e: + logging.error("Unsupported formatter %s: %s", self.formatter, e) + return None + return None + + def _format_paragraph(self) -> List[str]: + """Formats texts as a single paragraph. + + Returns: + List[str]: List of input texts, one string per paragraph. + """ + return self.texts + + def _format_fulltext(self) -> str: + """Formats texts as full text with newline separators. + + Returns: + str: Newline-separated string of input texts. + """ + return '\n'.join(self.texts) + + def _format_segmented_text(self) -> List[str]: + """Formats texts as segmented text based on sentences_per_segment. + + Returns: + List[str]: Flattened list of segmented text strings. + """ + segmented_texts = [] + for text in self.texts: + doc = self.nlp(text) + sentences = [sent.text for sent in doc.sents] + + for i in range(0, len(sentences), self.sentences_per_segment): + segment = sentences[i:i + self.sentences_per_segment] + segmented_texts.append(' '.join(segment)) + + return segmented_texts + + def _is_fulltext(self) -> bool: + """Checks if the formatter type is 'full_text'. + + Returns: + bool: True if formatter is 'full_text', False otherwise. + """ + return self.formatter == FULLTEXT_FORMATTER diff --git a/interest/preprocessor/text_cleaner.py b/interest/preprocessor/text_cleaner.py index b7eeefb..ca96945 100644 --- a/interest/preprocessor/text_cleaner.py +++ b/interest/preprocessor/text_cleaner.py @@ -3,11 +3,28 @@ data using various cleaning techniques. 
""" import re -# from typing import Optional +from typing import Union, List from interest.settings import SPACY_MODEL from interest.utils import load_spacy_model +def merge_texts_list(text: Union[str, List[str]]) -> str: + """ + Merge a list of texts into a single string by joining them with spaces. + + Args: + text (Union[str, List[str]]): The input text or list of texts to merge. + + Returns: + str: The merged text if input is a list of strings, otherwise returns + the input text unchanged. + """ + if isinstance(text, list): + merged_text = ' '.join(text) + return merged_text + return text + + class TextCleaner: """A class for cleaning text data using various preprocessing techniques.""" @@ -82,15 +99,12 @@ def preprocess(self, text): """Preprocess the given text using a series of cleaning steps. Args: - text (str): The text to preprocess. + text ( List[str]): The text to preprocess. Returns: str: The preprocessed text. """ - self.text = text - # self.get_words() - # self.lower() - # self.remove_stopwords() + self.text = merge_texts_list(text) self.get_lower_lemma_tokens() self.remove_numeric() self.remove_extra_whitespace_tabs() @@ -107,6 +121,7 @@ def clean(self, text): Returns: str: The cleaned text. 
""" self.text = text + self.text = merge_texts_list(text) self.get_words() self.keep_standard_chars() diff --git a/interest/utils.py b/interest/utils.py index 95ac80d..4f9a12e 100644 --- a/interest/utils.py +++ b/interest/utils.py @@ -40,6 +40,7 @@ def load_spacy_model(model_name: str, retry: bool = True) \ spacy.cli.download(model_name) return load_spacy_model(model_name, False) raise exc + nlp.add_pipe("sentencizer") return nlp @@ -106,60 +107,30 @@ def get_keywords_from_config(config_file: Path) -> List[str]: raise KeyError("Keywords not found in config file") from exc -def get_article_selector_from_config(config_file: Path) -> dict: +def read_config(config_file: Path, item_key: str) -> Dict[str, str]: """ - Get the article selector configuration from a JSON file. + Get the value of the given key item from a JSON file. Args: config_file (Path): The path to the JSON config file. - - Returns: - Dict[str, str]: The article selector configuration. - - Raises: - ArticleSelectorNotFoundError: If the article selector - is not found in the config file. - FileNotFoundError: If the config file is not found. - """ - try: - with open(config_file, 'r', encoding=ENCODING) as f: - config: Dict[str, str] = json.load(f)["article_selector"] - if not config: - raise ValueError("Config is empty") - return config - except FileNotFoundError as exc: - raise FileNotFoundError("Config file not found") from exc - except KeyError as exc: - raise KeyError("Article selector not found in config file") \ - from exc - - -def get_output_unit_from_config(config_file: Path) -> dict: - """ - Get the article selector configuration from a JSON file. - - Args: - config_file (Path): The path to the JSON config file. - + item_key (str): Key item defined in config file. Returns: Dict[str, str]: The article selector configuration. Raises: - ArticleSelectorNotFoundError: If the article selector - is not found in the config file. + KeyError: If the key item is not found in the config file. 
FileNotFoundError: If the config file is not found. """ try: with open(config_file, 'r', encoding=ENCODING) as f: - config: Dict[str, str] = json.load(f)["output_unit"] + config: Dict[str, str] = json.load(f)[item_key] if not config: raise ValueError("Config is empty") return config except FileNotFoundError as exc: raise FileNotFoundError("Config file not found") from exc except KeyError as exc: - raise KeyError("Article selector not found in config file") \ - from exc + raise KeyError(f"Key item {item_key} not found in config file") from exc def save_filtered_articles(input_file: Any, article_id: str, diff --git a/scripts/step1_filter_articles.py b/scripts/step1_filter_articles.py index 6962405..99d59a0 100644 --- a/scripts/step1_filter_articles.py +++ b/scripts/step1_filter_articles.py @@ -29,7 +29,7 @@ help="Glob pattern for find input files; e.g. '*.gz' ", ) parser.add_argument( - "--config_path", + "--config-path", type=Path, default="config.json", help="File path of config file.", ) diff --git a/scripts/step3_select_final_articles.py b/scripts/step3_select_final_articles.py index c6d46c9..37f723c 100644 --- a/scripts/step3_select_final_articles.py +++ b/scripts/step3_select_final_articles.py @@ -5,9 +5,11 @@ from pathlib import Path import pandas as pd from interest.utils import get_keywords_from_config -from interest.utils import get_article_selector_from_config +from interest.utils import read_config from interest.article_final_selection.process_articles import select_articles +ARTICLE_SELECTOR_FIELD = "article_selector" + def update_selected_indices_in_file(filepath: str, indices_selected: List[int]) -> None: @@ -42,7 +44,7 @@ def update_selected_indices_in_file(filepath: str, parser = argparse.ArgumentParser("Select final articles.") parser.add_argument( - "--input_dir", + "--input-dir", type=Path, required=True, help="Base directory for reading input files.", @@ -54,7 +56,7 @@ def update_selected_indices_in_file(filepath: str, help="Glob pattern for find input 
files; e.g. '*.csv'.", ) parser.add_argument( - "--config_path", + "--config-path", type=Path, default="config.json", help="File path of config file.", @@ -66,8 +68,8 @@ def update_selected_indices_in_file(filepath: str, parser.error(f"Not a directory: '{str(args.input_dir.absolute())}'") keywords = get_keywords_from_config(args.config_path) - config_article_selector = get_article_selector_from_config( - args.config_path) + config_article_selector = read_config( + args.config_path, ARTICLE_SELECTOR_FIELD) if (len(keywords) > 0) and config_article_selector: for articles_filepath in args.input_dir.rglob(args.glob): diff --git a/scripts/step4_generate_output.py b/scripts/step4_generate_output.py index 529ed8f..b71c94c 100644 --- a/scripts/step4_generate_output.py +++ b/scripts/step4_generate_output.py @@ -7,8 +7,12 @@ from typing import Union import pandas as pd from pandas import DataFrame +from interest.settings import SPACY_MODEL from interest.article_final_selection.process_article import ArticleProcessor -from interest.utils import get_output_unit_from_config +from interest.utils import read_config +from interest.output_generator.text_formater import (TextFormatter, + SEGMENTED_TEXT_FORMATTER) + FILE_PATH_FIELD = "file_path" ARTICLE_ID_FIELD = "article_id" @@ -17,15 +21,18 @@ LABEL_FIELD = "label" SELECTED_FIELD = "selected" +OUTPUT_UNIT_KEY = "output_unit" +SENTENCE_PER_SEGMENT_KEY = "sentences_per_segment" + -def read_article(row: pd.Series, in_paragraph: bool = False) -> DataFrame: +def read_article(row: pd.Series, formatter: TextFormatter) -> DataFrame: """ Read article from row and return DataFrame of articles. Args: row (pd.Series): A row from a DataFrame. - in_paragraph (bool, optional): Whether to read article in paragraphs. - Defaults to False. + formatter (TextFormatter): An object of TextFormatter to format + output text. Defaults to False. Returns: DataFrame: DataFrame containing article information. 
@@ -33,27 +40,38 @@ def read_article(row: pd.Series, in_paragraph: bool = False) -> DataFrame: file_path = row[FILE_PATH_FIELD] article_id = row[ARTICLE_ID_FIELD] article_processor = ArticleProcessor(file_path, article_id) - title, body = article_processor.read_article_from_gzip(in_paragraph) - - titles = [title] * len(body) if in_paragraph and body is not None else [title] - files_path = [file_path] * len(body) if in_paragraph and body is not None else [file_path] - articles_id = [article_id] * len(body) if in_paragraph and body is not None else [article_id] - label = [''] * len(body) if in_paragraph and body is not None else [''] + title, body = article_processor.read_article_from_gzip() + + body_formatted = formatter.format_output(body) + + titles = [title] * len(body_formatted) \ + if ((not formatter.is_fulltext) and body_formatted is not None) \ + else [title] + files_path = [file_path] * len(body_formatted) \ + if ((not formatter.is_fulltext) and body_formatted is not None) \ + else [file_path] + articles_id = ([article_id] * len(body_formatted)) \ + if (not formatter.is_fulltext) and body_formatted is not None \ + else [article_id] + label = [''] * len(body_formatted) \ + if (not formatter.is_fulltext) and body_formatted is not None \ + else [''] return pd.DataFrame({FILE_PATH_FIELD: files_path, ARTICLE_ID_FIELD: articles_id, TITLE_FIELD: titles, - BODY_FIELD: body, + BODY_FIELD: body_formatted, LABEL_FIELD: label}) -def find_articles_in_file(filepath: str, in_paragraph: bool) -> ( +def find_articles_in_file(filepath: str, formatter: TextFormatter) -> ( Union)[DataFrame, None]: """ Find selected articles in a CSV file and return DataFrame of articles. Args: filepath (str): Path to the CSV file. - in_paragraph (bool): Whether to read articles in paragraphs. + formatter (TextFormatter): An object of TextFormatter to format + output text. Returns: DataFrame: DataFrame containing selected articles information. 
@@ -62,7 +80,7 @@ def find_articles_in_file(filepath: str, in_paragraph: bool) -> ( df_articles = pd.read_csv(filepath) df_selected = df_articles.loc[df_articles[SELECTED_FIELD] == 1] - result = pd.concat([read_article(row, in_paragraph=in_paragraph) + result = pd.concat([read_article(row, formatter) for _, row in df_selected.iterrows()], axis=0, ignore_index=True) return result @@ -75,7 +93,7 @@ def find_articles_in_file(filepath: str, in_paragraph: bool) -> ( parser = argparse.ArgumentParser("Select final articles.") parser.add_argument( - "--input_dir", + "--input-dir", type=Path, required=True, help="Base directory for reading input files.", @@ -87,13 +105,13 @@ def find_articles_in_file(filepath: str, in_paragraph: bool) -> ( help="Glob pattern for find input files; e.g. '*.csv'.", ) parser.add_argument( - "--config_path", + "--config-path", type=Path, default="config.json", help="File path of config file.", ) parser.add_argument( - "--output_dir", + "--output-dir", type=Path, required=True, help="The directory for storing output files.", @@ -105,15 +123,21 @@ def find_articles_in_file(filepath: str, in_paragraph: bool) -> ( parser.error(f"Not a directory: '{str(args.input_dir.absolute())}'") args.output_dir.mkdir(parents=True, exist_ok=True) - config_output_unit = get_output_unit_from_config(args.config_path) + output_unit = read_config(args.config_path, OUTPUT_UNIT_KEY) + + SENTENCES_PER_SEGMENT = '0' + if output_unit == SEGMENTED_TEXT_FORMATTER: + SENTENCES_PER_SEGMENT = str(read_config(args.config_path, + SENTENCE_PER_SEGMENT_KEY)) result_df = pd.DataFrame(columns=[FILE_PATH_FIELD, ARTICLE_ID_FIELD, TITLE_FIELD, BODY_FIELD, LABEL_FIELD]) - IN_PARAGRAPH = config_output_unit == "paragraph" + text_formatter = TextFormatter(str(output_unit), + int(SENTENCES_PER_SEGMENT), + spacy_model=SPACY_MODEL) for articles_filepath in args.input_dir.rglob(args.glob): - df = find_articles_in_file(articles_filepath, - in_paragraph=IN_PARAGRAPH) + df = 
find_articles_in_file(articles_filepath, text_formatter) result_df = pd.concat([result_df, df], ignore_index=True) result_df.to_csv(os.path.join(args.output_dir, 'articles_to_label.csv'))