Skip to content

Commit

Permalink
improve schema
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant committed Aug 26, 2024
1 parent 2a6d6a5 commit 71683ca
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
3 changes: 2 additions & 1 deletion fastparquet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ def __init__(self, schema_elements):
except AttributeError:
pass # already a str
self.root = schema_elements[0]
# repeats loop above doing decoding, could meld
self.schema_elements_by_name = {
se[4]: se for se in schema_elements}
self.tree = {}
schema_tree(schema_elements, paths = self.tree)
schema_tree(schema_elements, paths=self.tree)
self._text = None

@property
Expand Down
8 changes: 6 additions & 2 deletions fastparquet/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ def concat_and_add(*arrs, offset=True):


class Task:
"""Just holds a function and arguments for later call"""
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
Expand All @@ -479,9 +480,9 @@ def __call__(self):


class ThreadPool:
"""Simplistic fire&forget pool of X threads"""

def __init__(self, num_workers, poll_time=0.0003):
import queue
self.num_workers = num_workers
self.tasks = []
self.ntask = 0
Expand All @@ -503,6 +504,9 @@ def wait(self):
while len(self.done) < self.ntask:
time.sleep(self.poll)
self.th.clear()
self.done.clear()
self.tasks.clear()
self.ntask = 0

def submit(self, func, *args, **kwargs):
self.tasks.append(Task(func, *args, **kwargs))
Expand All @@ -516,5 +520,5 @@ def go(self, wait=True):
self.wait()

def run_tasks(self, tasks: list[callable], wait=True):
self.tasks = tasks
self.tasks.extend(tasks)
self.go(wait)

0 comments on commit 71683ca

Please sign in to comment.