progress on plugin rethink
nickzoic committed Jul 7, 2023
1 parent ff38cb3 commit 21cb198
Showing 3 changed files with 36 additions and 3 deletions.
10 changes: 8 additions & 2 deletions countess/core/plugins.py
@@ -127,6 +127,9 @@ def hash(self):
"""Returns a hex digest of the hash of all configuration parameters"""
return self.get_parameter_hash().hexdigest()

def prepare(self):
pass

def process_inputs(self, inputs: Mapping[str, Iterable[Any]], logger: Logger, row_limit: Optional[int]) -> Iterable[Any]:
raise NotImplementedError(f"{self.__class__}.process_inputs()")

@@ -182,7 +185,8 @@ def process_inputs(
                 except StopIteration:
                     iterators.remove(it)

-        for p in self.parameters:
+        print(f"process_inputs {self.parameters}")
+        for p in self.parameters.values():
             p.set_column_choices(self.input_columns.keys())

     def process_dataframe(self, dataframe: pd.DataFrame, logger: Logger) -> pd.DataFrame:
@@ -191,7 +195,9 @@ def process_dataframe(self, dataframe: pd.DataFrame, logger: Logger) -> pd.DataFrame:

 class PandasTransformPlugin(PandasSimplePlugin):
     def process_dataframe(self, dataframe: pd.DataFrame, logger: Logger) -> pd.DataFrame:
-        return dataframe.apply(self.process_row, expand=True)
+        df = dataframe.apply(self.process_row, result_type="expand", logger=logger)
+        print(f"process_dataframe {dataframe} {df}")
+        return df

     def process_row(self, row: pd.Series, logger: Logger) -> pd.Series:
         raise NotImplementedError(f"Implement {self.__class__.__name__}.process_row()")
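The reworked process_dataframe above relies on DataFrame.apply with result_type="expand" to fan each process_row return value out into output columns. A standalone, pandas-only sketch of that pattern (column names and data are illustrative, not from this commit; it also assumes row-wise application, i.e. axis=1):

import pandas as pd

def process_row(row: pd.Series) -> list:
    # Derive several output values from one input row.
    return [row["seq"].upper(), len(row["seq"])]

df = pd.DataFrame({"seq": ["acgt", "ttag"]})

# axis=1 passes whole rows (Series) to the function;
# result_type="expand" spreads each returned list across result columns.
print(df.apply(process_row, axis=1, result_type="expand"))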
3 changes: 3 additions & 0 deletions countess/plugins/csv.py
@@ -76,6 +76,8 @@ class LoadCsvPlugin(PandasInputPlugin):
     def read_file_to_dataframe(self, file_params, logger, row_limit=None):
         filename = file_params["filename"].value

+        print(f"read_file_to_dataframe {filename}")
+
         options = {
             "header": 0 if self.parameters["header"].value else None,
         }
@@ -139,6 +141,7 @@ def read_file_to_dataframe(self, file_params, logger, row_limit=None):
         if index_col_numbers:
             df = df.set_index([df.columns[n] for n in index_col_numbers])

+        print(f"read_file_to_dataframe {df}")
         return df


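The options dict built in read_file_to_dataframe is handed to pandas, with "header" controlling whether the first CSV line is treated as column names. A quick standalone illustration of that read_csv behaviour (the file contents here are made up):

import io
import pandas as pd

csv_text = "thing,count\nfoo,7\nbar,9\n"

# header=0 uses the first line as column names; header=None would
# number the columns and treat every line as data.
options = {"header": 0}
print(pd.read_csv(io.StringIO(csv_text), **options))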
26 changes: 25 additions & 1 deletion countess/plugins/regex.py
@@ -1,5 +1,7 @@
 import re

+from typing import Mapping, Iterable, Optional
+
 import pandas as pd

 from countess import VERSION
@@ -13,7 +15,7 @@
     StringParam,
 )
 from countess.core.plugins import PandasInputPlugin, PandasTransformPlugin
-
+from countess.core.logger import Logger

 class RegexToolPlugin(PandasTransformPlugin):
     name = "Regex Tool"
@@ -43,6 +45,8 @@ class RegexToolPlugin(PandasTransformPlugin):
"drop_unmatch": BooleanParam("Drop Unmatched Rows", False),
}

compiled_re = None

def run_df(self, df, logger):
compiled_re = re.compile(self.parameters["regex"].value)

@@ -99,6 +103,26 @@ def func(value):

         return df

+    def process_inputs(
+        self, inputs: Mapping[str, Iterable[pd.DataFrame]], logger: Logger, row_limit: Optional[int]
+    ) -> Iterable[pd.DataFrame]:
+
+        print(f"prepare! {self.parameters['regex'].value}")
+        self.compiled_re = re.compile(self.parameters["regex"].value)
+        while self.compiled_re.groups > len(self.parameters["output"].params):
+            self.parameters["output"].add_row()
+
+        return super().process_inputs(inputs, logger, row_limit)
+
+    def process_row(self, row: pd.Series, logger: Logger) -> pd.Series:
+        print(f"process_row {row}")
+        value = row[self.parameters["column"].value]
+        print(f"process_row value {value}")
+        if match := self.compiled_re.match(value):
+            return [1,2,3]
+        else:
+            return [1,2,3]
+


 class RegexReaderPlugin(PandasInputPlugin):
     name = "Regex Reader"
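The new RegexToolPlugin methods set up a compile-once, match-per-row flow: process_inputs compiles the configured pattern and grows the output parameters to one per capture group, and process_row (still returning placeholder values in this commit) is where per-row matching happens. A standalone sketch of that general pattern using plain re and pandas (sample data and group handling are illustrative, not from this commit):

import re
import pandas as pd

df = pd.DataFrame({"name": ["sample_01_fwd", "sample_02_rev"]})

compiled_re = re.compile(r"(\w+)_(\d+)_(\w+)")  # three capture groups

def process_row(row: pd.Series) -> list:
    match = compiled_re.match(row["name"])
    # One value per capture group; pad with None when a row doesn't match.
    return list(match.groups()) if match else [None] * compiled_re.groups

print(df.apply(process_row, axis=1, result_type="expand"))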
