diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8fb07fe..31143ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,9 @@ jobs: with: python-version: 3.8 - name: Install dependencies - run: pip install -r requirements.txt + run: | + pip install openai==0.28.1 + pip install -r requirements.txt - name: Run tests and collect coverage run: | coverage run -m unittest discover diff --git a/README.md b/README.md index a163e13..7d259a5 100644 --- a/README.md +++ b/README.md @@ -568,6 +568,24 @@ class StoreSales(CommonFeatures, Filters): return self._dct["total_trans"] ``` +## 6. LLM Support + +Feature Factory supports Retrieval Augmented Generation by creating chunks of text from documents. Vector store indices can be populated from these text chunks and used to augment prompts before they are fed to LLMs. + +An LLM feature is a dataframe column that contains the chunks generated from the input documents. This is an example of how the Feature Factory APIs can be invoked to create an LLM feature: + +```python +df = ff.assemble_llm_feature(spark, srcDirectory="a directory containing documents", llmFeature=llm_feature, partitionNum=partition_num) +``` +In this example, `srcDirectory` is the directory containing all input documents. `partitionNum` is the number of Spark partitions used during computation: e.g. if you have two GPU worker nodes, you can set `partitionNum` to 2 to distribute the documents across the two worker nodes. + +`llm_feature` is an instance of the class `LLMFeature`, which consists of a doc reader and a doc splitter. The current doc reader implementations include the SimpleDirectoryReader of LlamaIndex and UnstructuredDocReader, which uses the Unstructured API. Customized readers can be implemented by subclassing DocReader and implementing the `create` and `apply` methods. The `create` method creates the resources needed for the computation, and the `apply` method runs inference for each file/row. + +The current doc splitter implementations support the `SimpleNodeParser` of LlamaIndex, the `RecursiveCharacterTextSplitter` of LangChain, and a custom tokenizer-based splitter (`TokenizerTextSpliter`). Like doc readers, the splitter classes can be extended by subclassing DocSplitter. Please note that metadata extraction is supported for the `SimpleNodeParser`. An LLM instance needs to be created for the metadata extraction. The LLM definition needs to subclass `LLMDef` and override the `create` method. An example LLM definition can be found in the [LLM notebook](./notebooks/feature_factory_llms.py). + +Document metadata can be extracted using the metadata extractors of LlamaIndex. Feature Factory also provides a method to extract metadata from file paths. For example, if your documents are stored in per-year directories, you can extract the year as metadata by naming the directories `year=[actual year]`. If a document has the path /tmp/year_of_publication=2023/doc1, then after splitting, each chunk from that document will have `year of publication: 2023` as part of its header. + + ## Project Support Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.
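For reviewers of this patch, here is a minimal end-to-end sketch of the flow described in the README section above, using only the APIs introduced in this change (`LlamaIndexDocReader`, `LlamaIndexDocSplitter`, `LLMFeature`, `assemble_llm_feature`). It assumes, as the README example does, that `ff` is a Feature Factory instance and `spark` is a SparkSession; the source directory path is illustrative.

```python
# Minimal usage sketch (assumes `ff` is a Feature Factory instance and `spark` a SparkSession,
# as in the README examples; the source directory path is illustrative).
from framework.feature_factory.llm_tools import (
    LlamaIndexDocReader,
    LlamaIndexDocSplitter,
    LLMFeature,
)

# The reader parses each file into LlamaIndex documents; the splitter turns them into text chunks.
doc_reader = LlamaIndexDocReader()
doc_splitter = LlamaIndexDocSplitter(chunk_size=1024, chunk_overlap=64)  # llm=None: chunks only, no metadata extraction

llm_feature = LLMFeature(reader=doc_reader, splitter=doc_splitter, name="chunks")

# Distribute the documents across two partitions, e.g. two GPU worker nodes.
df = ff.assemble_llm_feature(
    spark,
    srcDirectory="/dbfs/tmp/docs",   # assumed path
    llmFeature=llm_feature,
    partitionNum=2,
)
```

The resulting dataframe contains a single column named after `llm_feature.name` (here `chunks`), as described in the `assemble_llm_feature` docstring below.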
diff --git a/framework/feature_factory/__init__.py b/framework/feature_factory/__init__.py index b3359fd..9214813 100644 --- a/framework/feature_factory/__init__.py +++ b/framework/feature_factory/__init__.py @@ -87,7 +87,15 @@ def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum) def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int): - + """ + Creates a dataframe which contains a single column named llmFeature.name. + The method distributes the files under srcDirectory across the number of partitions given by partitionNum. + Each file is parsed and chunked using the reader and splitter in the llmFeature object. + :param spark: a spark session instance + :param srcDirectory: the directory containing the documents to parse + :param llmFeature: the LLM feature instance + :param partitionNum: the number of partitions the source documents will be distributed onto + """ all_files = self.helpers.list_files_recursively(srcDirectory) src_rdd = spark.sparkContext.parallelize(all_files, partitionNum) diff --git a/framework/feature_factory/catalog.py b/framework/feature_factory/catalog.py index 9233d95..26be271 100644 --- a/framework/feature_factory/catalog.py +++ b/framework/feature_factory/catalog.py @@ -1,5 +1,5 @@ from .feature import Feature - +from .llm_tools import LLMFeature class CatalogBase: @classmethod @@ -29,3 +29,19 @@ def get_all_features(cls): members[nm] = variable variable.set_feature_name(nm) return members + +class LLMCatalogBase: + @classmethod + def get_all_features(cls) -> LLMFeature: + """ + Returns an LLMFeature which contains a DocReader and a DocSplitter instance. + """ + llm_feat = None + for aclass in reversed(cls.__mro__): + vars_dct = vars(aclass) + for nm, variable in vars_dct.items(): + if not callable(getattr(aclass, nm)) and not nm.startswith("__"): + if isinstance(variable, LLMFeature): + llm_feat = variable + llm_feat.name = nm + return llm_feat diff --git a/framework/feature_factory/llm_tools.py b/framework/feature_factory/llm_tools.py index 6e229c5..b9bf844 100644 --- a/framework/feature_factory/llm_tools.py +++ b/framework/feature_factory/llm_tools.py @@ -12,11 +12,16 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from transformers import AutoTokenizer from langchain.document_loaders import UnstructuredPDFLoader -import math +import math, os, re class LLMTool(ABC): - + """Generic interface for LLM tools. + The apply and create methods need to be implemented in child classes. + The create method creates resources for the tool, and the apply method runs inference using those resources. + If the resources are not created before calling apply(), create() will be invoked at the beginning of apply(). + Having a separate create() makes it possible to initialize all required resources only once per partition. + """ def __init__(self) -> None: self._initialized = False @@ -37,7 +42,8 @@ def create(self): class DocReader(LLMTool): - + """ Generic base class for doc readers. + """ def create(self): ... @@ -46,7 +52,8 @@ def apply(self, filename: str) -> Union[str, List[Document]]: class DocSplitter(LLMTool): - + """ Generic base class for doc splitters.
+ """ def __init__(self) -> None: super().__init__() @@ -108,13 +115,26 @@ def _to_lcdocuments(cls, docs: Union[str, List[Document], List[LCDocument]]): new_docs.append(new_doc) return new_docs + @classmethod + def extract_directory_metadata(cls, fileName: str): + path_parts = os.path.normpath(fileName).split(os.path.sep) + attrs = {} + for part in path_parts: + if "=" in part: + attr, val = part.split('=') + if attr and val: + attr = re.sub(r'[-_]', ' ', attr, flags=re.IGNORECASE) + attrs[attr] = val + return attrs def apply(self, docs: Union[str, List[Document]]) -> List[str]: ... class LlamaIndexDocReader(DocReader): - + """A wrapper class for SimpleDirectoryReader of LlamaIndex. + For more details, refer to https://gpt-index.readthedocs.io/en/latest/examples/data_connectors/simple_directory_reader.html + """ def __init__(self) -> None: super().__init__() @@ -124,6 +144,9 @@ def apply(self, filename: str) -> List[Document]: class UnstructuredDocReader(DocReader): + """ + A doc reader class using Unstructured API. Only allowed categories will be included in the final parsed text. + """ def __init__(self, allowedCategories: Tuple[str]=('NarrativeText', 'ListItem')) -> None: super().__init__() @@ -143,7 +166,9 @@ def apply(self, filename: str) -> str: class LLMDef(LLMTool): - + """ A generic class to define LLM instance e.g. using HuggingFace APIs. + An example can be found at notebooks/feature_factory_llms.py + """ def __init__(self) -> None: self._instance = None @@ -153,7 +178,11 @@ def get_instance(self): class LlamaIndexDocSplitter(DocSplitter): - + """A class to split documents using LlamaIndex SimpleNodeParser. + TokenTextSplitter and TitleExtractor are used to generate text chunks and metadata for each chunk. + `chunk_size`, `chunk_overlap` are the super parameters to tweak for better response from LLMs. + `llm` is the LLM instance used for metadata extraction. If not provided, the splitter will generate text chunks only. + """ def __init__(self, chunk_size:int=1024, chunk_overlap:int=64, llm:LLMDef=None) -> None: super().__init__() self.chunk_size = chunk_size @@ -186,12 +215,20 @@ def apply(self, docs: List[Document]): docs = DocSplitter._to_documents(docs) self.create() doc_nodes = self.node_parser.get_nodes_from_documents(docs) + for node in doc_nodes: + if 'file_path' in node.metadata: + filepath = node.metadata['file_path'] + doc_attrs = DocSplitter.extract_directory_metadata(filepath) + node.metadata.update(doc_attrs) chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes] return chunks class LangChainRecursiveCharacterTextSplitter(DocSplitter): - + """ A splitter class to utilize Langchain RecursiveCharacterTextSplitter to generate text chunks. + If `pretrained_model_path` is provided, the `chunk_size` and `chunk_overlap` will be measured in tokens. + If `pretrained_model_path` is not provided, the `chunk_size` and `chunk_overlap` will be measured in characters. + """ def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_model_path: str=None) -> None: super().__init__() self.chunk_size = chunk_size @@ -219,7 +256,9 @@ def apply(self, docs): class TokenizerTextSpliter(DocSplitter): - + """ A text splitter which uses LLM defined by `pretrained_tokenizer_path` to encode the input text. + The splitting will be applied to the tokens instead of characters. 
+ """ def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_tokenizer_path: str=None) -> None: super().__init__() self.chunk_size = chunk_size @@ -248,8 +287,24 @@ def apply(self, text: Union[str, List[Document]]) -> List[str]: class LLMFeature(LLMTool): + """ A container class to hold all required reader and splitter instances. + The name is the column name for text chunks in the generated spark dataframe. + If the name is not provided, it will take the variable name in the LLM catalog as the name. + e.g. + class TestCatalog(LLMCatalogBase): + + # define a reader for the documents + doc_reader = LlamaIndexDocReader() + + # define a text splitter + doc_splitter = LangChainRecursiveCharacterTextSplitter() + + # define a LLM feature, the name is the column name in the result dataframe + chunk_col_name = LLMFeature(reader=doc_reader, splitter=doc_splitter) - def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None: + The name of output dataframe will be `chunk_col_name`. + """ + def __init__(self, reader: DocReader, splitter: DocSplitter, name: str = "chunks") -> None: super().__init__() self.name = name self.reader = reader @@ -267,7 +322,8 @@ def apply(self, filename: str): class LLMUtils: - + """ Util class to define generic split and process methods invoked from spark. + """ @classmethod def split_docs(cls, fileName: str, llmFeat: LLMFeature): print(fileName) @@ -278,4 +334,5 @@ def split_docs(cls, fileName: str, llmFeat: LLMFeature): def process_docs(cls, partitionData, llmFeat): llmFeat.create() for row in partitionData: - yield cls.split_docs(row, llmFeat) \ No newline at end of file + yield cls.split_docs(row, llmFeat) + diff --git a/requirements.txt b/requirements.txt index d8f7401..8ee2a21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,12 +4,11 @@ py4j==0.10.9 pyarrow==5.0.0 pyspark==3.1.3 python-dateutil==2.8.1 -pdf2image>=1.16.3 scipy==1.7.1 six==1.15.0 coverage -langchain>=0.0.317 -llama-index>=0.8.61 +langchain==0.0.317 +llama-index==0.8.61 pypdf>=3.17.0 PyPDF2>=3.0.1 transformers>=4.31.0 diff --git a/setup.py b/setup.py index 0596daf..68b6e8b 100644 --- a/setup.py +++ b/setup.py @@ -14,11 +14,12 @@ def read(fname): setup( name='featurefactory', - version="0.13.0", + version="0.14.0", author="Databricks", packages=find_packages(exclude=['tests', 'tests.*', 'data', 'data.*', 'notebook', 'notebook.*']), install_requires=[ - 'python-dateutil' + 'python-dateutil', + 'openai>=0.27.8,<1.0' ], description='feature factory', long_description=read('README.md'), diff --git a/test/test_chunking.py b/test/test_chunking.py index 7c90b97..258f54e 100644 --- a/test/test_chunking.py +++ b/test/test_chunking.py @@ -7,7 +7,7 @@ import json from pyspark.sql.types import StructType from test.local_spark_singleton import SparkSingleton -from framework.feature_factory.catalog import CatalogBase +from framework.feature_factory.catalog import LLMCatalogBase from enum import IntEnum from framework.feature_factory.llm_tools import * @@ -49,7 +49,7 @@ def test_recursive_splitter_llamaindex_docs(self): def test_process_docs(self): doc_reader = LlamaIndexDocReader() doc_splitter = LlamaIndexDocSplitter() - llm_feature = LLMFeature("test_llm", reader=doc_reader, splitter=doc_splitter) + llm_feature = LLMFeature(reader=doc_reader, splitter=doc_splitter) chunks = LLMUtils.process_docs(["test/data/sample.pdf"], llmFeat=llm_feature) for chunk in chunks: assert len(chunk) == 1 @@ -113,4 +113,24 @@ def test_token_splitter(self): doc_splitter = 
TokenizerTextSpliter(chunk_size=1024, chunk_overlap=32, pretrained_tokenizer_path="hf-internal-testing/llama-tokenizer") chunks = doc_splitter.apply(docs) assert len(chunks) == 1 - \ No newline at end of file + + def test_llm_catalog(self): + class TestCatalog(LLMCatalogBase): + + # define a reader for the documents + doc_reader = LlamaIndexDocReader() + + # define a text splitter + doc_splitter = LangChainRecursiveCharacterTextSplitter() + + # define an LLM feature; the name is the column name in the result dataframe + chunk_col_name = LLMFeature(reader=doc_reader, splitter=doc_splitter) + + llm_feature = TestCatalog.get_all_features() + assert llm_feature.name == "chunk_col_name" + assert llm_feature.reader == TestCatalog.doc_reader + assert llm_feature.splitter == TestCatalog.doc_splitter + + def test_dir_meta_extraction(self): + attrs = DocSplitter.extract_directory_metadata("/tmp/year_of_publication=2023") + assert attrs["year of publication"] == "2023" \ No newline at end of file
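As a companion to the catalog test above, the following is a brief sketch of how a catalog defined this way would typically be consumed end to end. It is not part of the patch; it assumes, as in the README example, that `ff` is a Feature Factory instance and `spark` is a SparkSession, and the catalog name and directory path are illustrative.

```python
# Usage sketch for an LLM catalog (assumes `ff` is a Feature Factory instance and
# `spark` is a SparkSession, as in the README; names and paths are illustrative).
from framework.feature_factory.catalog import LLMCatalogBase
from framework.feature_factory.llm_tools import (
    LlamaIndexDocReader,
    LangChainRecursiveCharacterTextSplitter,
    LLMFeature,
)

class DocsCatalog(LLMCatalogBase):
    # reader and splitter for the documents
    doc_reader = LlamaIndexDocReader()
    doc_splitter = LangChainRecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64)

    # the variable name becomes the chunk column name in the result dataframe
    chunks_col = LLMFeature(reader=doc_reader, splitter=doc_splitter)

# get_all_features() returns the LLMFeature with its name set to the variable name ("chunks_col")
llm_feature = DocsCatalog.get_all_features()

df = ff.assemble_llm_feature(
    spark,
    srcDirectory="/dbfs/tmp/docs",   # assumed path
    llmFeature=llm_feature,
    partitionNum=2,
)
```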