diff --git a/changelog.md b/changelog.md index c995be5f..4314ddcb 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,7 @@ | Version | Change | |:-----------|-----------------------------------------------------| +|2023.8.0| Nim backend for csv importer.
Improve excel importer.
Improve slicing consistency.
Logical cores re-enabled on *nix based systems.
Filter is now type safe.
Added merge utility.
Various bugfixes.| |2023.6.5| Fix issues with `get_headers` falling back to text reading when reading 0 lines of excel, fix issue where reading excel file would ignore file count, excel file reader now has parity for linecount selection. | |2023.6.4| Fix a logic bug in `get_headers` that caused one extra line to be returned than requested. | |2023.6.3| Updated the way reference counting works. Tablite now tracks references to used pages and cleans them up based on number of references to those pages in the current process. This change allows to handle deep table clones when sending tables via processes (pickling/unpickling), whereas previous implementation would corrupt all tables using same pages due to reference counting asserting that all tables are shallow copies to the same object. diff --git a/requirements.txt b/requirements.txt index a73f30bc..27486cdf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ pyexcel-ods==0.6.0 pyexcel-xlsx==0.6.0 pyexcel-xls==0.7.0 pyuca>=1.2 -mplite>=1.2.3 +mplite>=1.2.5 PyYAML==6.0 openpyxl==3.0.10 # newest version breaks pyexcel h5py>=3.6.0 diff --git a/tablite/_nimlite/nimlite.nim b/tablite/_nimlite/nimlite.nim index 78093d85..f7151382 100644 --- a/tablite/_nimlite/nimlite.nim +++ b/tablite/_nimlite/nimlite.nim @@ -89,9 +89,6 @@ when isMainModule and appType == "lib": echo $e.msg & "\n" & $e.getStackTrace raise e -# if isMainModule: -# echo "Nimlite imported!" - when isMainModule and appType != "lib": import argparse import std/[sugar, json] diff --git a/tablite/base.py b/tablite/base.py index f17421da..749faca9 100644 --- a/tablite/base.py +++ b/tablite/base.py @@ -7,6 +7,7 @@ import logging import warnings import zipfile +from tablite.utils import load_numpy, update_access_time import numpy as np from tqdm import tqdm as _tqdm from pathlib import Path @@ -55,7 +56,7 @@ def register(path): def shutdown(): """method to clean up temporary files triggered at shutdown.""" for path in file_registry: - if str(os.getpid()) in str(path): # safety feature to prevent rm -rf / + if Config.pid in str(path): # safety feature to prevent rm -rf / log.debug(f"shutdown: running rmtree({path})") shutil.rmtree(path) @@ -63,16 +64,24 @@ def shutdown(): atexit.register(shutdown) -class Page(object): +class SimplePage(object): ids = count(start=1) refcounts = {} + autocleanup = True + + def __init__(self, id, path, len, py_dtype) -> None: + self.id = id + self.path = path / "pages" / f"{id}.npy" + self.len = len + self.dtype = py_dtype + + self._incr_refcount() def _incr_refcount(self): """ increment refcount of this page if it's used by this process""" - if f"pid-{os.getpid()}" in self.path.parts: + if self.owns(): self.refcounts[self.path] = self.refcounts.get(self.path, 0) + 1 - def __setstate__(self, state): """ when an object is unpickled, @@ -80,9 +89,35 @@ def __setstate__(self, state): this means we need to update page refcount as if constructor had been called """ self.__dict__.update(state) - + self._incr_refcount() + @classmethod + def next_id(cls, path): + while True: + _id = next(cls.ids) + type_check(path, Path) + _path = path / "pages" / f"{_id}.npy" + + if not _path.exists(): + break # make sure we don't override existing pages if they are created outside of main thread + + return _id + + def __len__(self): + return self.len + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.path}, {self.get()})" + + def __hash__(self) -> int: + return hash(self.id) + + def owns(self): + parts = self.path.parts + + return all((p in parts for p in Path(Config.pid).parts)) + def __del__(self): """When python's reference count for an object is 0, python uses it's garbage collector to remove the object and free the memory. @@ -90,31 +125,37 @@ def __del__(self): data stored on disk, the space on disk must be freed up as well. This __del__ override assures the cleanup of stored data. """ - if f"pid-{os.getpid()}" not in self.path.parts: + if not self.owns(): return - + refcount = self.refcounts[self.path] = max(self.refcounts.get(self.path, 0) - 1, 0) if refcount > 0: return - self.path.unlink(True) + if self.autocleanup: + self.path.unlink(True) del self.refcounts[self.path] + def get(self): + """loads stored data + + Returns: + np.ndarray: stored data. + """ + array = load_numpy(self.path) + return MetaArray(array, array.dtype, py_dtype=self.dtype) + + +class Page(SimplePage): def __init__(self, path, array) -> None: """ Args: path (Path): working directory. array (np.array): data """ - while True: - self.id = next(self.ids) - type_check(path, Path) - self.path = path / "pages" / f"{self.id}.npy" - - if not self.path.exists(): - break # make sure we don't override existing pages if they are created outside of main thread + _id = self.next_id(path) type_check(array, np.ndarray) @@ -136,33 +177,16 @@ def __init__(self, path, array) -> None: ) raise OSError(msg) - self.len = len(array) + _len = len(array) # type_check(array, MetaArray) if not hasattr(array, "metadata"): raise ValueError - self.dtype = array.metadata["py_dtype"] - np.save(self.path, array, allow_pickle=True, fix_imports=False) - log.debug(f"Page saved: {self.path}") - - self._incr_refcount() # increment refcount for this page - - def __len__(self): - return self.len + _dtype = array.metadata["py_dtype"] - def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.path}, {self.get()})" - - def __hash__(self) -> int: - return self.id - - def get(self): - """loads stored data + super().__init__(_id, path, _len, _dtype) - Returns: - np.ndarray: stored data. - """ - array = np.load(self.path, allow_pickle=True, fix_imports=False) - return MetaArray(array, array.dtype, py_dtype=self.dtype) + np.save(self.path, array, allow_pickle=True, fix_imports=False) + log.debug(f"Page saved: {self.path}") class Column(object): @@ -300,14 +324,14 @@ def getpages(self, item): pages.append(page) else: # fetch the slice and filter it. search_slice = slice(ro.start - start, ro.stop - start, ro.step) - np_arr = np.load(page.path, allow_pickle=True, fix_imports=False) + np_arr = load_numpy(page.path) match = np_arr[search_slice] pages.append(match) if is_reversed: pages.reverse() for ix, page in enumerate(pages): - if isinstance(page, Page): + if isinstance(page, SimplePage): data = page.get() pages[ix] = np.flip(data) else: @@ -344,7 +368,7 @@ def __getitem__(self, item): # USER FUNCTION. """ result = [] for element in self.getpages(item): - if isinstance(element, Page): + if isinstance(element, SimplePage): result.append(element.get()) else: result.append(element) @@ -466,9 +490,9 @@ def _setitem_prextend(self, key, value): # PRIVATE FUNCTION start, end = end, end + page.len if start <= key.stop < end: # find beginning data = page.get() - keep = data[(key.stop - start) :] # keeping after key.stop + keep = data[(key.stop - start):] # keeping after key.stop new = np_type_unify([value, keep]) - tail = self.pages[index + 1 :] # keep pointers to pages. + tail = self.pages[index + 1:] # keep pointers to pages. self.pages = [] self.extend(new) # handles pagination. self.pages.extend(tail) # handles old pages. @@ -500,7 +524,7 @@ def _setitem_insert(self, key, value): # PRIVATE FUNCTION if start <= key_stop < end: # start of tail data = page.get() if data is None else data # don't load again if on same page. - tail = data[key_stop - start :] + tail = data[key_stop - start:] if key_stop < start: unchanged_tail.append(page) @@ -961,7 +985,7 @@ def count(self, item): class Table(object): - _pid_dir = None # typically Path(Config.workdir) / f"pid-{os.getpid()}" + _pid_dir = None # typically `Config.workdir / Config.pid` _ids = count() def __init__(self, columns=None, headers=None, rows=None, _path=None) -> None: @@ -978,7 +1002,7 @@ def __init__(self, columns=None, headers=None, rows=None, _path=None) -> None: """ if _path is None: if self._pid_dir is None: - self._pid_dir = Path(Config.workdir) / f"pid-{os.getpid()}" + self._pid_dir = Path(Config.workdir) / Config.pid if not self._pid_dir.exists(): self._pid_dir.mkdir() (self._pid_dir / "pages").mkdir() @@ -1129,7 +1153,7 @@ def __getitem__(self, keys): # USER FUNCTION for item in column.getpages(slc): if isinstance(item, np.ndarray): new_column.extend(item) # extend subslice (expensive) - elif isinstance(item, Page): + elif isinstance(item, SimplePage): new_column.pages.append(item) # extend page (cheap) else: raise TypeError(f"Bad item: {item}") @@ -1275,6 +1299,7 @@ def load(cls, path, tqdm=_tqdm): # USER FUNCTION. column.extend(data) pbar.update(1) t.columns[name] = column + update_access_time(path) return t def copy(self): @@ -1539,7 +1564,7 @@ def datatype(col): # PRIVATE else: empty = [blanks] * 7 head = (col[:7].tolist() + empty)[:7] - tail = (col[n - 7 :].tolist() + empty)[-7:] + tail = (col[n - 7:].tolist() + empty)[-7:] row = head + ["..."] + tail data[name] = row diff --git a/tablite/config.py b/tablite/config.py index 96044fd2..4c8ad817 100644 --- a/tablite/config.py +++ b/tablite/config.py @@ -43,6 +43,8 @@ class Config(object): workdir = pathlib.Path(os.environ.get("TABLITE_TMPDIR", f"{tempfile.gettempdir()}/tablite-tmp")) workdir.mkdir(parents=True, exist_ok=True) + pid = f"pid-{os.getpid()}" + PAGE_SIZE = 1_000_000 # sets the page size limit. ENCODING = "UTF-8" # sets the page encoding when using bytes diff --git a/tablite/import_utils.py b/tablite/import_utils.py index 4f1ef80a..aecc2c5e 100644 --- a/tablite/import_utils.py +++ b/tablite/import_utils.py @@ -8,6 +8,7 @@ from pathlib import Path import openpyxl import pyexcel +from tablite.utils import load_numpy import sys import warnings import logging @@ -244,7 +245,7 @@ def excel_reader(T, path, first_row_has_headers=True, header_row_index=0, sheet= it_rows_filtered = ([row[idx].value for idx in it_used_indices] for row in it_rows) # create page directory - workdir = Path(Config.workdir) / f"pid-{os.getpid()}" + workdir = Path(Config.workdir) / Config.pid pagesdir = workdir/"pages" pagesdir.mkdir(exist_ok=True, parents=True) @@ -691,7 +692,7 @@ def text_reader_py( ) # make sure the tempdir is ready. - workdir = Path(Config.workdir) / f"pid-{os.getpid()}" + workdir = Path(Config.workdir) / Config.pid if not workdir.exists(): workdir.mkdir() (workdir / "pages").mkdir() @@ -757,7 +758,7 @@ def update(self, n=1): t[name] = Column(t.path) for cfg in configs: for idx, npy in ((inv_field_relation[idx], npy) for idx, npy in enumerate(cfg.destination)): - data = np.load(npy, allow_pickle=True, fix_imports=False) + data = load_numpy(npy) if guess_datatypes: data = list_to_np_array(DataTypes.guess(data)) @@ -803,7 +804,7 @@ def text_reader_nim( else: raise NotImplementedError(f"encoding not implemented: {encoding}") - pid = Config.workdir / f"pid-{os.getpid()}" + pid = Config.workdir / Config.pid kwargs = {} if first_row_has_headers is not None: diff --git a/tablite/mp_utils.py b/tablite/mp_utils.py index 9c76f9f0..ba451f99 100644 --- a/tablite/mp_utils.py +++ b/tablite/mp_utils.py @@ -1,5 +1,6 @@ import operator import numpy as np +from tablite.utils import load_numpy from multiprocessing import shared_memory import psutil from tablite.config import Config @@ -112,7 +113,7 @@ def reindex_task(src, dst, index_shm, shm_shape, start, end): existing_shm = shared_memory.SharedMemory(name=index_shm) shared_index = np.ndarray(shm_shape, dtype=np.int64, buffer=existing_shm.buf) # work - array = np.load(src, allow_pickle=True, fix_imports=False) + array = load_numpy(src) new = np.take(array, shared_index[start:end]) np.save(dst, new, allow_pickle=True, fix_imports=False) # disconnect diff --git a/tablite/nimlite.py b/tablite/nimlite.py index c85cc5fd..1d68c410 100644 --- a/tablite/nimlite.py +++ b/tablite/nimlite.py @@ -2,14 +2,14 @@ import json import psutil import platform -import numpy as np import subprocess as sp from pathlib import Path from tqdm import tqdm as _tqdm from tablite.config import Config from mplite import Task, TaskManager +from tablite.utils import load_numpy from tablite.utils import generate_random_string -from tablite.base import Page, Column, pytype_from_iterable +from tablite.base import SimplePage, Column, pytype_from_iterable IS_WINDOWS = platform.system() == "Windows" USE_CLI_BACKEND = IS_WINDOWS @@ -28,15 +28,12 @@ sys.argv.extend(paths) # importing nim module messes with pythons launch arguments!!! -class TmpPage(Page): +class NimPage(SimplePage): def __init__(self, id, path, data) -> None: - self.id = id - self.path = path / "pages" / f"{self.id}.npy" - self.len = len(data) - _, py_dtype = pytype_from_iterable(data) if self.len > 0 else (None, object) - self.dtype = py_dtype + _len = len(data) + _, _dtype = pytype_from_iterable(data) if _len > 0 else (None, object) - self._incr_refcount() + super().__init__(id, path, _len, _dtype) def text_reader_task(*, pid, path, encoding, dialect, task, import_fields, guess_dtypes): @@ -83,8 +80,8 @@ def text_reader_task(*, pid, path, encoding, dialect, task, import_fields, guess pages = [] for p in (Path(p) for p in task["pages"]): id = int(p.name.replace(p.suffix, "")) - arr = np.load(p, allow_pickle=True) - page = TmpPage(id, pid, arr) + arr = load_numpy(p) + page = NimPage(id, pid, arr) pages.append(page) return pages diff --git a/tablite/sortation.py b/tablite/sortation.py index 6bcff717..4082d336 100644 --- a/tablite/sortation.py +++ b/tablite/sortation.py @@ -1,6 +1,7 @@ import os import numpy as np import psutil +from tablite.utils import load_numpy from mplite import Task, TaskManager from tablite.mp_utils import share_mem, reindex_task, select_processing_method from tablite.datatypes import multitype_set, numpy_to_python @@ -131,7 +132,7 @@ def _mp_reindex(T, index): for name in T.columns: t[name] = Column(t.path) for dst in new[name]: - data = np.load(dst, allow_pickle=True, fix_imports=False) + data = load_numpy(dst) t[name].extend(data) os.remove(dst) return t diff --git a/tablite/utils.py b/tablite/utils.py index ef55c84c..acb930e8 100644 --- a/tablite/utils.py +++ b/tablite/utils.py @@ -1,6 +1,10 @@ from collections import defaultdict import math import re +import os +from time import time as now +from pathlib import Path +import numpy as np import ast from datetime import datetime, date, time, timedelta, timezone # noqa from itertools import compress @@ -449,4 +453,14 @@ def fixup_worksheet(worksheet): worksheet._max_column = ws_cols worksheet._max_row = ws_rows except Exception as e: - logging.error(f"Failed to fetch true dimensions: {e}") \ No newline at end of file + logging.error(f"Failed to fetch true dimensions: {e}") + +def update_access_time(path): + path = Path(path) + stat = path.stat() + os.utime(path, (now(), stat.st_mtime)) + +def load_numpy(path): + update_access_time(path) + + return np.load(path, allow_pickle=True, fix_imports=False) \ No newline at end of file diff --git a/tablite/version.py b/tablite/version.py index 95db57fc..8fb1c8af 100644 --- a/tablite/version.py +++ b/tablite/version.py @@ -1,3 +1,3 @@ -major, minor, patch = 2023, 8, "dev7" +major, minor, patch = 2023, 8, 0 __version_info__ = (major, minor, patch) __version__ = ".".join(str(i) for i in __version_info__)