Merge pull request #97 from realratchet/master

Accessing page updates timestamp

realratchet authored Oct 23, 2023
2 parents e162897 + 62f2d71 commit 2eb792f
Showing 11 changed files with 109 additions and 70 deletions.
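
The headline change, accessing a page now refreshes its timestamp, rides on a new `update_access_time` helper imported from `tablite.utils` in `base.py` below. The helper's body is not part of this diff; here is a minimal sketch of what it plausibly does, assuming it bumps the file's access time while leaving the modification time alone:

```python
import os
import time
from pathlib import Path

def update_access_time(path):
    """Hypothetical sketch: refresh a file's access time, keep its mtime."""
    stat = Path(path).stat()
    # times = (atime, mtime): new access time, unchanged modification time
    os.utime(path, times=(time.time(), stat.st_mtime))
```
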
1 change: 1 addition & 0 deletions changelog.md
@@ -2,6 +2,7 @@

| Version | Change |
|:-----------|-----------------------------------------------------|
|2023.8.0| Nim backend for csv importer.<br>Improve excel importer.<br>Improve slicing consistency.<br>Logical cores re-enabled on *nix based systems.<br>Filter is now type safe.<br>Added merge utility.<br>Various bugfixes.|
|2023.6.5| Fix issues with `get_headers` falling back to text reading when reading 0 lines of excel, fix issue where reading excel file would ignore file count, excel file reader now has parity for linecount selection. |
|2023.6.4| Fix a logic bug in `get_headers` that caused one extra line to be returned than requested. |
|2023.6.3| Updated the way reference counting works. Tablite now tracks references to used pages and cleans them up based on the number of references to those pages in the current process. This change makes it possible to handle deep table clones when sending tables between processes (pickling/unpickling), whereas the previous implementation would corrupt all tables using the same pages, because its reference counting assumed that all tables were shallow copies of the same object. |

2 changes: 1 addition & 1 deletion requirements.txt
@@ -8,7 +8,7 @@ pyexcel-ods==0.6.0
pyexcel-xlsx==0.6.0
pyexcel-xls==0.7.0
pyuca>=1.2
mplite>=1.2.3
mplite>=1.2.5
PyYAML==6.0
openpyxl==3.0.10 # newest version breaks pyexcel
h5py>=3.6.0

3 changes: 0 additions & 3 deletions tablite/_nimlite/nimlite.nim
@@ -89,9 +89,6 @@ when isMainModule and appType == "lib":
echo $e.msg & "\n" & $e.getStackTrace
raise e

# if isMainModule:
# echo "Nimlite imported!"

when isMainModule and appType != "lib":
import argparse
import std/[sugar, json]

119 changes: 72 additions & 47 deletions tablite/base.py
@@ -7,6 +7,7 @@
import logging
import warnings
import zipfile
from tablite.utils import load_numpy, update_access_time
import numpy as np
from tqdm import tqdm as _tqdm
from pathlib import Path
@@ -55,66 +56,106 @@ def register(path):
def shutdown():
"""method to clean up temporary files triggered at shutdown."""
for path in file_registry:
if str(os.getpid()) in str(path): # safety feature to prevent rm -rf /
if Config.pid in str(path): # safety feature to prevent rm -rf /
log.debug(f"shutdown: running rmtree({path})")
shutil.rmtree(path)


atexit.register(shutdown)


class Page(object):
class SimplePage(object):
ids = count(start=1)
refcounts = {}
autocleanup = True

def __init__(self, id, path, len, py_dtype) -> None:
self.id = id
self.path = path / "pages" / f"{id}.npy"
self.len = len
self.dtype = py_dtype

self._incr_refcount()

def _incr_refcount(self):
""" increment refcount of this page if it's used by this process"""
if f"pid-{os.getpid()}" in self.path.parts:
if self.owns():
self.refcounts[self.path] = self.refcounts.get(self.path, 0) + 1


def __setstate__(self, state):
"""
When an object is unpickled, e.g. during multiprocessing,
object.__setstate__(state) is called instead of __init__;
this means we need to update the page refcount as if the constructor had been called.
"""
self.__dict__.update(state)

self._incr_refcount()

@classmethod
def next_id(cls, path):
while True:
_id = next(cls.ids)
type_check(path, Path)
_path = path / "pages" / f"{_id}.npy"

if not _path.exists():
break # make sure we don't overwrite existing pages if they were created outside of the main thread

return _id

def __len__(self):
return self.len

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.path}, {self.get()})"

def __hash__(self) -> int:
return hash(self.id)

def owns(self):
parts = self.path.parts

return all((p in parts for p in Path(Config.pid).parts))

def __del__(self):
"""When python's reference count for an object is 0, python uses
it's garbage collector to remove the object and free the memory.
As tablite tables have columns and columns have page and pages have
data stored on disk, the space on disk must be freed up as well.
This __del__ override assures the cleanup of stored data.
"""
if f"pid-{os.getpid()}" not in self.path.parts:
if not self.owns():
return

refcount = self.refcounts[self.path] = max(self.refcounts.get(self.path, 0) - 1, 0)

if refcount > 0:
return

self.path.unlink(True)
if self.autocleanup:
self.path.unlink(True)

del self.refcounts[self.path]

def get(self):
"""loads stored data
Returns:
np.ndarray: stored data.
"""
array = load_numpy(self.path)
return MetaArray(array, array.dtype, py_dtype=self.dtype)


class Page(SimplePage):
def __init__(self, path, array) -> None:
"""
Args:
path (Path): working directory.
array (np.array): data
"""
while True:
self.id = next(self.ids)
type_check(path, Path)
self.path = path / "pages" / f"{self.id}.npy"

if not self.path.exists():
break # make sure we don't override existing pages if they are created outside of main thread
_id = self.next_id(path)

type_check(array, np.ndarray)

@@ -136,33 +177,16 @@ def __init__(self, path, array) -> None:
)
raise OSError(msg)

self.len = len(array)
_len = len(array)
# type_check(array, MetaArray)
if not hasattr(array, "metadata"):
raise ValueError
self.dtype = array.metadata["py_dtype"]
np.save(self.path, array, allow_pickle=True, fix_imports=False)
log.debug(f"Page saved: {self.path}")

self._incr_refcount() # increment refcount for this page

def __len__(self):
return self.len
_dtype = array.metadata["py_dtype"]

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.path}, {self.get()})"

def __hash__(self) -> int:
return self.id

def get(self):
"""loads stored data
super().__init__(_id, path, _len, _dtype)

Returns:
np.ndarray: stored data.
"""
array = np.load(self.path, allow_pickle=True, fix_imports=False)
return MetaArray(array, array.dtype, py_dtype=self.dtype)
np.save(self.path, array, allow_pickle=True, fix_imports=False)
log.debug(f"Page saved: {self.path}")


class Column(object):
@@ -300,14 +324,14 @@ def getpages(self, item):
pages.append(page)
else: # fetch the slice and filter it.
search_slice = slice(ro.start - start, ro.stop - start, ro.step)
np_arr = np.load(page.path, allow_pickle=True, fix_imports=False)
np_arr = load_numpy(page.path)
match = np_arr[search_slice]
pages.append(match)

if is_reversed:
pages.reverse()
for ix, page in enumerate(pages):
if isinstance(page, Page):
if isinstance(page, SimplePage):
data = page.get()
pages[ix] = np.flip(data)
else:
@@ -344,7 +368,7 @@ def __getitem__(self, item): # USER FUNCTION.
"""
result = []
for element in self.getpages(item):
if isinstance(element, Page):
if isinstance(element, SimplePage):
result.append(element.get())
else:
result.append(element)
@@ -466,9 +490,9 @@ def _setitem_prextend(self, key, value): # PRIVATE FUNCTION
start, end = end, end + page.len
if start <= key.stop < end: # find beginning
data = page.get()
keep = data[(key.stop - start) :] # keeping after key.stop
keep = data[(key.stop - start):] # keeping after key.stop
new = np_type_unify([value, keep])
tail = self.pages[index + 1 :] # keep pointers to pages.
tail = self.pages[index + 1:] # keep pointers to pages.
self.pages = []
self.extend(new) # handles pagination.
self.pages.extend(tail) # handles old pages.
@@ -500,7 +524,7 @@ def _setitem_insert(self, key, value): # PRIVATE FUNCTION

if start <= key_stop < end: # start of tail
data = page.get() if data is None else data # don't load again if on same page.
tail = data[key_stop - start :]
tail = data[key_stop - start:]

if key_stop < start:
unchanged_tail.append(page)
@@ -961,7 +985,7 @@ def count(self, item):


class Table(object):
_pid_dir = None # typically Path(Config.workdir) / f"pid-{os.getpid()}"
_pid_dir = None # typically `Config.workdir / Config.pid`
_ids = count()

def __init__(self, columns=None, headers=None, rows=None, _path=None) -> None:
@@ -978,7 +1002,7 @@ def __init__(self, columns=None, headers=None, rows=None, _path=None) -> None:
"""
if _path is None:
if self._pid_dir is None:
self._pid_dir = Path(Config.workdir) / f"pid-{os.getpid()}"
self._pid_dir = Path(Config.workdir) / Config.pid
if not self._pid_dir.exists():
self._pid_dir.mkdir()
(self._pid_dir / "pages").mkdir()
@@ -1129,7 +1153,7 @@ def __getitem__(self, keys): # USER FUNCTION
for item in column.getpages(slc):
if isinstance(item, np.ndarray):
new_column.extend(item) # extend subslice (expensive)
elif isinstance(item, Page):
elif isinstance(item, SimplePage):
new_column.pages.append(item) # extend page (cheap)
else:
raise TypeError(f"Bad item: {item}")
@@ -1275,6 +1299,7 @@ def load(cls, path, tqdm=_tqdm): # USER FUNCTION.
column.extend(data)
pbar.update(1)
t.columns[name] = column
update_access_time(path)
return t

def copy(self):
@@ -1539,7 +1564,7 @@ def datatype(col): # PRIVATE
else:
empty = [blanks] * 7
head = (col[:7].tolist() + empty)[:7]
tail = (col[n - 7 :].tolist() + empty)[-7:]
tail = (col[n - 7:].tolist() + empty)[-7:]
row = head + ["..."] + tail
data[name] = row

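The `SimplePage` refactor above hinges on reference counts surviving pickling: `__init__` is skipped during unpickling, so `__setstate__` must re-increment the count, otherwise `__del__` on a clone would release a reference it never took and a page file could be unlinked while other tables still use it. A minimal, tablite-independent sketch of the pattern (class and names are illustrative only, not part of the PR):

```python
import pickle

class Counted:
    refcounts = {}  # class-level, shared by all instances in this process

    def __init__(self, key):
        self.key = key
        self._incr()

    def _incr(self):
        self.refcounts[self.key] = self.refcounts.get(self.key, 0) + 1

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._incr()  # unpickling creates a new live reference

    def __del__(self):
        left = self.refcounts[self.key] = self.refcounts.get(self.key, 0) - 1
        if left <= 0:
            self.refcounts.pop(self.key, None)  # last owner would clean up the file here

a = Counted("pages/1.npy")
b = pickle.loads(pickle.dumps(a))  # __init__ skipped; __setstate__ runs
del a                              # count drops 2 -> 1; the "page" survives
assert Counted.refcounts["pages/1.npy"] == 1
```
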
2 changes: 2 additions & 0 deletions tablite/config.py
@@ -43,6 +43,8 @@ class Config(object):
workdir = pathlib.Path(os.environ.get("TABLITE_TMPDIR", f"{tempfile.gettempdir()}/tablite-tmp"))
workdir.mkdir(parents=True, exist_ok=True)

pid = f"pid-{os.getpid()}"

PAGE_SIZE = 1_000_000 # sets the page size limit.
ENCODING = "UTF-8" # sets the page encoding when using bytes

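Caching the pid string once on `Config` gives the per-process working directory and the `shutdown()` safety check in `base.py` a single source of truth. An illustrative check, using the default workdir from `config.py`:

```python
import os
import tempfile
from pathlib import Path

pid = f"pid-{os.getpid()}"  # the value Config.pid now caches
workdir = Path(tempfile.gettempdir()) / "tablite-tmp" / pid

# shutdown() only calls rmtree on paths containing this process's pid,
# so it cannot touch another process's pages.
assert pid in str(workdir)
```
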
9 changes: 5 additions & 4 deletions tablite/import_utils.py
@@ -8,6 +8,7 @@
from pathlib import Path
import openpyxl
import pyexcel
from tablite.utils import load_numpy
import sys
import warnings
import logging
@@ -244,7 +245,7 @@ def excel_reader(T, path, first_row_has_headers=True, header_row_index=0, sheet=
it_rows_filtered = ([row[idx].value for idx in it_used_indices] for row in it_rows)

# create page directory
workdir = Path(Config.workdir) / f"pid-{os.getpid()}"
workdir = Path(Config.workdir) / Config.pid
pagesdir = workdir/"pages"
pagesdir.mkdir(exist_ok=True, parents=True)

@@ -691,7 +692,7 @@ def text_reader_py(
)

# make sure the tempdir is ready.
workdir = Path(Config.workdir) / f"pid-{os.getpid()}"
workdir = Path(Config.workdir) / Config.pid
if not workdir.exists():
workdir.mkdir()
(workdir / "pages").mkdir()
@@ -757,7 +758,7 @@ def update(self, n=1):
t[name] = Column(t.path)
for cfg in configs:
for idx, npy in ((inv_field_relation[idx], npy) for idx, npy in enumerate(cfg.destination)):
data = np.load(npy, allow_pickle=True, fix_imports=False)
data = load_numpy(npy)

if guess_datatypes:
data = list_to_np_array(DataTypes.guess(data))
@@ -803,7 +804,7 @@ def text_reader_nim(
else:
raise NotImplementedError(f"encoding not implemented: {encoding}")

pid = Config.workdir / f"pid-{os.getpid()}"
pid = Config.workdir / Config.pid
kwargs = {}

if first_row_has_headers is not None:

3 changes: 2 additions & 1 deletion tablite/mp_utils.py
@@ -1,5 +1,6 @@
import operator
import numpy as np
from tablite.utils import load_numpy
from multiprocessing import shared_memory
import psutil
from tablite.config import Config
@@ -112,7 +113,7 @@ def reindex_task(src, dst, index_shm, shm_shape, start, end):
existing_shm = shared_memory.SharedMemory(name=index_shm)
shared_index = np.ndarray(shm_shape, dtype=np.int64, buffer=existing_shm.buf)
# work
array = np.load(src, allow_pickle=True, fix_imports=False)
array = load_numpy(src)
new = np.take(array, shared_index[start:end])
np.save(dst, new, allow_pickle=True, fix_imports=False)
# disconnect
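
Throughout the PR, direct `np.load(...)` calls are replaced by `load_numpy` from `tablite.utils`, whose body is not shown in this diff. Given the commit title, a plausible sketch is a thin wrapper that refreshes the access time before loading; this is an assumption, not the confirmed implementation:

```python
import numpy as np

def load_numpy(path):
    # Assumed behaviour: touch the page's atime on every read, which is
    # what makes "accessing page updates timestamp" hold for page loads.
    update_access_time(path)  # see the sketch after the file summary above
    return np.load(path, allow_pickle=True, fix_imports=False)
```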