Skip to content

Commit

Permalink
Merge remote-tracking branch 'ff_llms/llms' into llms
Browse files Browse the repository at this point in the history
  • Loading branch information
lyliyu committed Nov 17, 2023
2 parents 294d4f9 + 872b900 commit b079aa2
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 23 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ jobs:
with:
python-version: 3.8
- name: Install dependencies
run: pip install -r requirements.txt
run: |
pip install openai==0.28.1
pip install -r requirements.txt
- name: Run tests and collect coverage
run: |
coverage run -m unittest discover
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,24 @@ class StoreSales(CommonFeatures, Filters):
return self._dct["total_trans"]
```

## 6. LLM Support

Feature Factory supports Retrieval Augmented Generaetion by creating chunks of texts from documents. Vector store indices can be populated from the chunks of texts and be utilized for augmenting the prompts before feeding into LLMs.

A LLM feature is a column of a dataframe which contains the chunks generated from input documents. This is an example of how can Feature Facotry APIs can be invoked to create a LLM feature:

```python
df = ff.assemble_llm_feature(spark, srcDirectory= "a directory containing documents", llmFeature=llm_feature, partitionNum=partition_num)
```
In this example, `srcDirectory` is the directory containing all intput documents. The `partitionNum` is the number of spark partitions during computation: i.e. if you have two work nodes as GPU instances, you can set the partitionNum to be 2 to distribute the documents onto the two worker nodes.

`llm_feature` is an instance of class `LLMFeature`, which consists of a doc reader and splitter. The current implementation of doc readers includes SimpleDirectoryReader of LlamaIndex and UnstructuredDocReader using Unstructured API. Cusotimized readers can be implemented by overriding class DocReader and re-implement the `create` and `apply` method. `create` method is called to create the resources needed for the computation, and the `apply` make inference for each file/row.

The current implementation of doc splitters supports `SimpleNodeParser` of LlamaIndex, `RecursiveCharacterTextSplitter` of LangChain, and a custom tokeninzer based splitter (`TokenizerTextSpliter`). Like doc readers, the splitter classes can be extended by subclass DocSplitter. Please note that meta data extractor is supported for the `SimpleNodeParser`. A LLM instance needs to be created for the metadata extracion. The LLM definition needs to subclass `LLMDef` and override the `create` method. An example of LLM definition can be found at: [LLM notebook](./notebooks/feature_factory_llms.py).

Metadata of documents can be extracted using the Metadata extractor of LlamaIndex. Feature factory also provides a method to extract metadadta from the file pathes. For example, if your documents are stored in directories of years, you can extract the year as metadata if the directories are named as `year=[actual year]`. For example, if your document has the path of /tmp/year_of_publication=2023/doc1, after splitting, each chunk from that document will have `year of publication: 2023` as the part of the header of the chunk.


## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

Expand Down
10 changes: 9 additions & 1 deletion framework/feature_factory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,15 @@ def append_catalog(self, df: DataFrame, groupBy_cols, catalog_cls, feature_names
return self.append_features(df, groupBy_cols, [fs], withTrendsForFeatures, granularityEnum)

def assemble_llm_feature(self, spark: SparkSession, srcDirectory: str, llmFeature: LLMFeature, partitionNum: int):

"""
Creates a dataframe which contains only one column named as llmFeature.name.
The method will distribute the files under srcDirectory to the partitions determined by the partitionNum.
Each file will be parsed and chunked using the reader and splitter in the llmFeature object.
:param spark: a spark session instance
:param srcDirectory: the directory containing documents to parse
:llmFeature: the LLM feature instance
:partitionNum: the number of partitions the src documents will be distributed onto.
"""
all_files = self.helpers.list_files_recursively(srcDirectory)
src_rdd = spark.sparkContext.parallelize(all_files, partitionNum)

Check warning on line 100 in framework/feature_factory/__init__.py

View check run for this annotation

Codecov / codecov/patch

framework/feature_factory/__init__.py#L99-L100

Added lines #L99 - L100 were not covered by tests

Expand Down
18 changes: 17 additions & 1 deletion framework/feature_factory/catalog.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .feature import Feature

from .llm_tools import LLMFeature

class CatalogBase:
@classmethod
Expand Down Expand Up @@ -29,3 +29,19 @@ def get_all_features(cls):
members[nm] = variable
variable.set_feature_name(nm)
return members

class LLMCatalogBase:
@classmethod
def get_all_features(cls) -> LLMFeature:
"""
Returns a LLMFeature which contains a DocReader and DocSplitter instance.
"""
llm_feat = None
for aclass in reversed(cls.__mro__):
vars_dct = vars(aclass)
for nm, variable in vars_dct.items():
if not callable(getattr(aclass, nm)) and not nm.startswith("__"):
if isinstance(variable, LLMFeature):
llm_feat = variable
llm_feat.name = nm
return llm_feat
81 changes: 69 additions & 12 deletions framework/feature_factory/llm_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain.document_loaders import UnstructuredPDFLoader
import math
import math, os, re


class LLMTool(ABC):

"""Generic interface for LLMs tools.
apply and create methods need to be implemented in the children classes.
create method creates resources for the tool and apply method makes inference using the resources.
If the resources are not created before calling apply(), create() will be invoked in the beginning of the apply().
Having a separate create() will make it more efficient to initalize/create all required resouces only once per partition.
"""
def __init__(self) -> None:
self._initialized = False

Expand All @@ -37,7 +42,8 @@ def create(self):


class DocReader(LLMTool):

""" Generic class for doc reader.
"""
def create(self):
...

Expand All @@ -46,7 +52,8 @@ def apply(self, filename: str) -> Union[str, List[Document]]:


class DocSplitter(LLMTool):

""" Generic class for doc splitter.
"""
def __init__(self) -> None:
super().__init__()

Expand Down Expand Up @@ -108,13 +115,26 @@ def _to_lcdocuments(cls, docs: Union[str, List[Document], List[LCDocument]]):
new_docs.append(new_doc)
return new_docs

@classmethod
def extract_directory_metadata(cls, fileName: str):
path_parts = os.path.normpath(fileName).split(os.path.sep)
attrs = {}
for part in path_parts:
if "=" in part:
attr, val = part.split('=')
if attr and val:
attr = re.sub(r'[-_]', ' ', attr, flags=re.IGNORECASE)
attrs[attr] = val
return attrs

def apply(self, docs: Union[str, List[Document]]) -> List[str]:
...

Check warning on line 131 in framework/feature_factory/llm_tools.py

View check run for this annotation

Codecov / codecov/patch

framework/feature_factory/llm_tools.py#L131

Added line #L131 was not covered by tests


class LlamaIndexDocReader(DocReader):

"""A wrapper class for SimpleDirectoryReader of LlamaIndex.
For more details, refer to https://gpt-index.readthedocs.io/en/latest/examples/data_connectors/simple_directory_reader.html
"""
def __init__(self) -> None:
super().__init__()

Expand All @@ -124,6 +144,9 @@ def apply(self, filename: str) -> List[Document]:


class UnstructuredDocReader(DocReader):
"""
A doc reader class using Unstructured API. Only allowed categories will be included in the final parsed text.
"""

def __init__(self, allowedCategories: Tuple[str]=('NarrativeText', 'ListItem')) -> None:
super().__init__()
Expand All @@ -143,7 +166,9 @@ def apply(self, filename: str) -> str:


class LLMDef(LLMTool):

""" A generic class to define LLM instance e.g. using HuggingFace APIs.
An example can be found at notebooks/feature_factory_llms.py
"""
def __init__(self) -> None:
self._instance = None

Check warning on line 173 in framework/feature_factory/llm_tools.py

View check run for this annotation

Codecov / codecov/patch

framework/feature_factory/llm_tools.py#L173

Added line #L173 was not covered by tests

Expand All @@ -153,7 +178,11 @@ def get_instance(self):


class LlamaIndexDocSplitter(DocSplitter):

"""A class to split documents using LlamaIndex SimpleNodeParser.
TokenTextSplitter and TitleExtractor are used to generate text chunks and metadata for each chunk.
`chunk_size`, `chunk_overlap` are the super parameters to tweak for better response from LLMs.
`llm` is the LLM instance used for metadata extraction. If not provided, the splitter will generate text chunks only.
"""
def __init__(self, chunk_size:int=1024, chunk_overlap:int=64, llm:LLMDef=None) -> None:
super().__init__()
self.chunk_size = chunk_size
Expand Down Expand Up @@ -186,12 +215,20 @@ def apply(self, docs: List[Document]):
docs = DocSplitter._to_documents(docs)
self.create()
doc_nodes = self.node_parser.get_nodes_from_documents(docs)
for node in doc_nodes:
if 'file_path' in node.metadata:
filepath = node.metadata['file_path']
doc_attrs = DocSplitter.extract_directory_metadata(filepath)
node.metadata.update(doc_attrs)
chunks = [node.get_content(metadata_mode=MetadataMode.LLM) for node in doc_nodes]
return chunks


class LangChainRecursiveCharacterTextSplitter(DocSplitter):

""" A splitter class to utilize Langchain RecursiveCharacterTextSplitter to generate text chunks.
If `pretrained_model_path` is provided, the `chunk_size` and `chunk_overlap` will be measured in tokens.
If `pretrained_model_path` is not provided, the `chunk_size` and `chunk_overlap` will be measured in characters.
"""
def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_model_path: str=None) -> None:
super().__init__()
self.chunk_size = chunk_size
Expand Down Expand Up @@ -219,7 +256,9 @@ def apply(self, docs):


class TokenizerTextSpliter(DocSplitter):

""" A text splitter which uses LLM defined by `pretrained_tokenizer_path` to encode the input text.
The splitting will be applied to the tokens instead of characters.
"""
def __init__(self, chunk_size=1024, chunk_overlap=64, pretrained_tokenizer_path: str=None) -> None:
super().__init__()
self.chunk_size = chunk_size
Expand Down Expand Up @@ -248,8 +287,24 @@ def apply(self, text: Union[str, List[Document]]) -> List[str]:


class LLMFeature(LLMTool):
""" A container class to hold all required reader and splitter instances.
The name is the column name for text chunks in the generated spark dataframe.
If the name is not provided, it will take the variable name in the LLM catalog as the name.
e.g.
class TestCatalog(LLMCatalogBase):
# define a reader for the documents
doc_reader = LlamaIndexDocReader()
# define a text splitter
doc_splitter = LangChainRecursiveCharacterTextSplitter()
# define a LLM feature, the name is the column name in the result dataframe
chunk_col_name = LLMFeature(reader=doc_reader, splitter=doc_splitter)
def __init__(self, name: str, reader: DocReader, splitter: DocSplitter) -> None:
The name of output dataframe will be `chunk_col_name`.
"""
def __init__(self, reader: DocReader, splitter: DocSplitter, name: str = "chunks") -> None:
super().__init__()
self.name = name
self.reader = reader
Expand All @@ -267,7 +322,8 @@ def apply(self, filename: str):


class LLMUtils:

""" Util class to define generic split and process methods invoked from spark.
"""
@classmethod
def split_docs(cls, fileName: str, llmFeat: LLMFeature):
print(fileName)
Expand All @@ -278,4 +334,5 @@ def split_docs(cls, fileName: str, llmFeat: LLMFeature):
def process_docs(cls, partitionData, llmFeat):
llmFeat.create()
for row in partitionData:
yield cls.split_docs(row, llmFeat)
yield cls.split_docs(row, llmFeat)

5 changes: 2 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ py4j==0.10.9
pyarrow==5.0.0
pyspark==3.1.3
python-dateutil==2.8.1
pdf2image>=1.16.3
scipy==1.7.1
six==1.15.0
coverage
langchain>=0.0.317
llama-index>=0.8.61
langchain==0.0.317
llama-index==0.8.61
pypdf>=3.17.0
PyPDF2>=3.0.1
transformers>=4.31.0
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ def read(fname):

setup(
name='featurefactory',
version="0.13.0",
version="0.14.0",
author="Databricks",
packages=find_packages(exclude=['tests', 'tests.*', 'data', 'data.*', 'notebook', 'notebook.*']),
install_requires=[
'python-dateutil'
'python-dateutil',
'openai>=0.27.8,<1.0'
],
description='feature factory',
long_description=read('README.md'),
Expand Down
26 changes: 23 additions & 3 deletions test/test_chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
from pyspark.sql.types import StructType
from test.local_spark_singleton import SparkSingleton
from framework.feature_factory.catalog import CatalogBase
from framework.feature_factory.catalog import LLMCatalogBase
from enum import IntEnum
from framework.feature_factory.llm_tools import *

Expand Down Expand Up @@ -49,7 +49,7 @@ def test_recursive_splitter_llamaindex_docs(self):
def test_process_docs(self):
doc_reader = LlamaIndexDocReader()
doc_splitter = LlamaIndexDocSplitter()
llm_feature = LLMFeature("test_llm", reader=doc_reader, splitter=doc_splitter)
llm_feature = LLMFeature(reader=doc_reader, splitter=doc_splitter)
chunks = LLMUtils.process_docs(["test/data/sample.pdf"], llmFeat=llm_feature)
for chunk in chunks:
assert len(chunk) == 1
Expand Down Expand Up @@ -113,4 +113,24 @@ def test_token_splitter(self):
doc_splitter = TokenizerTextSpliter(chunk_size=1024, chunk_overlap=32, pretrained_tokenizer_path="hf-internal-testing/llama-tokenizer")
chunks = doc_splitter.apply(docs)
assert len(chunks) == 1


def test_llm_catalog(self):
class TestCatalog(LLMCatalogBase):

# define a reader for the documents
doc_reader = LlamaIndexDocReader()

# define a text splitter
doc_splitter = LangChainRecursiveCharacterTextSplitter()

# define a LLM feature, the name is the column name in the result dataframe
chunk_col_name = LLMFeature(reader=doc_reader, splitter=doc_splitter)

llm_feature = TestCatalog.get_all_features()
assert llm_feature.name == "chunk_col_name"
assert llm_feature.reader == TestCatalog.doc_reader
assert llm_feature.splitter == TestCatalog.doc_splitter

def test_dir_meta_extraction(self):
attrs = DocSplitter.extract_directory_metadata("/tmp/year_of_publication=2023")
assert attrs["year of publication"] == "2023"

0 comments on commit b079aa2

Please sign in to comment.