From 81c5953fa582b04d8192581b1739c6e2658aa989 Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 18:50:23 +0900 Subject: [PATCH 1/9] add mecab keywords handler for japanese --- .../rag/datasource/keyword/keyword_factory.py | 4 +- .../rag/datasource/keyword/keyword_type.py | 1 + .../rag/datasource/keyword/mecab/README.md | 273 +++++++++++++++++ .../rag/datasource/keyword/mecab/config.py | 19 ++ .../rag/datasource/keyword/mecab/mecab.py | 287 ++++++++++++++++++ .../mecab/mecab_keyword_table_handler.py | 152 ++++++++++ .../rag/datasource/keyword/mecab/stopwords.py | 36 +++ 7 files changed, 771 insertions(+), 1 deletion(-) create mode 100644 api/core/rag/datasource/keyword/mecab/README.md create mode 100644 api/core/rag/datasource/keyword/mecab/config.py create mode 100644 api/core/rag/datasource/keyword/mecab/mecab.py create mode 100644 api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py create mode 100644 api/core/rag/datasource/keyword/mecab/stopwords.py diff --git a/api/core/rag/datasource/keyword/keyword_factory.py b/api/core/rag/datasource/keyword/keyword_factory.py index f1a6ade91f9bd1..5e30c873a7ea41 100644 --- a/api/core/rag/datasource/keyword/keyword_factory.py +++ b/api/core/rag/datasource/keyword/keyword_factory.py @@ -22,8 +22,10 @@ def get_keyword_factory(keyword_type: str) -> type[BaseKeyword]: match keyword_type: case KeyWordType.JIEBA: from core.rag.datasource.keyword.jieba.jieba import Jieba - return Jieba + case KeyWordType.MECAB: + from core.rag.datasource.keyword.mecab.mecab import MeCab + return MeCab case _: raise ValueError(f"Keyword store {keyword_type} is not supported.") diff --git a/api/core/rag/datasource/keyword/keyword_type.py b/api/core/rag/datasource/keyword/keyword_type.py index d845c7111dd578..a4fb63b794bc6b 100644 --- a/api/core/rag/datasource/keyword/keyword_type.py +++ b/api/core/rag/datasource/keyword/keyword_type.py @@ -3,3 +3,4 @@ class KeyWordType(StrEnum): JIEBA = "jieba" + MECAB = "mecab" diff --git a/api/core/rag/datasource/keyword/mecab/README.md b/api/core/rag/datasource/keyword/mecab/README.md new file mode 100644 index 00000000000000..f589c96d442ba2 --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/README.md @@ -0,0 +1,273 @@ +# MeCab Keyword Processor + +A Japanese text keyword extraction module using MeCab morphological analyzer for the Dify RAG system. + +## Overview + +This module provides Japanese text keyword extraction capabilities using the MeCab morphological analyzer. It's designed to: + +- Extract meaningful keywords from Japanese text +- Handle compound words and technical terms +- Support custom dictionaries +- Provide configurable scoring based on parts of speech +- Handle mixed Japanese-English text + +## Components + +### 1. 
MeCabKeywordTableHandler + +The core component responsible for keyword extraction using MeCab: + +```python +handler = MeCabKeywordTableHandler( + dictionary_path="/path/to/dict", # Optional custom dictionary + user_dictionary_path="/path/to/user_dict" # Optional user dictionary +) +keywords = handler.extract_keywords(text, max_keywords=10) +``` + +#### Features: + +- **Part of Speech (POS) Weighting**: + + ```python + pos_weights = { + '名詞': 1.0, # Nouns + '動詞': 0.8, # Verbs + '形容詞': 0.6, # Adjectives + '副詞': 0.4, # Adverbs + '連体詞': 0.3, # Adnominal adjectives + '感動詞': 0.2, # Interjections + } + ``` + +- **Special Term Handling**: + - Boosts scores for proper nouns (固有名詞) + - Boosts scores for technical terms (専門用語) + - Compound word detection (e.g., "機械学習", "自然言語処理") + +- **Reading Normalization**: + - Handles different forms of the same word + - Normalizes compound terms using readings + +### 2. Configuration (MeCabConfig) + +Configurable settings for the processor: + +```python +class MeCabConfig(BaseModel): + max_keywords_per_chunk: int = 10 + min_keyword_length: int = 2 + score_threshold: float = 0.3 + storage_type: str = "database" + cache_timeout: int = 3600 + dictionary_path: str = "" + user_dictionary_path: str = "" + pos_weights: dict = {...} +``` + +### 3. Stopwords + +Comprehensive Japanese stopword list including: + +- Particles (は, が, の, etc.) +- Auxiliary verbs (です, ます, etc.) +- Pronouns (これ, それ, etc.) +- Common words +- Numbers and punctuation +- Common English stopwords for mixed text + +## Usage + +### Basic Usage + +```python +from core.rag.datasource.keyword.keyword_factory import Keyword +from models.dataset import Dataset + +# Initialize +dataset = Dataset(...) +keyword_processor = Keyword(dataset) # Will use MeCab if KEYWORD_STORE = "mecab" + +# Process text +documents = [ + Document( + page_content="自然言語処理は人工知能の重要な分野です。", + metadata={"doc_id": "1", ...} + ) +] +keyword_processor.create(documents) + +# Search +results = keyword_processor.search("自然言語処理について") +``` + +### Custom Dictionary Usage + +```python +# In your configuration: +KEYWORD_PROCESSOR_CONFIG = { + "dictionary_path": "/path/to/mecab/dict", + "user_dictionary_path": "/path/to/user.dic", + "pos_weights": { + "名詞": 1.2, + "動詞": 0.8, + # ... customize weights + } +} +``` + +## Features + +### 1. Keyword Extraction + +- **POS-based Scoring**: + - Weights different parts of speech + - Boosts important terms + - Configurable scoring thresholds + +- **Compound Word Detection**: + + ```python + # Input text: "自然言語処理の研究" + # Detected compounds: + # - "自然言語" + # - "自然言語処理" + # - "言語処理" + ``` + +- **Reading Normalization**: + + ```python + # Handles variations: + # - "データベース" (katakana) + # - "データベース" (with readings) + # Both normalize to same term + ``` + +### 2. Storage + +- **Flexible Storage Options**: + - Database storage + - File-based storage + - Redis-based locking for concurrency + +- **Data Structure**: + + ```python + { + "__type__": "keyword_table", + "__data__": { + "index_id": "dataset_id", + "table": { + "keyword1": ["doc_id1", "doc_id2"], + "keyword2": ["doc_id2", "doc_id3"], + } + } + } + ``` + +### 3. Error Handling + +- Comprehensive error handling +- Custom exception classes +- Logging integration +- Graceful fallbacks + +## Performance Considerations + +1. **Memory Usage**: + - Efficient keyword table structure + - Batch processing support + - Caching mechanisms + +2. **Concurrency**: + - Redis-based locking + - Transaction handling + - Safe concurrent access + +3. 
**Optimization Tips**: + - Use appropriate batch sizes + - Configure caching timeouts + - Adjust scoring thresholds + +## Dependencies + +- MeCab and Python bindings: + + ```bash + # Ubuntu/Debian + apt-get install mecab mecab-ipadic-utf8 python3-mecab + + # macOS + brew install mecab mecab-ipadic + pip install mecab-python3 + ``` + +## Best Practices + +1. **Dictionary Management**: + - Keep dictionaries updated + - Use domain-specific user dictionaries + - Regular maintenance of custom terms + +2. **Configuration Tuning**: + - Adjust POS weights for your use case + - Set appropriate thresholds + - Monitor and adjust batch sizes + +3. **Error Handling**: + - Implement proper logging + - Monitor extraction quality + - Handle edge cases + +## Testing + +Example test cases: + +```python +def test_basic_extraction(): + text = "自然言語処理は人工知能の重要な分野です。" + keywords = handler.extract_keywords(text) + assert "自然言語処理" in keywords + assert "人工知能" in keywords + +def test_compound_words(): + text = "機械学習モデルを使った自然言語処理" + keywords = handler.extract_keywords(text) + assert "機械学習" in keywords + assert "自然言語処理" in keywords + +def test_mixed_text(): + text = "AIを使った自然言語処理のResearch" + keywords = handler.extract_keywords(text) + assert "AI" in keywords + assert "自然言語処理" in keywords + assert "Research" in keywords +``` + +## Common Issues and Solutions + +1. **Dictionary Loading Failures**: + + ```python + try: + handler = MeCabKeywordTableHandler(dictionary_path=path) + except RuntimeError as e: + # Handle dictionary loading error + ``` + +2. **Memory Usage**: + + ```python + # Use batch processing for large datasets + for batch in chunks(documents, size=100): + process_batch(batch) + ``` + +3. **Concurrent Access**: + + ```python + with redis_client.lock(f"lock_{dataset_id}"): + # Safe concurrent operations + ``` diff --git a/api/core/rag/datasource/keyword/mecab/config.py b/api/core/rag/datasource/keyword/mecab/config.py new file mode 100644 index 00000000000000..0abfc24a3dfc1e --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/config.py @@ -0,0 +1,19 @@ +from pydantic import BaseModel + +class MeCabConfig(BaseModel): + """Configuration for MeCab keyword processor.""" + max_keywords_per_chunk: int = 10 + min_keyword_length: int = 2 + score_threshold: float = 0.3 + storage_type: str = "database" + cache_timeout: int = 3600 + + # MeCab specific settings + dictionary_path: str = "" # Optional custom dictionary path + user_dictionary_path: str = "" # Optional user dictionary path + pos_weights: dict = { + '名詞': 1.0, # Nouns + '動詞': 0.8, # Verbs + '形容詞': 0.6, # Adjectives + '副詞': 0.4, # Adverbs + } diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py new file mode 100644 index 00000000000000..f40e3c229e563c --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -0,0 +1,287 @@ +import json +import logging +from typing import Any, Optional +from collections import defaultdict + +from core.rag.datasource.keyword.keyword_base import BaseKeyword +from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler +from core.rag.datasource.keyword.mecab.config import MeCabConfig +from core.rag.models.document import Document +from extensions.ext_database import db +from extensions.ext_redis import redis_client +from extensions.ext_storage import storage +from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment + + +logger = logging.getLogger(__name__) + + +class KeywordProcessorError(Exception): + 
"""Base error for keyword processing.""" + pass + + +class KeywordExtractionError(KeywordProcessorError): + """Error during keyword extraction.""" + pass + + +class KeywordStorageError(KeywordProcessorError): + """Error during storage operations.""" + pass + + +class SetEncoder(json.JSONEncoder): + """JSON encoder that handles sets.""" + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return super().default(obj) + + +class MeCab(BaseKeyword): + """Japanese keyword processor using MeCab morphological analyzer.""" + + def __init__(self, dataset: Dataset): + super().__init__(dataset) + self._config = MeCabConfig() + self._keyword_handler = None + self._init_handler() + + def _init_handler(self): + """Initialize MeCab handler with configuration.""" + try: + self._keyword_handler = MeCabKeywordTableHandler( + dictionary_path=self._config.dictionary_path, + user_dictionary_path=self._config.user_dictionary_path + ) + if self._config.pos_weights: + self._keyword_handler.pos_weights = self._config.pos_weights + self._keyword_handler.min_score = self._config.score_threshold + except Exception as e: + logger.error(f"Failed to initialize MeCab handler: {str(e)}") + raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}") + + def create(self, texts: list[Document], **kwargs) -> BaseKeyword: + """Create keyword index for documents.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + + for text in texts: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + if text.metadata is not None: + self._update_segment_keywords( + self.dataset.id, + text.metadata["doc_id"], + list(keywords) + ) + keyword_table = self._add_text_to_keyword_table( + keyword_table or {}, + text.metadata["doc_id"], + list(keywords) + ) + + self._save_dataset_keyword_table(keyword_table) + return self + + def add_texts(self, texts: list[Document], **kwargs): + """Add new texts to existing index.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + keywords_list = kwargs.get("keywords_list") + + for i, text in enumerate(texts): + if keywords_list: + keywords = keywords_list[i] + if not keywords: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + else: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + + if text.metadata is not None: + self._update_segment_keywords( + self.dataset.id, + text.metadata["doc_id"], + list(keywords) + ) + keyword_table = self._add_text_to_keyword_table( + keyword_table or {}, + text.metadata["doc_id"], + list(keywords) + ) + + self._save_dataset_keyword_table(keyword_table) + + def text_exists(self, id: str) -> bool: + """Check if text exists in index.""" + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + return False + return id in set.union(*keyword_table.values()) if keyword_table else False + + def delete_by_ids(self, ids: list[str]) -> None: + """Delete texts by IDs.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + if keyword_table is not None: + keyword_table = self._delete_ids_from_keyword_table(keyword_table, 
ids) + self._save_dataset_keyword_table(keyword_table) + + def delete(self) -> None: + """Delete entire index.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + db.session.delete(dataset_keyword_table) + db.session.commit() + if dataset_keyword_table.data_source_type != "database": + file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" + storage.delete(file_key) + + def search(self, query: str, **kwargs: Any) -> list[Document]: + """Search documents using keywords.""" + keyword_table = self._get_dataset_keyword_table() + k = kwargs.get("top_k", 4) + + sorted_chunk_indices = self._retrieve_ids_by_query( + keyword_table or {}, + query, + k + ) + + documents = [] + for chunk_index in sorted_chunk_indices: + segment = ( + db.session.query(DocumentSegment) + .filter( + DocumentSegment.dataset_id == self.dataset.id, + DocumentSegment.index_node_id == chunk_index + ) + .first() + ) + + if segment: + documents.append( + Document( + page_content=segment.content, + metadata={ + "doc_id": chunk_index, + "doc_hash": segment.index_node_hash, + "document_id": segment.document_id, + "dataset_id": segment.dataset_id, + }, + ) + ) + + return documents + + def _get_dataset_keyword_table(self) -> Optional[dict]: + """Get keyword table from storage.""" + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + keyword_table_dict = dataset_keyword_table.keyword_table_dict + if keyword_table_dict: + return dict(keyword_table_dict["__data__"]["table"]) + return {} + + def _save_dataset_keyword_table(self, keyword_table): + """Save keyword table to storage.""" + table_dict = { + "__type__": "keyword_table", + "__data__": { + "index_id": self.dataset.id, + "summary": None, + "table": keyword_table + } + } + + dataset_keyword_table = self.dataset.dataset_keyword_table + data_source_type = dataset_keyword_table.data_source_type + + if data_source_type == "database": + dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder) + db.session.commit() + else: + file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" + if storage.exists(file_key): + storage.delete(file_key) + storage.save( + file_key, + json.dumps(table_dict, cls=SetEncoder).encode("utf-8") + ) + + def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict: + """Add text keywords to table.""" + for keyword in keywords: + if keyword not in keyword_table: + keyword_table[keyword] = set() + keyword_table[keyword].add(id) + return keyword_table + + def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict: + """Delete IDs from keyword table.""" + node_idxs_to_delete = set(ids) + keywords_to_delete = set() + + for keyword, node_idxs in keyword_table.items(): + if node_idxs_to_delete.intersection(node_idxs): + keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete) + if not keyword_table[keyword]: + keywords_to_delete.add(keyword) + + for keyword in keywords_to_delete: + del keyword_table[keyword] + + return keyword_table + + def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4): + """Retrieve document IDs by query.""" + keywords = self._keyword_handler.extract_keywords(query) + + # Score documents based on matching keywords + chunk_indices_count = defaultdict(int) + keywords_list = [ + keyword for keyword in keywords + if 
keyword in set(keyword_table.keys()) + ] + + for keyword in keywords_list: + for node_id in keyword_table[keyword]: + chunk_indices_count[node_id] += 1 + + sorted_chunk_indices = sorted( + chunk_indices_count.keys(), + key=lambda x: chunk_indices_count[x], + reverse=True + ) + + return sorted_chunk_indices[:k] + + def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]): + """Update segment keywords in database.""" + document_segment = ( + db.session.query(DocumentSegment) + .filter( + DocumentSegment.dataset_id == dataset_id, + DocumentSegment.index_node_id == node_id + ) + .first() + ) + + if document_segment: + document_segment.keywords = keywords + db.session.add(document_segment) + db.session.commit() diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py new file mode 100644 index 00000000000000..0eaf230300d321 --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -0,0 +1,152 @@ +import re +from typing import Optional, Set +import MeCab +from collections import defaultdict + +from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS + +class MeCabKeywordTableHandler: + """Japanese keyword extraction using MeCab morphological analyzer.""" + + def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""): + """Initialize MeCab tokenizer. + + Args: + dictionary_path: Path to custom system dictionary + user_dictionary_path: Path to user dictionary + """ + try: + # Build MeCab argument string + mecab_args = ["-Ochasen"] # Use ChaSen format for detailed POS info + if dictionary_path: + mecab_args.append(f"-d {dictionary_path}") + if user_dictionary_path: + mecab_args.append(f"-u {user_dictionary_path}") + + self.tagger = MeCab.Tagger(" ".join(mecab_args)) + self.tagger.parse('') # Force initialization to catch dictionary errors + + except RuntimeError as e: + raise RuntimeError(f"Failed to initialize MeCab: {str(e)}") + + # POS weights for scoring + self.pos_weights = { + '名詞': 1.0, # Nouns + '動詞': 0.8, # Verbs + '形容詞': 0.6, # Adjectives + '副詞': 0.4, # Adverbs + '連体詞': 0.3, # Adnominal adjectives + '感動詞': 0.2, # Interjections + } + self.min_score = 0.3 + + def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]: + """Extract keywords from Japanese text using MeCab. 
+ + Args: + text: Input text to extract keywords from + max_keywords_per_chunk: Maximum number of keywords to extract + + Returns: + Set of extracted keywords + """ + if not text or not text.strip(): + return set() + + try: + # Parse text with MeCab + self.tagger.parse('') # Clear tagger state + node = self.tagger.parseToNode(text) + + # Calculate term frequencies and scores + term_scores = defaultdict(float) + while node: + features = node.feature.split(',') + if len(features) > 0: + pos = features[0] # Part of speech + pos_subtype = features[1] if len(features) > 1 else '' + base_form = features[6] if len(features) > 6 else node.surface + + # Score the term based on its POS + if pos in self.pos_weights and base_form not in STOPWORDS: + score = self.pos_weights[pos] + # Boost proper nouns and technical terms + if pos == '名詞' and pos_subtype in ['固有名詞', '専門用語']: + score *= 1.5 + if len(base_form) > 1: # Filter out single characters + term_scores[base_form] += score + + node = node.next + + # Get top scoring terms + sorted_terms = sorted( + term_scores.items(), + key=lambda x: x[1], + reverse=True + ) + + # Filter by minimum score and take top N + keywords = { + term for term, score in sorted_terms + if score >= self.min_score + } + + if max_keywords_per_chunk: + keywords = set(list(keywords)[:max_keywords_per_chunk]) + + # Expand with compound terms + expanded_keywords = self._expand_tokens_with_compounds(keywords, text) + + return expanded_keywords + + except Exception as e: + raise RuntimeError(f"Failed to extract keywords: {str(e)}") + + def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]: + """Expand keywords with compound terms. + + This method looks for adjacent keywords in the original text to capture + compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing). 
+ """ + results = set(keywords) + + try: + # Parse again to find compounds + node = self.tagger.parseToNode(text) + compound = [] + compound_readings = [] # For handling different forms of the same compound + + while node: + features = node.feature.split(',') + if len(features) > 6: + base_form = features[6] + reading = features[7] if len(features) > 7 else None + else: + base_form = node.surface + reading = None + + if base_form in keywords: + compound.append(base_form) + if reading: + compound_readings.append(reading) + else: + if len(compound) > 1: + # Add the compound term + compound_term = ''.join(compound) + if len(compound_term) > 1: + results.add(compound_term) + # If readings are available, add normalized form + if compound_readings: + normalized_term = ''.join(compound_readings) + if normalized_term != compound_term: + results.add(normalized_term) + compound = [] + compound_readings = [] + + node = node.next + + return results + + except Exception as e: + # If compound expansion fails, return original keywords + return keywords diff --git a/api/core/rag/datasource/keyword/mecab/stopwords.py b/api/core/rag/datasource/keyword/mecab/stopwords.py new file mode 100644 index 00000000000000..13802ac947a1fb --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/stopwords.py @@ -0,0 +1,36 @@ +STOPWORDS = { + # Japanese particles and basic stopwords + "は", "が", "の", "に", "を", "で", "へ", "と", "から", "より", "まで", "によって", + "あそこ", "あっ", "あの", "あのかた", "あの人", "あり", "あります", "ある", "あれ", + "い", "いう", "います", "いる", "う", "うち", "え", "お", "および", "おり", "おります", + "か", "かつて", "から", "が", "き", "ここ", "こちら", "こと", "この", "これ", "これら", + "さ", "さらに", "し", "しかし", "する", "ず", "せ", "せる", "そこ", "そして", "その", + "その他", "その後", "それ", "それぞれ", "それで", "た", "ただし", "たち", "ため", "たり", + "だ", "だっ", "だれ", "つ", "て", "で", "でき", "できる", "です", "では", "でも", "と", + "という", "といった", "とき", "ところ", "として", "とともに", "とも", "と共に", "どこ", + "どの", "な", "ない", "なお", "なかっ", "ながら", "なく", "なっ", "など", "なに", "なら", + "なり", "なる", "なん", "に", "において", "における", "について", "にて", "によって", "により", + "による", "に対して", "に対する", "に関する", "の", "ので", "のみ", "は", "ば", "へ", "ほか", + "ほとんど", "ほど", "ます", "また", "または", "まで", "も", "もの", "ものの", "や", "よう", + "より", "ら", "られ", "られる", "れ", "れる", "を", "ん", "何", "及び", "彼", "彼女", + "我々", "特に", "私", "私達", "貴方", "貴方方", + + # Japanese auxiliary verbs + "です", "ます", "でした", "ました", "である", "だ", "な", "だった", + + # Japanese pronouns + "これ", "それ", "あれ", "この", "その", "あの", "ここ", "そこ", "あそこ", + + # Japanese common words + "いる", "ある", "なる", "する", "できる", "おる", "いく", "くる", + + # Numbers + "一", "二", "三", "四", "五", "六", "七", "八", "九", "十", + "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", + + # Punctuation + "、", "。", "「", "」", "『", "』", "(", ")", "[", "]", + + # Common English stopwords (for mixed text) + "the", "is", "at", "which", "on", "in", "and", "or", "a", "an", +} From 77030d758157cc2fe5279b9798d63b870749fd5d Mon Sep 17 00:00:00 2001 From: "Dr. 
Kiji" Date: Thu, 2 Jan 2025 19:14:16 +0900 Subject: [PATCH 2/9] linting --- .../rag/datasource/keyword/keyword_factory.py | 2 + .../rag/datasource/keyword/mecab/config.py | 14 +- .../rag/datasource/keyword/mecab/mecab.py | 149 +++++-------- .../mecab/mecab_keyword_table_handler.py | 98 ++++---- .../rag/datasource/keyword/mecab/stopwords.py | 210 +++++++++++++++--- 5 files changed, 294 insertions(+), 179 deletions(-) diff --git a/api/core/rag/datasource/keyword/keyword_factory.py b/api/core/rag/datasource/keyword/keyword_factory.py index 5e30c873a7ea41..1645aaebba232f 100644 --- a/api/core/rag/datasource/keyword/keyword_factory.py +++ b/api/core/rag/datasource/keyword/keyword_factory.py @@ -22,9 +22,11 @@ def get_keyword_factory(keyword_type: str) -> type[BaseKeyword]: match keyword_type: case KeyWordType.JIEBA: from core.rag.datasource.keyword.jieba.jieba import Jieba + return Jieba case KeyWordType.MECAB: from core.rag.datasource.keyword.mecab.mecab import MeCab + return MeCab case _: raise ValueError(f"Keyword store {keyword_type} is not supported.") diff --git a/api/core/rag/datasource/keyword/mecab/config.py b/api/core/rag/datasource/keyword/mecab/config.py index 0abfc24a3dfc1e..5c9cb0449d8e90 100644 --- a/api/core/rag/datasource/keyword/mecab/config.py +++ b/api/core/rag/datasource/keyword/mecab/config.py @@ -1,19 +1,21 @@ from pydantic import BaseModel + class MeCabConfig(BaseModel): """Configuration for MeCab keyword processor.""" + max_keywords_per_chunk: int = 10 min_keyword_length: int = 2 score_threshold: float = 0.3 storage_type: str = "database" cache_timeout: int = 3600 - + # MeCab specific settings dictionary_path: str = "" # Optional custom dictionary path user_dictionary_path: str = "" # Optional user dictionary path pos_weights: dict = { - '名詞': 1.0, # Nouns - '動詞': 0.8, # Verbs - '形容詞': 0.6, # Adjectives - '副詞': 0.4, # Adverbs - } + "名詞": 1.0, # Nouns + "動詞": 0.8, # Verbs + "形容詞": 0.6, # Adjectives + "副詞": 0.4, # Adverbs + } diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py index f40e3c229e563c..660b38650f083e 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab.py +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -1,38 +1,41 @@ import json import logging -from typing import Any, Optional from collections import defaultdict +from typing import Any, Optional from core.rag.datasource.keyword.keyword_base import BaseKeyword -from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler from core.rag.datasource.keyword.mecab.config import MeCabConfig +from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler from core.rag.models.document import Document from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage -from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment - +from models.dataset import Dataset, DocumentSegment logger = logging.getLogger(__name__) class KeywordProcessorError(Exception): """Base error for keyword processing.""" + pass class KeywordExtractionError(KeywordProcessorError): """Error during keyword extraction.""" + pass class KeywordStorageError(KeywordProcessorError): """Error during storage operations.""" + pass class SetEncoder(json.JSONEncoder): """JSON encoder that handles sets.""" + def default(self, obj): if isinstance(obj, set): return list(obj) @@ -41,19 +44,18 @@ def default(self, obj): class MeCab(BaseKeyword): 
"""Japanese keyword processor using MeCab morphological analyzer.""" - + def __init__(self, dataset: Dataset): super().__init__(dataset) self._config = MeCabConfig() self._keyword_handler = None self._init_handler() - + def _init_handler(self): """Initialize MeCab handler with configuration.""" try: self._keyword_handler = MeCabKeywordTableHandler( - dictionary_path=self._config.dictionary_path, - user_dictionary_path=self._config.user_dictionary_path + dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path ) if self._config.pos_weights: self._keyword_handler.pos_weights = self._config.pos_weights @@ -61,75 +63,60 @@ def _init_handler(self): except Exception as e: logger.error(f"Failed to initialize MeCab handler: {str(e)}") raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}") - + def create(self, texts: list[Document], **kwargs) -> BaseKeyword: """Create keyword index for documents.""" lock_name = f"keyword_indexing_lock_{self.dataset.id}" with redis_client.lock(lock_name, timeout=600): keyword_table = self._get_dataset_keyword_table() - + for text in texts: keywords = self._keyword_handler.extract_keywords( - text.page_content, - self._config.max_keywords_per_chunk + text.page_content, self._config.max_keywords_per_chunk ) if text.metadata is not None: - self._update_segment_keywords( - self.dataset.id, - text.metadata["doc_id"], - list(keywords) - ) + self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) keyword_table = self._add_text_to_keyword_table( - keyword_table or {}, - text.metadata["doc_id"], - list(keywords) + keyword_table or {}, text.metadata["doc_id"], list(keywords) ) - + self._save_dataset_keyword_table(keyword_table) return self - + def add_texts(self, texts: list[Document], **kwargs): """Add new texts to existing index.""" lock_name = f"keyword_indexing_lock_{self.dataset.id}" with redis_client.lock(lock_name, timeout=600): keyword_table = self._get_dataset_keyword_table() keywords_list = kwargs.get("keywords_list") - + for i, text in enumerate(texts): if keywords_list: keywords = keywords_list[i] if not keywords: keywords = self._keyword_handler.extract_keywords( - text.page_content, - self._config.max_keywords_per_chunk + text.page_content, self._config.max_keywords_per_chunk ) else: keywords = self._keyword_handler.extract_keywords( - text.page_content, - self._config.max_keywords_per_chunk + text.page_content, self._config.max_keywords_per_chunk ) - + if text.metadata is not None: - self._update_segment_keywords( - self.dataset.id, - text.metadata["doc_id"], - list(keywords) - ) + self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) keyword_table = self._add_text_to_keyword_table( - keyword_table or {}, - text.metadata["doc_id"], - list(keywords) + keyword_table or {}, text.metadata["doc_id"], list(keywords) ) - + self._save_dataset_keyword_table(keyword_table) - + def text_exists(self, id: str) -> bool: """Check if text exists in index.""" keyword_table = self._get_dataset_keyword_table() if keyword_table is None: return False return id in set.union(*keyword_table.values()) if keyword_table else False - + def delete_by_ids(self, ids: list[str]) -> None: """Delete texts by IDs.""" lock_name = f"keyword_indexing_lock_{self.dataset.id}" @@ -138,7 +125,7 @@ def delete_by_ids(self, ids: list[str]) -> None: if keyword_table is not None: keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids) 
self._save_dataset_keyword_table(keyword_table) - + def delete(self) -> None: """Delete entire index.""" lock_name = f"keyword_indexing_lock_{self.dataset.id}" @@ -150,29 +137,22 @@ def delete(self) -> None: if dataset_keyword_table.data_source_type != "database": file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" storage.delete(file_key) - + def search(self, query: str, **kwargs: Any) -> list[Document]: """Search documents using keywords.""" keyword_table = self._get_dataset_keyword_table() k = kwargs.get("top_k", 4) - - sorted_chunk_indices = self._retrieve_ids_by_query( - keyword_table or {}, - query, - k - ) - + + sorted_chunk_indices = self._retrieve_ids_by_query(keyword_table or {}, query, k) + documents = [] for chunk_index in sorted_chunk_indices: segment = ( db.session.query(DocumentSegment) - .filter( - DocumentSegment.dataset_id == self.dataset.id, - DocumentSegment.index_node_id == chunk_index - ) + .filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index) .first() ) - + if segment: documents.append( Document( @@ -185,9 +165,9 @@ def search(self, query: str, **kwargs: Any) -> list[Document]: }, ) ) - + return documents - + def _get_dataset_keyword_table(self) -> Optional[dict]: """Get keyword table from storage.""" dataset_keyword_table = self.dataset.dataset_keyword_table @@ -196,21 +176,17 @@ def _get_dataset_keyword_table(self) -> Optional[dict]: if keyword_table_dict: return dict(keyword_table_dict["__data__"]["table"]) return {} - + def _save_dataset_keyword_table(self, keyword_table): """Save keyword table to storage.""" table_dict = { "__type__": "keyword_table", - "__data__": { - "index_id": self.dataset.id, - "summary": None, - "table": keyword_table - } + "__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table}, } - + dataset_keyword_table = self.dataset.dataset_keyword_table data_source_type = dataset_keyword_table.data_source_type - + if data_source_type == "database": dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder) db.session.commit() @@ -218,11 +194,8 @@ def _save_dataset_keyword_table(self, keyword_table): file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" if storage.exists(file_key): storage.delete(file_key) - storage.save( - file_key, - json.dumps(table_dict, cls=SetEncoder).encode("utf-8") - ) - + storage.save(file_key, json.dumps(table_dict, cls=SetEncoder).encode("utf-8")) + def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict: """Add text keywords to table.""" for keyword in keywords: @@ -230,58 +203,48 @@ def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: lis keyword_table[keyword] = set() keyword_table[keyword].add(id) return keyword_table - + def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict: """Delete IDs from keyword table.""" node_idxs_to_delete = set(ids) keywords_to_delete = set() - + for keyword, node_idxs in keyword_table.items(): if node_idxs_to_delete.intersection(node_idxs): keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete) if not keyword_table[keyword]: keywords_to_delete.add(keyword) - + for keyword in keywords_to_delete: del keyword_table[keyword] - + return keyword_table - + def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4): """Retrieve document IDs by query.""" keywords = self._keyword_handler.extract_keywords(query) - + # Score documents based on 
matching keywords chunk_indices_count = defaultdict(int) - keywords_list = [ - keyword for keyword in keywords - if keyword in set(keyword_table.keys()) - ] - + keywords_list = [keyword for keyword in keywords if keyword in set(keyword_table.keys())] + for keyword in keywords_list: for node_id in keyword_table[keyword]: chunk_indices_count[node_id] += 1 - - sorted_chunk_indices = sorted( - chunk_indices_count.keys(), - key=lambda x: chunk_indices_count[x], - reverse=True - ) - + + sorted_chunk_indices = sorted(chunk_indices_count.keys(), key=lambda x: chunk_indices_count[x], reverse=True) + return sorted_chunk_indices[:k] - + def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]): """Update segment keywords in database.""" document_segment = ( db.session.query(DocumentSegment) - .filter( - DocumentSegment.dataset_id == dataset_id, - DocumentSegment.index_node_id == node_id - ) + .filter(DocumentSegment.dataset_id == dataset_id, DocumentSegment.index_node_id == node_id) .first() ) - + if document_segment: document_segment.keywords = keywords db.session.add(document_segment) - db.session.commit() + db.session.commit() diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py index 0eaf230300d321..f920c0dd31dfd5 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -1,16 +1,17 @@ -import re +from collections import defaultdict from typing import Optional, Set + import MeCab -from collections import defaultdict from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS + class MeCabKeywordTableHandler: """Japanese keyword extraction using MeCab morphological analyzer.""" - + def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""): """Initialize MeCab tokenizer. - + Args: dictionary_path: Path to custom system dictionary user_dictionary_path: Path to user dictionary @@ -22,109 +23,102 @@ def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""): mecab_args.append(f"-d {dictionary_path}") if user_dictionary_path: mecab_args.append(f"-u {user_dictionary_path}") - + self.tagger = MeCab.Tagger(" ".join(mecab_args)) - self.tagger.parse('') # Force initialization to catch dictionary errors - + self.tagger.parse("") # Force initialization to catch dictionary errors + except RuntimeError as e: raise RuntimeError(f"Failed to initialize MeCab: {str(e)}") - + # POS weights for scoring self.pos_weights = { - '名詞': 1.0, # Nouns - '動詞': 0.8, # Verbs - '形容詞': 0.6, # Adjectives - '副詞': 0.4, # Adverbs - '連体詞': 0.3, # Adnominal adjectives - '感動詞': 0.2, # Interjections + "名詞": 1.0, # Nouns + "動詞": 0.8, # Verbs + "形容詞": 0.6, # Adjectives + "副詞": 0.4, # Adverbs + "連体詞": 0.3, # Adnominal adjectives + "感動詞": 0.2, # Interjections } self.min_score = 0.3 - + def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]: """Extract keywords from Japanese text using MeCab. 
- + Args: text: Input text to extract keywords from max_keywords_per_chunk: Maximum number of keywords to extract - + Returns: Set of extracted keywords """ if not text or not text.strip(): return set() - + try: # Parse text with MeCab - self.tagger.parse('') # Clear tagger state + self.tagger.parse("") # Clear tagger state node = self.tagger.parseToNode(text) - + # Calculate term frequencies and scores term_scores = defaultdict(float) while node: - features = node.feature.split(',') + features = node.feature.split(",") if len(features) > 0: pos = features[0] # Part of speech - pos_subtype = features[1] if len(features) > 1 else '' + pos_subtype = features[1] if len(features) > 1 else "" base_form = features[6] if len(features) > 6 else node.surface - + # Score the term based on its POS if pos in self.pos_weights and base_form not in STOPWORDS: score = self.pos_weights[pos] # Boost proper nouns and technical terms - if pos == '名詞' and pos_subtype in ['固有名詞', '専門用語']: + if pos == "名詞" and pos_subtype in ["固有名詞", "専門用語"]: score *= 1.5 if len(base_form) > 1: # Filter out single characters term_scores[base_form] += score - + node = node.next - + # Get top scoring terms - sorted_terms = sorted( - term_scores.items(), - key=lambda x: x[1], - reverse=True - ) - + sorted_terms = sorted(term_scores.items(), key=lambda x: x[1], reverse=True) + # Filter by minimum score and take top N - keywords = { - term for term, score in sorted_terms - if score >= self.min_score - } - + keywords = {term for term, score in sorted_terms if score >= self.min_score} + if max_keywords_per_chunk: keywords = set(list(keywords)[:max_keywords_per_chunk]) - + # Expand with compound terms expanded_keywords = self._expand_tokens_with_compounds(keywords, text) - + return expanded_keywords - + except Exception as e: raise RuntimeError(f"Failed to extract keywords: {str(e)}") - + def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]: """Expand keywords with compound terms. - + This method looks for adjacent keywords in the original text to capture compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing). 
""" results = set(keywords) - + try: # Parse again to find compounds node = self.tagger.parseToNode(text) compound = [] compound_readings = [] # For handling different forms of the same compound - + while node: - features = node.feature.split(',') + features = node.feature.split(",") if len(features) > 6: base_form = features[6] reading = features[7] if len(features) > 7 else None else: base_form = node.surface reading = None - + if base_form in keywords: compound.append(base_form) if reading: @@ -132,21 +126,21 @@ def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[st else: if len(compound) > 1: # Add the compound term - compound_term = ''.join(compound) + compound_term = "".join(compound) if len(compound_term) > 1: results.add(compound_term) # If readings are available, add normalized form if compound_readings: - normalized_term = ''.join(compound_readings) + normalized_term = "".join(compound_readings) if normalized_term != compound_term: results.add(normalized_term) compound = [] compound_readings = [] - + node = node.next - + return results - + except Exception as e: # If compound expansion fails, return original keywords - return keywords + return keywords diff --git a/api/core/rag/datasource/keyword/mecab/stopwords.py b/api/core/rag/datasource/keyword/mecab/stopwords.py index 13802ac947a1fb..11eba7415d24ad 100644 --- a/api/core/rag/datasource/keyword/mecab/stopwords.py +++ b/api/core/rag/datasource/keyword/mecab/stopwords.py @@ -1,36 +1,190 @@ STOPWORDS = { # Japanese particles and basic stopwords - "は", "が", "の", "に", "を", "で", "へ", "と", "から", "より", "まで", "によって", - "あそこ", "あっ", "あの", "あのかた", "あの人", "あり", "あります", "ある", "あれ", - "い", "いう", "います", "いる", "う", "うち", "え", "お", "および", "おり", "おります", - "か", "かつて", "から", "が", "き", "ここ", "こちら", "こと", "この", "これ", "これら", - "さ", "さらに", "し", "しかし", "する", "ず", "せ", "せる", "そこ", "そして", "その", - "その他", "その後", "それ", "それぞれ", "それで", "た", "ただし", "たち", "ため", "たり", - "だ", "だっ", "だれ", "つ", "て", "で", "でき", "できる", "です", "では", "でも", "と", - "という", "といった", "とき", "ところ", "として", "とともに", "とも", "と共に", "どこ", - "どの", "な", "ない", "なお", "なかっ", "ながら", "なく", "なっ", "など", "なに", "なら", - "なり", "なる", "なん", "に", "において", "における", "について", "にて", "によって", "により", - "による", "に対して", "に対する", "に関する", "の", "ので", "のみ", "は", "ば", "へ", "ほか", - "ほとんど", "ほど", "ます", "また", "または", "まで", "も", "もの", "ものの", "や", "よう", - "より", "ら", "られ", "られる", "れ", "れる", "を", "ん", "何", "及び", "彼", "彼女", - "我々", "特に", "私", "私達", "貴方", "貴方方", - + "は", + "が", + "の", + "に", + "を", + "で", + "へ", + "と", + "から", + "より", + "まで", + "によって", + "あそこ", + "あっ", + "あの", + "あのかた", + "あの人", + "あり", + "あります", + "ある", + "あれ", + "い", + "いう", + "います", + "いる", + "う", + "うち", + "え", + "お", + "および", + "おり", + "おります", + "か", + "かつて", + "き", + "ここ", + "こちら", + "こと", + "この", + "これ", + "これら", + "さ", + "さらに", + "し", + "しかし", + "する", + "ず", + "せ", + "せる", + "そこ", + "そして", + "その", + "その他", + "その後", + "それ", + "それぞれ", + "それで", + "た", + "ただし", + "たち", + "ため", + "たり", + "だ", + "だっ", + "だれ", + "つ", + "て", + "でき", + "できる", + "です", + "では", + "でも", + "という", + "といった", + "とき", + "ところ", + "として", + "とともに", + "とも", + "と共に", + "どこ", + "どの", + "な", + "ない", + "なお", + "なかっ", + "ながら", + "なく", + "なっ", + "など", + "なに", + "なら", + "なり", + "なる", + "なん", + "において", + "における", + "について", + "にて", + "により", + "による", + "に対して", + "に対する", + "に関する", + "ので", + "のみ", + "ば", + "ほか", + "ほとんど", + "ほど", + "ます", + "また", + "または", + "も", + "もの", + "ものの", + "や", + "よう", + "ら", + "られ", + "られる", + "れ", + "れる", + "ん", + "何", + "及び", + "彼", + "彼女", + "我々", + "特に", + "私", + 
"私達", + "貴方", + "貴方方", # Japanese auxiliary verbs - "です", "ます", "でした", "ました", "である", "だ", "な", "だった", - + "でした", + "ました", + "である", + "だった", # Japanese pronouns - "これ", "それ", "あれ", "この", "その", "あの", "ここ", "そこ", "あそこ", - # Japanese common words - "いる", "ある", "なる", "する", "できる", "おる", "いく", "くる", - + "おる", + "いく", + "くる", # Numbers - "一", "二", "三", "四", "五", "六", "七", "八", "九", "十", - "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", - + "一", + "二", + "三", + "四", + "五", + "六", + "七", + "八", + "九", + "十", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + "0", # Punctuation - "、", "。", "「", "」", "『", "』", "(", ")", "[", "]", - + "、", + "。", + "「", + "」", + "『", + "』", + "(", + ")", + "[", + "]", # Common English stopwords (for mixed text) - "the", "is", "at", "which", "on", "in", "and", "or", "a", "an", -} + "the", + "is", + "at", + "which", + "on", + "in", + "and", + "or", + "a", + "an", +} From 610d069b69c9a3a75140d3da40377c42925604e1 Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 19:15:59 +0900 Subject: [PATCH 3/9] fix lint --- api/core/rag/datasource/keyword/mecab/mecab.py | 2 +- .../keyword/mecab/mecab_keyword_table_handler.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py index 660b38650f083e..70841c8efcf71e 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab.py +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -61,7 +61,7 @@ def _init_handler(self): self._keyword_handler.pos_weights = self._config.pos_weights self._keyword_handler.min_score = self._config.score_threshold except Exception as e: - logger.error(f"Failed to initialize MeCab handler: {str(e)}") + logger.exception("Failed to initialize MeCab handler") raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}") def create(self, texts: list[Document], **kwargs) -> BaseKeyword: diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py index f920c0dd31dfd5..7efb57ce168468 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -1,5 +1,6 @@ from collections import defaultdict -from typing import Optional, Set +from operator import itemgetter +from typing import Optional import MeCab @@ -41,7 +42,7 @@ def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""): } self.min_score = 0.3 - def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]: + def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> set[str]: """Extract keywords from Japanese text using MeCab. 
Args: @@ -80,7 +81,7 @@ def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10 node = node.next # Get top scoring terms - sorted_terms = sorted(term_scores.items(), key=lambda x: x[1], reverse=True) + sorted_terms = sorted(term_scores.items(), key=itemgetter(1), reverse=True) # Filter by minimum score and take top N keywords = {term for term, score in sorted_terms if score >= self.min_score} @@ -96,7 +97,7 @@ def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10 except Exception as e: raise RuntimeError(f"Failed to extract keywords: {str(e)}") - def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]: + def _expand_tokens_with_compounds(self, keywords: set[str], text: str) -> set[str]: """Expand keywords with compound terms. This method looks for adjacent keywords in the original text to capture From 75dd8677b9595c9b67ea5064c9aa9c3324fb6fb4 Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 19:54:34 +0900 Subject: [PATCH 4/9] improve the consistancy --- .../rag/datasource/keyword/mecab/mecab.py | 466 +++++++++++++----- 1 file changed, 339 insertions(+), 127 deletions(-) diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py index 70841c8efcf71e..6b250f530c0be4 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab.py +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -1,7 +1,8 @@ import json import logging +import os from collections import defaultdict -from typing import Any, Optional +from typing import Any, Dict, List, Optional, Set from core.rag.datasource.keyword.keyword_base import BaseKeyword from core.rag.datasource.keyword.mecab.config import MeCabConfig @@ -10,32 +11,28 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage -from models.dataset import Dataset, DocumentSegment +from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment logger = logging.getLogger(__name__) class KeywordProcessorError(Exception): """Base error for keyword processing.""" - pass class KeywordExtractionError(KeywordProcessorError): """Error during keyword extraction.""" - pass class KeywordStorageError(KeywordProcessorError): """Error during storage operations.""" - pass class SetEncoder(json.JSONEncoder): """JSON encoder that handles sets.""" - def default(self, obj): if isinstance(obj, set): return list(obj) @@ -48,164 +45,283 @@ class MeCab(BaseKeyword): def __init__(self, dataset: Dataset): super().__init__(dataset) self._config = MeCabConfig() - self._keyword_handler = None + self._keyword_handler: MeCabKeywordTableHandler = MeCabKeywordTableHandler() self._init_handler() - def _init_handler(self): + def _init_handler(self) -> None: """Initialize MeCab handler with configuration.""" try: self._keyword_handler = MeCabKeywordTableHandler( - dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path + dictionary_path=self._config.dictionary_path, + user_dictionary_path=self._config.user_dictionary_path ) if self._config.pos_weights: self._keyword_handler.pos_weights = self._config.pos_weights self._keyword_handler.min_score = self._config.score_threshold except Exception as e: logger.exception("Failed to initialize MeCab handler") - raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}") + raise KeywordProcessorError("MeCab initialization failed: {}".format(str(e))) - def create(self, texts: 
list[Document], **kwargs) -> BaseKeyword: + def create(self, texts: List[Document], **kwargs: Any) -> BaseKeyword: """Create keyword index for documents.""" - lock_name = f"keyword_indexing_lock_{self.dataset.id}" - with redis_client.lock(lock_name, timeout=600): - keyword_table = self._get_dataset_keyword_table() - - for text in texts: - keywords = self._keyword_handler.extract_keywords( - text.page_content, self._config.max_keywords_per_chunk - ) - if text.metadata is not None: - self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) - keyword_table = self._add_text_to_keyword_table( - keyword_table or {}, text.metadata["doc_id"], list(keywords) - ) - - self._save_dataset_keyword_table(keyword_table) + if not texts: return self - def add_texts(self, texts: list[Document], **kwargs): - """Add new texts to existing index.""" - lock_name = f"keyword_indexing_lock_{self.dataset.id}" - with redis_client.lock(lock_name, timeout=600): - keyword_table = self._get_dataset_keyword_table() - keywords_list = kwargs.get("keywords_list") + lock_name = "keyword_indexing_lock_{}".format(self.dataset.id) + try: + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + keyword_table = {} + + for text in texts: + if not text.page_content or not text.metadata or "doc_id" not in text.metadata: + logger.warning("Skipping invalid document: {}".format(text)) + continue - for i, text in enumerate(texts): - if keywords_list: - keywords = keywords_list[i] - if not keywords: + try: keywords = self._keyword_handler.extract_keywords( text.page_content, self._config.max_keywords_per_chunk ) - else: - keywords = self._keyword_handler.extract_keywords( - text.page_content, self._config.max_keywords_per_chunk - ) + self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) + keyword_table = self._add_text_to_keyword_table( + keyword_table, text.metadata["doc_id"], list(keywords) + ) + except Exception as e: + logger.exception("Failed to process document: {}".format(text.metadata.get("doc_id"))) + raise KeywordExtractionError("Failed to extract keywords: {}".format(str(e))) - if text.metadata is not None: - self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) - keyword_table = self._add_text_to_keyword_table( - keyword_table or {}, text.metadata["doc_id"], list(keywords) - ) + try: + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to save keyword table") + raise KeywordStorageError("Failed to save keyword table: {}".format(str(e))) - self._save_dataset_keyword_table(keyword_table) + except Exception as e: + if not isinstance(e, (KeywordExtractionError, KeywordStorageError)): + logger.exception("Unexpected error during keyword indexing") + raise KeywordProcessorError("Keyword indexing failed: {}".format(str(e))) + raise + + return self + + def add_texts(self, texts: List[Document], **kwargs: Any) -> None: + """Add new texts to existing index.""" + if not texts: + return + + lock_name = "keyword_indexing_lock_{}".format(self.dataset.id) + try: + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + keyword_table = {} + keywords_list = kwargs.get("keywords_list") + + for i, text in enumerate(texts): + if not text.page_content or not text.metadata or "doc_id" not in text.metadata: + logger.warning("Skipping invalid document: 
{}".format(text)) + continue + + try: + if keywords_list: + keywords = keywords_list[i] + if not keywords: + keywords = self._keyword_handler.extract_keywords( + text.page_content, self._config.max_keywords_per_chunk + ) + else: + keywords = self._keyword_handler.extract_keywords( + text.page_content, self._config.max_keywords_per_chunk + ) + + self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords)) + keyword_table = self._add_text_to_keyword_table( + keyword_table, text.metadata["doc_id"], list(keywords) + ) + except Exception as e: + logger.exception("Failed to process document: {}".format(text.metadata.get("doc_id"))) + continue + + try: + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to save keyword table") + raise KeywordStorageError("Failed to save keyword table: {}".format(str(e))) + + except Exception as e: + if not isinstance(e, KeywordStorageError): + logger.exception("Unexpected error during keyword indexing") + raise KeywordProcessorError("Keyword indexing failed: {}".format(str(e))) + raise def text_exists(self, id: str) -> bool: """Check if text exists in index.""" + if not id: + return False + keyword_table = self._get_dataset_keyword_table() if keyword_table is None: return False return id in set.union(*keyword_table.values()) if keyword_table else False - def delete_by_ids(self, ids: list[str]) -> None: + def delete_by_ids(self, ids: List[str]) -> None: """Delete texts by IDs.""" - lock_name = f"keyword_indexing_lock_{self.dataset.id}" - with redis_client.lock(lock_name, timeout=600): - keyword_table = self._get_dataset_keyword_table() - if keyword_table is not None: - keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids) - self._save_dataset_keyword_table(keyword_table) + if not ids: + return + + lock_name = "keyword_indexing_lock_{}".format(self.dataset.id) + try: + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + if keyword_table is not None: + keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids) + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to delete documents") + raise KeywordStorageError("Failed to delete documents: {}".format(str(e))) def delete(self) -> None: """Delete entire index.""" - lock_name = f"keyword_indexing_lock_{self.dataset.id}" - with redis_client.lock(lock_name, timeout=600): - dataset_keyword_table = self.dataset.dataset_keyword_table - if dataset_keyword_table: - db.session.delete(dataset_keyword_table) - db.session.commit() - if dataset_keyword_table.data_source_type != "database": - file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" - storage.delete(file_key) + lock_name = "keyword_indexing_lock_{}".format(self.dataset.id) + try: + with redis_client.lock(lock_name, timeout=600): + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + db.session.delete(dataset_keyword_table) + db.session.commit() + if dataset_keyword_table.data_source_type != "database": + file_key = os.path.join("keyword_files", self.dataset.tenant_id, self.dataset.id + ".txt") + storage.delete(file_key) + except Exception as e: + logger.exception("Failed to delete index") + raise KeywordStorageError("Failed to delete index: {}".format(str(e))) - def search(self, query: str, **kwargs: Any) -> list[Document]: + def search(self, query: str, **kwargs: Any) -> List[Document]: """Search documents 
using keywords.""" - keyword_table = self._get_dataset_keyword_table() - k = kwargs.get("top_k", 4) + if not query: + return [] - sorted_chunk_indices = self._retrieve_ids_by_query(keyword_table or {}, query, k) + try: + keyword_table = self._get_dataset_keyword_table() + k = kwargs.get("top_k", 4) + + sorted_chunk_indices = self._retrieve_ids_by_query(keyword_table or {}, query, k) + if not sorted_chunk_indices: + return [] + + documents = [] + for chunk_index in sorted_chunk_indices: + segment = ( + db.session.query(DocumentSegment) + .filter( + DocumentSegment.dataset_id == self.dataset.id, + DocumentSegment.index_node_id == chunk_index + ) + .first() + ) - documents = [] - for chunk_index in sorted_chunk_indices: - segment = ( - db.session.query(DocumentSegment) - .filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index) - .first() - ) + if segment: + documents.append( + Document( + page_content=segment.content, + metadata={ + "doc_id": chunk_index, + "doc_hash": segment.index_node_hash, + "document_id": segment.document_id, + "dataset_id": segment.dataset_id, + }, + ) + ) + + return documents + except Exception as e: + logger.exception("Failed to search documents") + raise KeywordProcessorError("Search failed: {}".format(str(e))) - if segment: - documents.append( - Document( - page_content=segment.content, - metadata={ - "doc_id": chunk_index, - "doc_hash": segment.index_node_hash, - "document_id": segment.document_id, - "dataset_id": segment.dataset_id, + def _get_dataset_keyword_table(self) -> Optional[Dict[str, Set[str]]]: + """Get keyword table from storage.""" + try: + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + keyword_table_dict = dataset_keyword_table.keyword_table_dict + if keyword_table_dict: + return dict(keyword_table_dict["__data__"]["table"]) + else: + # Create new dataset keyword table if it doesn't exist + from configs import dify_config + + keyword_data_source_type = dify_config.KEYWORD_DATA_SOURCE_TYPE + dataset_keyword_table = DatasetKeywordTable( + dataset_id=self.dataset.id, + keyword_table="", + data_source_type=keyword_data_source_type, + ) + if keyword_data_source_type == "database": + dataset_keyword_table.keyword_table = json.dumps( + { + "__type__": "keyword_table", + "__data__": {"index_id": self.dataset.id, "summary": None, "table": {}}, }, + cls=SetEncoder, ) - ) + db.session.add(dataset_keyword_table) + db.session.commit() - return documents + return {} + except Exception as e: + logger.exception("Failed to get keyword table") + raise KeywordStorageError("Failed to get keyword table: {}".format(str(e))) - def _get_dataset_keyword_table(self) -> Optional[dict]: - """Get keyword table from storage.""" - dataset_keyword_table = self.dataset.dataset_keyword_table - if dataset_keyword_table: - keyword_table_dict = dataset_keyword_table.keyword_table_dict - if keyword_table_dict: - return dict(keyword_table_dict["__data__"]["table"]) - return {} - - def _save_dataset_keyword_table(self, keyword_table): + def _save_dataset_keyword_table(self, keyword_table: Dict[str, Set[str]]) -> None: """Save keyword table to storage.""" + if keyword_table is None: + raise ValueError("Keyword table cannot be None") + table_dict = { "__type__": "keyword_table", "__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table}, } - dataset_keyword_table = self.dataset.dataset_keyword_table - data_source_type = dataset_keyword_table.data_source_type + try: + 
dataset_keyword_table = self.dataset.dataset_keyword_table + if not dataset_keyword_table: + raise KeywordStorageError("Dataset keyword table not found") - if data_source_type == "database": - dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder) - db.session.commit() - else: - file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" - if storage.exists(file_key): - storage.delete(file_key) - storage.save(file_key, json.dumps(table_dict, cls=SetEncoder).encode("utf-8")) + data_source_type = dataset_keyword_table.data_source_type - def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict: + if data_source_type == "database": + dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder) + db.session.commit() + else: + file_key = os.path.join("keyword_files", self.dataset.tenant_id, self.dataset.id + ".txt") + if storage.exists(file_key): + storage.delete(file_key) + storage.save(file_key, json.dumps(table_dict, cls=SetEncoder).encode("utf-8")) + except Exception as e: + logger.exception("Failed to save keyword table") + raise KeywordStorageError("Failed to save keyword table: {}".format(str(e))) + + def _add_text_to_keyword_table( + self, keyword_table: Dict[str, Set[str]], id: str, keywords: List[str] + ) -> Dict[str, Set[str]]: """Add text keywords to table.""" + if not id or not keywords: + return keyword_table + for keyword in keywords: if keyword not in keyword_table: keyword_table[keyword] = set() keyword_table[keyword].add(id) return keyword_table - def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict: + def _delete_ids_from_keyword_table( + self, keyword_table: Dict[str, Set[str]], ids: List[str] + ) -> Dict[str, Set[str]]: """Delete IDs from keyword table.""" + if not keyword_table or not ids: + return keyword_table + node_idxs_to_delete = set(ids) keywords_to_delete = set() @@ -220,31 +336,127 @@ def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> return keyword_table - def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4): + def _retrieve_ids_by_query( + self, keyword_table: Dict[str, Set[str]], query: str, k: int = 4 + ) -> List[str]: """Retrieve document IDs by query.""" - keywords = self._keyword_handler.extract_keywords(query) + if not query or not keyword_table: + return [] - # Score documents based on matching keywords - chunk_indices_count = defaultdict(int) - keywords_list = [keyword for keyword in keywords if keyword in set(keyword_table.keys())] + try: + keywords = self._keyword_handler.extract_keywords(query) - for keyword in keywords_list: - for node_id in keyword_table[keyword]: - chunk_indices_count[node_id] += 1 + # Score documents based on matching keywords + chunk_indices_count: dict[str, int] = defaultdict(int) + keywords_list = [keyword for keyword in keywords if keyword in set(keyword_table.keys())] - sorted_chunk_indices = sorted(chunk_indices_count.keys(), key=lambda x: chunk_indices_count[x], reverse=True) + for keyword in keywords_list: + for node_id in keyword_table[keyword]: + chunk_indices_count[node_id] += 1 - return sorted_chunk_indices[:k] + # Sort by score in descending order + sorted_chunk_indices = sorted( + chunk_indices_count.keys(), + key=lambda x: chunk_indices_count[x], + reverse=True, + ) + + return sorted_chunk_indices[:k] + except Exception as e: + logger.exception("Failed to retrieve IDs by query") + raise KeywordExtractionError("Failed to retrieve IDs: 
{}".format(str(e))) - def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]): + def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: List[str]) -> None: """Update segment keywords in database.""" - document_segment = ( - db.session.query(DocumentSegment) - .filter(DocumentSegment.dataset_id == dataset_id, DocumentSegment.index_node_id == node_id) - .first() - ) - - if document_segment: - document_segment.keywords = keywords - db.session.add(document_segment) - db.session.commit() + if not dataset_id or not node_id: + return + + try: + document_segment = ( + db.session.query(DocumentSegment) + .filter(DocumentSegment.dataset_id == dataset_id, DocumentSegment.index_node_id == node_id) + .first() + ) + + if document_segment: + document_segment.keywords = keywords + db.session.add(document_segment) + db.session.commit() + except Exception as e: + logger.exception("Failed to update segment keywords") + raise KeywordStorageError("Failed to update segment keywords: {}".format(str(e))) + + def create_segment_keywords(self, node_id: str, keywords: List[str]) -> None: + """Create keywords for a single segment. + + Args: + node_id: The segment node ID + keywords: List of keywords to add + """ + if not node_id or not keywords: + return + + try: + keyword_table = self._get_dataset_keyword_table() + self._update_segment_keywords(self.dataset.id, node_id, keywords) + keyword_table = self._add_text_to_keyword_table(keyword_table or {}, node_id, keywords) + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to create segment keywords") + raise KeywordProcessorError("Failed to create segment keywords: {}".format(str(e))) + + def multi_create_segment_keywords(self, pre_segment_data_list: List[Dict[str, Any]]) -> None: + """Create keywords for multiple segments in batch.""" + if not pre_segment_data_list: + return + + try: + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + keyword_table = {} + + for pre_segment_data in pre_segment_data_list: + segment = pre_segment_data["segment"] + if not segment: + continue + + try: + if pre_segment_data.get("keywords"): + segment.keywords = pre_segment_data["keywords"] + keyword_table = self._add_text_to_keyword_table( + keyword_table, segment.index_node_id, pre_segment_data["keywords"] + ) + else: + keywords = self._keyword_handler.extract_keywords( + segment.content, self._config.max_keywords_per_chunk + ) + segment.keywords = list(keywords) + keyword_table = self._add_text_to_keyword_table( + keyword_table, segment.index_node_id, list(keywords) + ) + except Exception as e: + logger.exception("Failed to process segment: {}".format(segment.index_node_id)) + continue + + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to create multiple segment keywords") + raise KeywordProcessorError("Failed to create multiple segment keywords: {}".format(str(e))) + + def update_segment_keywords_index(self, node_id: str, keywords: List[str]) -> None: + """Update keywords index for a segment. 
+ + Args: + node_id: The segment node ID + keywords: List of keywords to update + """ + if not node_id or not keywords: + return + + try: + keyword_table = self._get_dataset_keyword_table() + keyword_table = self._add_text_to_keyword_table(keyword_table or {}, node_id, keywords) + self._save_dataset_keyword_table(keyword_table) + except Exception as e: + logger.exception("Failed to update segment keywords index") + raise KeywordStorageError("Failed to update segment keywords index: {}".format(str(e))) From 4f5a4e7194e796019624680cb15d0a0d895eaa4c Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 20:11:44 +0900 Subject: [PATCH 5/9] [WIP] before final test --- .../rag/datasource/keyword/mecab/README.md | 328 +++++++++++------- .../rag/datasource/keyword/mecab/mecab.py | 126 +++++-- 2 files changed, 287 insertions(+), 167 deletions(-) diff --git a/api/core/rag/datasource/keyword/mecab/README.md b/api/core/rag/datasource/keyword/mecab/README.md index f589c96d442ba2..cdf44dc5c74535 100644 --- a/api/core/rag/datasource/keyword/mecab/README.md +++ b/api/core/rag/datasource/keyword/mecab/README.md @@ -1,6 +1,6 @@ -# MeCab Keyword Processor +# MeCab Keyword Processor for Dify -A Japanese text keyword extraction module using MeCab morphological analyzer for the Dify RAG system. +A Japanese text keyword extraction module for Dify's RAG system, powered by MeCab morphological analyzer. ## Overview @@ -85,189 +85,255 @@ Comprehensive Japanese stopword list including: from core.rag.datasource.keyword.keyword_factory import Keyword from models.dataset import Dataset -# Initialize -dataset = Dataset(...) -keyword_processor = Keyword(dataset) # Will use MeCab if KEYWORD_STORE = "mecab" +# Initialize with KEYWORD_STORE = "mecab" in config +keyword_processor = Keyword(dataset) -# Process text +# Process documents documents = [ Document( page_content="自然言語処理は人工知能の重要な分野です。", - metadata={"doc_id": "1", ...} + metadata={"doc_id": "1"} ) ] keyword_processor.create(documents) # Search -results = keyword_processor.search("自然言語処理について") +results = keyword_processor.search("自然言語処理") ``` -### Custom Dictionary Usage +## Configuration + +### Basic Settings + +```python +# In your environment configuration: +KEYWORD_STORE = "mecab" +KEYWORD_DATA_SOURCE_TYPE = "database" # or other supported storage types +``` + +### Advanced Settings ```python -# In your configuration: -KEYWORD_PROCESSOR_CONFIG = { - "dictionary_path": "/path/to/mecab/dict", - "user_dictionary_path": "/path/to/user.dic", +# MeCab-specific configuration +MECAB_CONFIG = { + "max_keywords_per_chunk": 10, + "score_threshold": 0.3, + "dictionary_path": "/path/to/dict", # Optional + "user_dictionary_path": "/path/to/user_dict", # Optional "pos_weights": { - "名詞": 1.2, - "動詞": 0.8, - # ... customize weights + "名詞": 1.0, # Nouns + "動詞": 0.8, # Verbs + "形容詞": 0.6 # Adjectives } } ``` -## Features - -### 1. Keyword Extraction - -- **POS-based Scoring**: - - Weights different parts of speech - - Boosts important terms - - Configurable scoring thresholds - -- **Compound Word Detection**: - - ```python - # Input text: "自然言語処理の研究" - # Detected compounds: - # - "自然言語" - # - "自然言語処理" - # - "言語処理" - ``` +## Key Features -- **Reading Normalization**: +### 1. Intelligent Keyword Extraction - ```python - # Handles variations: - # - "データベース" (katakana) - # - "データベース" (with readings) - # Both normalize to same term - ``` +- Part-of-speech based scoring +- Compound word detection +- Technical term recognition +- Reading normalization for variations -### 2. 
Storage +### 2. Storage Options -- **Flexible Storage Options**: - - Database storage - - File-based storage - - Redis-based locking for concurrency - -- **Data Structure**: - - ```python - { - "__type__": "keyword_table", - "__data__": { - "index_id": "dataset_id", - "table": { - "keyword1": ["doc_id1", "doc_id2"], - "keyword2": ["doc_id2", "doc_id3"], - } - } - } - ``` +- Database storage (default) +- File-based storage +- Concurrent access support via Redis locking ### 3. Error Handling -- Comprehensive error handling -- Custom exception classes -- Logging integration +- Comprehensive exception handling +- Detailed logging - Graceful fallbacks -## Performance Considerations - -1. **Memory Usage**: - - Efficient keyword table structure - - Batch processing support - - Caching mechanisms - -2. **Concurrency**: - - Redis-based locking - - Transaction handling - - Safe concurrent access - -3. **Optimization Tips**: - - Use appropriate batch sizes - - Configure caching timeouts - - Adjust scoring thresholds - ## Dependencies -- MeCab and Python bindings: +```bash +# Ubuntu/Debian +apt-get install mecab mecab-ipadic-utf8 python3-mecab - ```bash - # Ubuntu/Debian - apt-get install mecab mecab-ipadic-utf8 python3-mecab - - # macOS - brew install mecab mecab-ipadic - pip install mecab-python3 - ``` +# macOS +brew install mecab mecab-ipadic +pip install mecab-python3 +``` ## Best Practices -1. **Dictionary Management**: - - Keep dictionaries updated - - Use domain-specific user dictionaries - - Regular maintenance of custom terms +1. **Performance** + - Use batch processing for large datasets + - Configure appropriate cache timeouts + - Monitor memory usage -2. **Configuration Tuning**: +2. **Customization** + - Update dictionaries regularly - Adjust POS weights for your use case - Set appropriate thresholds - - Monitor and adjust batch sizes -3. **Error Handling**: +3. **Error Handling** - Implement proper logging - - Monitor extraction quality - - Handle edge cases + - Handle dictionary loading errors + - Manage concurrent access + +## Example Usage -## Testing +### Basic Keyword Extraction -Example test cases: +```python +# Extract keywords from text +text = "自然言語処理は人工知能の重要な分野です。" +keywords = keyword_processor.create([ + Document(page_content=text, metadata={"doc_id": "1"}) +]) +``` + +### Custom Dictionary ```python -def test_basic_extraction(): - text = "自然言語処理は人工知能の重要な分野です。" - keywords = handler.extract_keywords(text) - assert "自然言語処理" in keywords - assert "人工知能" in keywords - -def test_compound_words(): - text = "機械学習モデルを使った自然言語処理" - keywords = handler.extract_keywords(text) - assert "機械学習" in keywords - assert "自然言語処理" in keywords - -def test_mixed_text(): - text = "AIを使った自然言語処理のResearch" - keywords = handler.extract_keywords(text) - assert "AI" in keywords - assert "自然言語処理" in keywords - assert "Research" in keywords +# Use custom dictionary +config = MeCabConfig( + dictionary_path="/path/to/dict", + user_dictionary_path="/path/to/user.dic" +) ``` -## Common Issues and Solutions +### Batch Processing -1. **Dictionary Loading Failures**: +```python +# Process multiple documents +documents = [ + Document(page_content=text1, metadata={"doc_id": "1"}), + Document(page_content=text2, metadata={"doc_id": "2"}) +] +keyword_processor.create(documents) +``` + +## Integration with Dify + +The MeCab processor integrates seamlessly with Dify's existing keyword system: + +1. Implements the `BaseKeyword` interface +2. Works with the keyword factory system +3. 
Supports all standard operations: + - Document indexing + - Keyword extraction + - Search functionality + - Index management + +## Common Issues + +1. **Dictionary Loading** ```python try: - handler = MeCabKeywordTableHandler(dictionary_path=path) - except RuntimeError as e: - # Handle dictionary loading error + keyword_processor.create(documents) + except KeywordProcessorError as e: + logger.error("Dictionary loading failed: %s", str(e)) ``` -2. **Memory Usage**: +2. **Memory Management** ```python - # Use batch processing for large datasets - for batch in chunks(documents, size=100): - process_batch(batch) + # Process in batches + batch_size = 100 + for i in range(0, len(documents), batch_size): + batch = documents[i:i + batch_size] + keyword_processor.create(batch) ``` -3. **Concurrent Access**: +3. **Concurrent Access** ```python - with redis_client.lock(f"lock_{dataset_id}"): - # Safe concurrent operations + # Handled automatically via Redis locks + keyword_processor.create(documents) # Safe for concurrent use ``` + +For more details, refer to the [Dify Documentation](https://docs.dify.ai). + +## Text Processing Examples + +### Compound Words + +The MeCab processor intelligently handles compound words in Japanese text: + +```python +text = "人工知能と機械学習の研究を行っています。" +keywords = keyword_processor.create([ + Document(page_content=text, metadata={"doc_id": "1"}) +]) + +# Extracted keywords include: +# - "人工知能" (artificial intelligence - compound) +# - "機械学習" (machine learning - compound) +# - "研究" (research - single) +``` + +Complex technical terms are properly recognized: + +```python +text = "自然言語処理における深層学習の応用" +# Extracts: +# - "自然言語処理" (natural language processing) +# - "深層学習" (deep learning) +# - "応用" (application) +``` + +### Stopwords Handling + +Common particles and auxiliary words are automatically filtered: + +```python +text = "私はデータベースの設計をしています。" +# Ignores: +# - "は" (particle) +# - "の" (particle) +# - "を" (particle) +# - "います" (auxiliary verb) +# Extracts: +# - "データベース" (database) +# - "設計" (design) +``` + +Mixed language text is also handled appropriately: + +```python +text = "AIシステムのパフォーマンスを改善する。" +# Ignores: +# - "の" (particle) +# - "を" (particle) +# - "する" (auxiliary verb) +# Extracts: +# - "AI" (kept as is) +# - "システム" (system) +# - "パフォーマンス" (performance) +# - "改善" (improvement) +``` + +### Reading Variations + +The processor normalizes different forms of the same word: + +```python +text1 = "データベース設計" # カタカナ +text2 = "データベース設計" # with readings +# Both normalize to the same keywords: +# - "データベース" +# - "設計" +``` + +### Technical Term Boosting + +Technical terms receive higher scores in keyword extraction: + +```python +text = "機械学習モデルを用いた自然言語処理の研究" +# Prioritizes technical terms: +# High score: +# - "機械学習" (machine learning) +# - "自然言語処理" (natural language processing) +# Lower score: +# - "研究" (research) +# - "モデル" (model) +``` diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py index 6b250f530c0be4..a3ada85a6dd8a4 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab.py +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -2,7 +2,7 @@ import logging import os from collections import defaultdict -from typing import Any, Dict, List, Optional, Set +from typing import Any, Optional from core.rag.datasource.keyword.keyword_base import BaseKeyword from core.rag.datasource.keyword.mecab.config import MeCabConfig @@ -18,21 +18,25 @@ class KeywordProcessorError(Exception): """Base error for keyword processing.""" + 
pass class KeywordExtractionError(KeywordProcessorError): """Error during keyword extraction.""" + pass class KeywordStorageError(KeywordProcessorError): """Error during storage operations.""" + pass class SetEncoder(json.JSONEncoder): """JSON encoder that handles sets.""" + def default(self, obj): if isinstance(obj, set): return list(obj) @@ -52,8 +56,7 @@ def _init_handler(self) -> None: """Initialize MeCab handler with configuration.""" try: self._keyword_handler = MeCabKeywordTableHandler( - dictionary_path=self._config.dictionary_path, - user_dictionary_path=self._config.user_dictionary_path + dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path ) if self._config.pos_weights: self._keyword_handler.pos_weights = self._config.pos_weights @@ -62,8 +65,21 @@ def _init_handler(self) -> None: logger.exception("Failed to initialize MeCab handler") raise KeywordProcessorError("MeCab initialization failed: {}".format(str(e))) - def create(self, texts: List[Document], **kwargs: Any) -> BaseKeyword: - """Create keyword index for documents.""" + def create(self, texts: list[Document], **kwargs: Any) -> BaseKeyword: + """Create keyword index for documents. + + Args: + texts: List of documents to index + **kwargs: Additional arguments + + Returns: + BaseKeyword: Self for method chaining + + Raises: + KeywordProcessorError: If indexing fails + KeywordExtractionError: If keyword extraction fails + KeywordStorageError: If storage operations fail + """ if not texts: return self @@ -105,8 +121,17 @@ def create(self, texts: List[Document], **kwargs: Any) -> BaseKeyword: return self - def add_texts(self, texts: List[Document], **kwargs: Any) -> None: - """Add new texts to existing index.""" + def add_texts(self, texts: list[Document], **kwargs: Any) -> None: + """Add new texts to existing index. + + Args: + texts: List of documents to add + **kwargs: Additional arguments including optional keywords_list + + Raises: + KeywordProcessorError: If indexing fails + KeywordStorageError: If storage operations fail + """ if not texts: return @@ -156,17 +181,38 @@ def add_texts(self, texts: List[Document], **kwargs: Any) -> None: raise def text_exists(self, id: str) -> bool: - """Check if text exists in index.""" + """Check if text exists in index. + + Args: + id: Document ID to check + + Returns: + bool: True if text exists, False otherwise + + Raises: + KeywordProcessorError: If check fails + """ if not id: return False - keyword_table = self._get_dataset_keyword_table() - if keyword_table is None: - return False - return id in set.union(*keyword_table.values()) if keyword_table else False + try: + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + return False + return id in set.union(*keyword_table.values()) if keyword_table else False + except Exception as e: + logger.exception("Failed to check text existence") + raise KeywordProcessorError("Failed to check text existence: {}".format(str(e))) + + def delete_by_ids(self, ids: list[str]) -> None: + """Delete texts by IDs. + + Args: + ids: List of document IDs to delete - def delete_by_ids(self, ids: List[str]) -> None: - """Delete texts by IDs.""" + Raises: + KeywordStorageError: If deletion fails + """ if not ids: return @@ -182,7 +228,11 @@ def delete_by_ids(self, ids: List[str]) -> None: raise KeywordStorageError("Failed to delete documents: {}".format(str(e))) def delete(self) -> None: - """Delete entire index.""" + """Delete entire index. 
+ + Raises: + KeywordStorageError: If deletion fails + """ lock_name = "keyword_indexing_lock_{}".format(self.dataset.id) try: with redis_client.lock(lock_name, timeout=600): @@ -197,8 +247,19 @@ def delete(self) -> None: logger.exception("Failed to delete index") raise KeywordStorageError("Failed to delete index: {}".format(str(e))) - def search(self, query: str, **kwargs: Any) -> List[Document]: - """Search documents using keywords.""" + def search(self, query: str, **kwargs: Any) -> list[Document]: + """Search documents using keywords. + + Args: + query: Search query string + **kwargs: Additional arguments including optional top_k + + Returns: + List[Document]: List of matching documents + + Raises: + KeywordProcessorError: If search fails + """ if not query: return [] @@ -214,10 +275,7 @@ def search(self, query: str, **kwargs: Any) -> List[Document]: for chunk_index in sorted_chunk_indices: segment = ( db.session.query(DocumentSegment) - .filter( - DocumentSegment.dataset_id == self.dataset.id, - DocumentSegment.index_node_id == chunk_index - ) + .filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index) .first() ) @@ -239,7 +297,7 @@ def search(self, query: str, **kwargs: Any) -> List[Document]: logger.exception("Failed to search documents") raise KeywordProcessorError("Search failed: {}".format(str(e))) - def _get_dataset_keyword_table(self) -> Optional[Dict[str, Set[str]]]: + def _get_dataset_keyword_table(self) -> Optional[dict[str, set[str]]]: """Get keyword table from storage.""" try: dataset_keyword_table = self.dataset.dataset_keyword_table @@ -273,7 +331,7 @@ def _get_dataset_keyword_table(self) -> Optional[Dict[str, Set[str]]]: logger.exception("Failed to get keyword table") raise KeywordStorageError("Failed to get keyword table: {}".format(str(e))) - def _save_dataset_keyword_table(self, keyword_table: Dict[str, Set[str]]) -> None: + def _save_dataset_keyword_table(self, keyword_table: dict[str, set[str]]) -> None: """Save keyword table to storage.""" if keyword_table is None: raise ValueError("Keyword table cannot be None") @@ -303,8 +361,8 @@ def _save_dataset_keyword_table(self, keyword_table: Dict[str, Set[str]]) -> Non raise KeywordStorageError("Failed to save keyword table: {}".format(str(e))) def _add_text_to_keyword_table( - self, keyword_table: Dict[str, Set[str]], id: str, keywords: List[str] - ) -> Dict[str, Set[str]]: + self, keyword_table: dict[str, set[str]], id: str, keywords: list[str] + ) -> dict[str, set[str]]: """Add text keywords to table.""" if not id or not keywords: return keyword_table @@ -315,9 +373,7 @@ def _add_text_to_keyword_table( keyword_table[keyword].add(id) return keyword_table - def _delete_ids_from_keyword_table( - self, keyword_table: Dict[str, Set[str]], ids: List[str] - ) -> Dict[str, Set[str]]: + def _delete_ids_from_keyword_table(self, keyword_table: dict[str, set[str]], ids: list[str]) -> dict[str, set[str]]: """Delete IDs from keyword table.""" if not keyword_table or not ids: return keyword_table @@ -336,9 +392,7 @@ def _delete_ids_from_keyword_table( return keyword_table - def _retrieve_ids_by_query( - self, keyword_table: Dict[str, Set[str]], query: str, k: int = 4 - ) -> List[str]: + def _retrieve_ids_by_query(self, keyword_table: dict[str, set[str]], query: str, k: int = 4) -> list[str]: """Retrieve document IDs by query.""" if not query or not keyword_table: return [] @@ -366,9 +420,9 @@ def _retrieve_ids_by_query( logger.exception("Failed to retrieve IDs by query") raise 
KeywordExtractionError("Failed to retrieve IDs: {}".format(str(e))) - def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: List[str]) -> None: + def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]) -> None: """Update segment keywords in database.""" - if not dataset_id or not node_id: + if not dataset_id or not node_id or not keywords: return try: @@ -386,7 +440,7 @@ def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: List logger.exception("Failed to update segment keywords") raise KeywordStorageError("Failed to update segment keywords: {}".format(str(e))) - def create_segment_keywords(self, node_id: str, keywords: List[str]) -> None: + def create_segment_keywords(self, node_id: str, keywords: list[str]) -> None: """Create keywords for a single segment. Args: @@ -405,7 +459,7 @@ def create_segment_keywords(self, node_id: str, keywords: List[str]) -> None: logger.exception("Failed to create segment keywords") raise KeywordProcessorError("Failed to create segment keywords: {}".format(str(e))) - def multi_create_segment_keywords(self, pre_segment_data_list: List[Dict[str, Any]]) -> None: + def multi_create_segment_keywords(self, pre_segment_data_list: list[dict[str, Any]]) -> None: """Create keywords for multiple segments in batch.""" if not pre_segment_data_list: return @@ -443,7 +497,7 @@ def multi_create_segment_keywords(self, pre_segment_data_list: List[Dict[str, An logger.exception("Failed to create multiple segment keywords") raise KeywordProcessorError("Failed to create multiple segment keywords: {}".format(str(e))) - def update_segment_keywords_index(self, node_id: str, keywords: List[str]) -> None: + def update_segment_keywords_index(self, node_id: str, keywords: list[str]) -> None: """Update keywords index for a segment. Args: From 2f6bfe8e3052238d502df8cb0cafe318a34f006a Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 20:24:57 +0900 Subject: [PATCH 6/9] add type annotation --- .../rag/datasource/keyword/mecab/mecab_keyword_table_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py index 7efb57ce168468..dafaf9784fb0a4 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -61,7 +61,7 @@ def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10 node = self.tagger.parseToNode(text) # Calculate term frequencies and scores - term_scores = defaultdict(float) + term_scores: defaultdict[str, float] = defaultdict(float) while node: features = node.feature.split(",") if len(features) > 0: From 7bdedbe645cd7b9bf8ec2c7bed3c273a86a505b3 Mon Sep 17 00:00:00 2001 From: "Dr. 
Kiji" Date: Thu, 2 Jan 2025 20:25:12 +0900 Subject: [PATCH 7/9] [WIP]for test perpose only --- api/pyproject.toml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/pyproject.toml b/api/pyproject.toml index 133a794a97a471..04920329683d6b 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -9,6 +9,10 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] package-mode = false +[[tool.mypy.overrides]] +module = ["MeCab.*"] +ignore_missing_imports = true + ############################################################ # [ Main ] Dependency group ############################################################ @@ -53,6 +57,7 @@ langfuse = "~2.51.3" langsmith = "~0.1.77" mailchimp-transactional = "~1.0.50" markdown = "~3.5.1" +mecab-python3 = "^1.0.6" # MeCab Python bindings nomic = "~3.1.2" novita-client = "~0.5.7" numpy = "~1.26.4" @@ -87,6 +92,7 @@ tiktoken = "~0.8.0" tokenizers = "~0.15.0" transformers = "~4.35.0" types-pytz = "~2024.2.0.20241003" +unidic-lite = "^1.0.8" unstructured = { version = "~0.16.1", extras = ["docx", "epub", "md", "msg", "ppt", "pptx"] } validators = "0.21.0" volcengine-python-sdk = {extras = ["ark"], version = "~1.0.98"} From 6d7eb6778780423cfadae551382ad4eb56d4e196 Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 20:26:44 +0900 Subject: [PATCH 8/9] Revert "[WIP]for test perpose only" This reverts commit 7bdedbe645cd7b9bf8ec2c7bed3c273a86a505b3. --- api/pyproject.toml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/api/pyproject.toml b/api/pyproject.toml index 04920329683d6b..133a794a97a471 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -9,10 +9,6 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] package-mode = false -[[tool.mypy.overrides]] -module = ["MeCab.*"] -ignore_missing_imports = true - ############################################################ # [ Main ] Dependency group ############################################################ @@ -57,7 +53,6 @@ langfuse = "~2.51.3" langsmith = "~0.1.77" mailchimp-transactional = "~1.0.50" markdown = "~3.5.1" -mecab-python3 = "^1.0.6" # MeCab Python bindings nomic = "~3.1.2" novita-client = "~0.5.7" numpy = "~1.26.4" @@ -92,7 +87,6 @@ tiktoken = "~0.8.0" tokenizers = "~0.15.0" transformers = "~4.35.0" types-pytz = "~2024.2.0.20241003" -unidic-lite = "^1.0.8" unstructured = { version = "~0.16.1", extras = ["docx", "epub", "md", "msg", "ppt", "pptx"] } validators = "0.21.0" volcengine-python-sdk = {extras = ["ark"], version = "~1.0.98"} From b94dca52be97b82303a63c865a7bd8bd9e36c976 Mon Sep 17 00:00:00 2001 From: "Dr. Kiji" Date: Thu, 2 Jan 2025 20:28:58 +0900 Subject: [PATCH 9/9] [WIP] add type ignore --- .../rag/datasource/keyword/mecab/mecab_keyword_table_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py index dafaf9784fb0a4..4475ecceffe7d3 100644 --- a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -2,7 +2,7 @@ from operator import itemgetter from typing import Optional -import MeCab +import MeCab # type: ignore from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS
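# --- Illustrative sketch (not from the patches above) ---
# A minimal, self-contained model of the POS-weighted scoring that
# MeCabKeywordTableHandler is described as applying: term frequency x
# part-of-speech weight, with a boost for proper nouns (固有名詞), stopword
# filtering, and a score threshold. It assumes pre-tokenized
# (surface, pos, pos_detail) triples instead of a live MeCab tagger; the
# weights, minimum length, and threshold mirror the defaults documented in
# the README, and every other name here is hypothetical, not the handler's API.
from collections import defaultdict

POS_WEIGHTS = {"名詞": 1.0, "動詞": 0.8, "形容詞": 0.6}  # README default weights
PROPER_NOUN_BOOST = 1.5  # assumed boost factor for illustration
STOPWORDS_SAMPLE = {"は", "が", "の", "を", "です", "ます"}  # tiny subset for the sketch


def score_keywords(tokens, max_keywords=10, score_threshold=0.3):
    """Score tokens by frequency x POS weight and return the top keywords."""
    scores: defaultdict[str, float] = defaultdict(float)
    for surface, pos, pos_detail in tokens:
        # Skip stopwords and terms shorter than the minimum keyword length (2)
        if surface in STOPWORDS_SAMPLE or len(surface) < 2:
            continue
        weight = POS_WEIGHTS.get(pos, 0.0)
        if pos_detail == "固有名詞":  # proper nouns get boosted
            weight *= PROPER_NOUN_BOOST
        scores[surface] += weight
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    return [term for term, score in ranked if score >= score_threshold][:max_keywords]


if __name__ == "__main__":
    # "自然言語処理は人工知能の重要な分野です" tokenized by hand for the example
    tokens = [
        ("自然言語処理", "名詞", "固有名詞"),
        ("は", "助詞", "係助詞"),
        ("人工知能", "名詞", "固有名詞"),
        ("の", "助詞", "連体化"),
        ("重要", "名詞", "形容動詞語幹"),
        ("な", "助動詞", ""),
        ("分野", "名詞", "一般"),
        ("です", "助動詞", ""),
    ]
    print(score_keywords(tokens))  # e.g. ['自然言語処理', '人工知能', '重要', '分野']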