Skip to content

Commit

Permalink
Merge pull request #73 from realratchet/master
Browse files Browse the repository at this point in the history
Fix refcount for deep table copies
  • Loading branch information
realratchet authored Aug 14, 2023
2 parents a1553a9 + dbf4368 commit 53bb033
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 14 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

| Version | Change |
|:-----------|-----------------------------------------------------|
|2023.6.3| Updated the way reference counting works. Tablite now tracks references to used pages and cleans them up based on the number of references to those pages in the current process. This makes it possible to handle deep table clones created when sending tables between processes (pickling/unpickling). The previous implementation corrupted all tables sharing the same pages, because its reference counting assumed every table was a shallow copy of the same object. |
|2023.6.2| Updated `mplite` dependency, changed to soft version requirement to prevent pipeline freezes due to small bugfixes in `mplite`. |
|2023.6.1| Major change of the backend processes. Speed up of ~6x. For more see the [release notes](https://github.com/root-11/tablite/releases/tag/2023.6.1) |
| 2022.11.19 | Fixed some memory leaks. |
Expand Down
52 changes: 40 additions & 12 deletions tablite/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,44 @@ def shutdown():

class Page(object):
    # Monotonically increasing source of unique page ids, shared by all
    # Page instances in this process.
    ids = count(start=1)
    # Class-wide registry: page file path -> number of live Page objects in
    # the current process that reference that file. __del__ only removes the
    # backing file from disk once this count drops to zero.
    refcounts = {}

def _incr_refcount(self):
""" increment refcount of this page if it's used by this process"""
if f"pid-{os.getpid()}" in self.path.parts:
self.refcounts[self.path] = self.refcounts.get(self.path, 0) + 1


def __setstate__(self, state):
"""
when an object is unpickled,
say in a case of multi-processing, object.__setstate__(state) is called instead of __init__,
this means we need to update page refcount as if constructor had been called
"""
self.__dict__.update(state)

self._incr_refcount()

def __del__(self):
    """Release this Page's claim on its on-disk data.

    Tablite tables share .npy page files between copies, so the file may
    only be removed from disk once the *last* Page object in this process
    that points at it is garbage collected. Files that live in another
    process's pid-directory are never cleaned up here.
    """
    # Only the owning process (path contains its "pid-<pid>" directory)
    # is responsible for the file.
    if f"pid-{os.getpid()}" not in self.path.parts:
        return

    # Decrement, clamping at zero so a missing or over-released entry
    # cannot go negative.
    refcount = self.refcounts[self.path] = max(self.refcounts.get(self.path, 0) - 1, 0)

    if refcount > 0:
        return  # other live Page objects still use this file

    # was a stray print() with an unbalanced quote; log like the rest of the module
    log.debug(f"Page deleted: {self.path}")

    # keyword form of unlink; the file may already be gone, which is fine
    self.path.unlink(missing_ok=True)

    del self.refcounts[self.path]

def __init__(self, path, array) -> None:
"""
Expand Down Expand Up @@ -104,6 +142,8 @@ def __init__(self, path, array) -> None:
np.save(self.path, array, allow_pickle=True, fix_imports=False)
log.debug(f"Page saved: {self.path}")

self._incr_refcount() # increment refcount for this page

def __len__(self):
return self.len

Expand All @@ -113,18 +153,6 @@ def __repr__(self) -> str:
def __hash__(self) -> int:
return self.id

def __del__(self):
    """When Python's reference count for an object is 0, Python uses
    its garbage collector to remove the object and free the memory.
    As tablite tables have columns and columns have pages and pages have
    data stored on disk, the space on disk must be freed up as well.
    This __del__ override assures the cleanup of stored data.
    """
    # Only delete files created by this process: the page path contains a
    # "pid-<pid>" directory component identifying the owner.
    if f"pid-{os.getpid()}" in self.path.parts:
        if self.path.exists():
            os.remove(self.path)
            log.debug(f"Page deleted: {self.path}")

def get(self):
"""loads stored data
Expand Down
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 6, 2
major, minor, patch = 2023, 6, 3
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)
43 changes: 42 additions & 1 deletion tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from tablite.base import Table, Column
from tablite.base import Table, Column, Page
from mplite import TaskManager, Task
from tablite.config import Config
import numpy as np
from pathlib import Path
import math
import os
import gc

import logging

Expand Down Expand Up @@ -554,3 +556,42 @@ def test_get_by_indices():
assert np.all(values == expected)

Config.PAGE_SIZE = old_cfg

def fn_foo_table(tbl):
    """Identity worker task: hand the received table straight back.

    Running it in a child process forces a pickle/unpickle round-trip,
    which is exactly what the refcount test needs to exercise.
    """
    return tbl

def test_page_refcount():
    """End-to-end check of Page.refcounts across a pickle round-trip.

    Sending a table to a worker process and back creates a deep copy that
    shares the same on-disk pages: each page's refcount must rise to 2,
    fall back to 1 when the copy dies, and the files must only be
    unlinked once the last reference is gone.
    """
    table = Table({"A": [0, 1, 2, 3], "B": [4, 5, 6, 7]})

    assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["A"].pages), "Refcount expected to be 1"
    assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["B"].pages), "Refcount expected to be 1"

    with TaskManager(1) as tm:
        # This causes a deep table copy by sending the table
        # main process -> child process -> main process (pickling/unpickling).
        tasks = [Task(fn_foo_table, table)]

        result_table, *_ = tm.execute(tasks)

    assert all(Page.refcounts.get(p.path, 0) == 2 for p in table["A"].pages), "Refcount expected to be 2"
    assert all(Page.refcounts.get(p.path, 0) == 2 for p in table["B"].pages), "Refcount expected to be 2"

    del result_table  # deleting the copy should reduce the refcounts for all pages
    gc.collect()

    assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["A"].pages), "Refcount expected to be 1"
    assert all(Page.refcounts.get(p.path, 0) == 1 for p in table["B"].pages), "Refcount expected to be 1"

    table.show()  # make sure table is not corrupt

    a_pages = [p.path for p in table["A"].pages]
    b_pages = [p.path for p in table["B"].pages]

    del tm, tasks, table  # dropping the last references should zero the refcounts
    gc.collect()

    assert all(p not in Page.refcounts for p in a_pages), "There should be no more pages left"
    assert all(p not in Page.refcounts for p in b_pages), "There should be no more pages left"

    # idiomatic truthiness check instead of `p.exists() == False`
    assert all(not p.exists() for p in a_pages), "Pages should be deleted"
    assert all(not p.exists() for p in b_pages), "Pages should be deleted"

0 comments on commit 53bb033

Please sign in to comment.