From 9a3b6d459973eae0e820cbf49ac49c90214e1a39 Mon Sep 17 00:00:00 2001 From: mio Date: Mon, 23 Dec 2024 15:29:34 +0800 Subject: [PATCH] Several important changes - Support DUPSORT databases - DBI is not closed anymore and mdbx_env_close will handle it - More pythonic iterators and support __enter__ __exit__ - A few typing fixes --- mdbx/mdbx.py | 428 ++++++++++++++++++++++++++++---------------- tests/test_iters.py | 80 +++++++++ tests/test_mdbx.py | 16 +- 3 files changed, 360 insertions(+), 164 deletions(-) create mode 100644 tests/test_iters.py diff --git a/mdbx/mdbx.py b/mdbx/mdbx.py index 1423ee9..8242594 100644 --- a/mdbx/mdbx.py +++ b/mdbx/mdbx.py @@ -24,6 +24,11 @@ import os from pathlib import Path import sys +import itertools +from typing import Optional, Tuple, Iterator, List +from weakref import ReferenceType +import weakref +import logging # init lib SO_FILE = { @@ -663,6 +668,12 @@ class MDBXTXNFlags(enum.IntFlag): # Exactly the same as \ref MDBX_SAFE_NOSYNC # but for this transaction only MDBX_TXN_NOSYNC = MDBXEnvFlags.MDBX_SAFE_NOSYNC + + def is_read_only(self) -> bool: + return self == MDBXTXNFlags.MDBX_TXN_RDONLY + + def is_read_write(self) -> bool: + return self == MDBXTXNFlags.MDBX_TXN_READWRITE class MDBXDBFlags(enum.IntFlag): @@ -1472,6 +1483,13 @@ def __init__(self, base: bytes=None, length: int=0): self.iov_base=ctypes.cast(ctypes.c_char_p(base), ctypes.c_void_p) self.iov_len=length + def to_bytes(self) -> Optional[bytes]: + length = self.iov_len + if length == 0: + return None + else: + return bytes(ctypes.cast(self.iov_base, ctypes.POINTER(ctypes.c_ubyte))[:length]) + def __repr__(self): return "iovec{.iov_base=%s, iov.len=%s}" % (self.iov_base, self.iov_len) @@ -1531,6 +1549,7 @@ def __init__(self, errno: int, errmsg: str): self.message = errmsg super().__init__(self.message) + class TXN: """ An abstraction of the MdbxTxn struct and related functions @@ -1552,36 +1571,23 @@ def __init__(self, env: Env, parent: TXN=None, flags: MDBXTXNFlags=0, context=No self._txn = ctypes.POINTER(MDBXTXN)() self._env = env self._ctx = None - self._dependents = [] + self._flags = flags + self._dependents: List[ReferenceType[Cursor]] = [] + env._dependents.append(weakref.ref(self)) ret=_lib.mdbx_txn_begin_ex(env._env, parent, flags, ctypes.pointer(self._txn), context) if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) def __del__(self): - # don't rely on order of deletion - self.abort() - - def _invalidate(self): - """ - Invalidate references to own ctypes objects and remove self from dependents of the TXN's Env - - Used to make sure Cases in which libmdbx frees TXN objects, the reference to it is also set to None - - After calling this, the _txn and _env references are set to None, because the TXN is not legal to use now. - """ - if self._txn: - for i in self._dependents: - try: - i.abort() - except: - pass - try: - i.close() - except: - pass - self._txn = None - self._env._dependents.remove(self) - self._env = None + logging.getLogger(__name__).debug(f"Transaction {self._txn} being deleted, dependents: {self._dependents}") + self.close() + + def __enter__(self): + return self + + def __exit__(self, _1, _2, _3): + logging.getLogger(__name__).debug(f"Transaction {self._txn} exits, dependents: {self._dependents}") + self.close() def break_txn(self): """ @@ -1606,11 +1612,14 @@ def commit(self): :returns: True ;rtype bool """ + logging.getLogger(__name__).debug(f"Transaction {self._txn} being commit, dependents: {self._dependents}") if self._txn: + self.__inform_deps() ret=_lib.mdbx_txn_commit(self._txn) - self._invalidate() if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) + self._txn = None + self._env = None return True def commit_ex(self) -> MDBXCommitLatency: @@ -1621,11 +1630,15 @@ def commit_ex(self) -> MDBXCommitLatency: :returns ctypes struct of MDBX_commit_latency :rtype MDBXCommitLatency """ + logging.getLogger(__name__).debug(f"Transaction {self._txn} being committed_ex, dependents: {self._dependents}") if self._txn: + self.__inform_deps() commit_latency=MDBXCommitLatency() ret=_lib.mdbx_txn_commit_ex(self._txn, ctypes.byref(commit_latency)) if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) + self._txn = None + self._env = None return commit_latency def id(self) -> int : @@ -1727,6 +1740,17 @@ def reset(self): return True return False + def __inform_deps(self): + for cur in self._dependents: + cur = cur() + if cur is not None: + cur.close() + self._dependents = [] + + def close(self): + if self._txn: + # The transaction is still alive, we must abort it + self.abort() def abort(self): """ @@ -1738,20 +1762,34 @@ def abort(self): :returns: boolean indicating success or failure (if TXN was invalid) :rtype bool """ + logging.getLogger(__name__).debug(f"Transaction {self._txn} being aborted, dependents: {self._dependents}") if self._txn: + self.__inform_deps() # It's okay to double-close ret=_lib.mdbx_txn_abort(self._txn) - self._invalidate() if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) + self._txn = None + self._env = None return True return False def create_map(self, name: str=None, flags: MDBXDBFlags=MDBXDBFlags.MDBX_CREATE): + """ + Wrapper around mdbx_dbi_open, intended to create a database(map) + + Raises MDBXErrorExc or OSerror + :param name: DBI name or None, if default DB is to be opened + :type name: str + :param flags: Combination of MDBXDBFlags, passed to mdbx_dbi_open + :type flags: Combination of MDBXDBFlags + :returns: Reference to opened DBI DBI if success, or False in case TXN was invalid + :rtype DBI in case of success, or bool in case of failure + """ return self.open_map(name, flags | MDBXDBFlags.MDBX_CREATE) def open_map(self, name: str=None, flags: MDBXDBFlags=MDBXDBFlags.MDBX_DB_DEFAULTS): """ - Wrapper around mdbx_dbi_open + Wrapper around mdbx_dbi_open, intended to open an existing map Raises MDBXErrorExc or OSerror :param name: DBI name or None, if default DB is to be opened @@ -1765,10 +1803,12 @@ def open_map(self, name: str=None, flags: MDBXDBFlags=MDBXDBFlags.MDBX_DB_DEFAUL dbi=ctypes.c_uint32() dbi.value=0 if isinstance(name, str): - name=name.encode("utf-8") - if name: - name=ctypes.c_char_p(name) - ret=_lib.mdbx_dbi_open(self._txn, name, flags, ctypes.pointer(dbi)) + cname=name.encode("utf-8") + else: + cname=name + if cname: + cname=ctypes.c_char_p(cname) + ret=_lib.mdbx_dbi_open(self._txn, cname, flags, ctypes.pointer(dbi)) if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) ret=DBI(self._env, MDBXDBI(dbi)) @@ -1794,7 +1834,7 @@ def get_info(self, scan_rlt: bool=False) -> MDBXTXNInfo: return info return None - def get_canary(self) -> MDBXCanary(): + def get_canary(self) -> MDBXCanary: """ Thin wrapper around mdbx_canary_get @@ -1820,6 +1860,20 @@ def put_canary(self, canary: MDBXCanary): if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) + def cursor(self, db: Optional[DBI | str] = None) -> Cursor: + """ + Creata a cursor on a database. If the argument is str and current transaction is a read-write + transaction, the database will be created. + """ + if isinstance(db, str): + if self._flags.is_read_only(): + db = self.open_map(db) + else: + db = self.create_map(db) + elif db is None: + db = self.open_map(db) + return Cursor(db, self, self._ctx) + @dataclasses.dataclass class Geometry: """ @@ -1832,49 +1886,6 @@ class Geometry: shrink_threshold: int=-1 pagesize: int=-1 -class EnvIterator(): - """ - Iterator object for iterating over DB contents - - Raises MDBXErrorExc or OSError - :param env: Environment this iterator belongs to - :type env: Env - :param dbname: Name of the DB to be iterated over - :type dbname: str - """ - def __init__(self, env: Env, dbname: str): - self._env = env - self._txn = env.start_transaction(flags=MDBXTXNFlags.MDBX_TXN_RDONLY) - self._dbname = dbname - self._db = self._txn.open_map(dbname) - self._started = False - self._cursor=ctypes.POINTER(MDBXCursor)() - ret=_lib.mdbx_cursor_open(self._txn._txn, self._db._dbi, ctypes.pointer(self._cursor)) - if ret != MDBXError.MDBX_SUCCESS.value: - raise make_exception(ret) - def __next__(self): - """ - Get next database key and value pair - Raises MDBXErrorExc or OSError - """ - val=Iovec(bytes()) - key=Iovec(bytes()) - if not self._started: - self._started=True - ret=_lib.mdbx_cursor_get(self._cursor, ctypes.pointer(key), ctypes.pointer(val), MDBXCursorOp.MDBX_FIRST) - else: - ret=_lib.mdbx_cursor_get(self._cursor, ctypes.pointer(key), ctypes.pointer(val), MDBXCursorOp.MDBX_NEXT) - - if ret != MDBXError.MDBX_SUCCESS.value: - if _lib.mdbx_cursor_eof(self._cursor) or ret == MDBXError.MDBX_NOTFOUND.value: - raise StopIteration - raise make_exception(ret) - val=bytes(ctypes.cast(val.iov_base, ctypes.POINTER(ctypes.c_ubyte))[:val.iov_len]) - key=bytes(ctypes.cast(key.iov_base, ctypes.POINTER(ctypes.c_ubyte))[:key.iov_len]) - return key, val - - def __del__(self): - self._txn.abort() class Env(object): """ @@ -1895,13 +1906,21 @@ class Env(object): :type maxdbs: int """ - def __init__(self, path: str="default_db", flags: MDBXEnvFlags=0, mode: MDBXMode=0o755, geometry: Geometry=None, - maxreaders: int=1, maxdbs: int=1): + def __init__(self, + path: str, + flags: MDBXEnvFlags=0, + mode: MDBXMode=0o755, + geometry: Geometry=None, + maxreaders: int=1, + maxdbs: int=1, + sync_bytes: int=None, + sync_period: int=None + ): self._env = ctypes.pointer(MDBXEnv()) ret=_lib.mdbx_env_create(ctypes.byref(self._env)) self._default_db=None self._current_txn=None - self._dependents=[] + self._dependents: List[ReferenceType[TXN] | ReferenceType[DBI]]=[] self._ctx=None if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) @@ -1920,20 +1939,29 @@ def __init__(self, path: str="default_db", flags: MDBXEnvFlags=0, mode: MDBXMode ret=_lib.mdbx_env_set_maxdbs(self._env, maxdbs) if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) - ret=_lib.mdbx_env_open(self._env, ctypes.c_char_p(bytes(path, "utf-8")), flags, mode) + ret=_lib.mdbx_env_open(self._env, ctypes.c_char_p(bytes(str(path), "utf-8")), flags, mode) if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) - + + if sync_bytes is not None: + self.set_option(MDBXOption.MDBX_opt_sync_bytes, sync_bytes) + + if sync_period is not None: + self.set_option(MDBXOption.MDBX_opt_sync_period, sync_period) + def __del__(self): self.close() + def __enter__(self): + return self + def __exit__(self, _1, _2, _3): self.close() def __repr__(self): return "Env { \"path\" : \"%s\"}" % self.get_path() - def __getitem__(self, key: (str, bytes)): + def __getitem__(self, key: Tuple[str, bytes]): """ Gets item from currently set default database Opens a read only transaction, gets the object, aborts the transaction @@ -1947,7 +1975,7 @@ def __getitem__(self, key: (str, bytes)): if not isinstance(key, (str, bytes)): raise KeyError("Key can only be string or bytes") try: - txn=self.start_transaction(flags=MDBXTXNFlags.MDBX_TXN_RDONLY) + txn=self.ro_transaction() db=txn.open_map(self._default_db) if isinstance(key, str): key=key.encode("utf-8") @@ -1957,7 +1985,7 @@ def __getitem__(self, key: (str, bytes)): except Exception as e: return None - def __setitem__(self, key: (str, bytes), val: (str, bytes)): + def __setitem__(self, key: Tuple[str, bytes], val: Tuple[str, bytes]): """ Sets the given key and val in the current default database if key or val are strings, they are converted into bytes using utf-8 encoding @@ -1992,27 +2020,24 @@ def __iter__(self): :returns iterator over default_db :rtype: EnvIterator """ - return EnvIterator(self, self._default_db) + txn = self.ro_transaction() + cur = Cursor(self._default_db, txn, self._ctx) + return cur.iter() def close(self): """ - Closes this Env - - Closes or aborts all TXNs or DBIs belonging to this Env - Invalidates the reference to the Env - + Closes this Env. _In most cases, you don't need to call this._ mdbx-py has + internal reference counting to _safely_ garbage collect envs, txs and cursors. + Raises MDBXErrorExc or OSError """ + logging.getLogger(__name__).debug(f"env {self._env} being closed, dependents: {self._dependents}") if self._env: - for i in self._dependents: - try: - i.abort() - except: - pass - try: - i.close() - except: - pass + for tx in self._dependents: + tx = tx() + if tx is not None: + tx.close() + self._dependents = [] ret=_lib.mdbx_env_close(self._env) if ret != MDBXError.MDBX_BUSY.value: self._env = None @@ -2044,7 +2069,12 @@ def set_default_db(self, name: str): """ self._default_db=name - + def ro_transaction(self) -> TXN: + return self.start_transaction(MDBXTXNFlags.MDBX_TXN_RDONLY, None) + + def rw_transaction(self) -> TXN: + return self.start_transaction(MDBXTXNFlags.MDBX_TXN_READWRITE, None) + def start_transaction(self, flags: MDBXTXNFlags=0, parent_txn: TXN=None): """ Starts a transaction on the given Env @@ -2060,7 +2090,7 @@ def start_transaction(self, flags: MDBXTXNFlags=0, parent_txn: TXN=None): # start transaction and return new object if self._env: txn=TXN(self, parent_txn, flags) - self._dependents.append(txn) + logging.getLogger(__name__).debug(f"Starting transaction {txn._txn}") return txn def get_path(self): @@ -2405,45 +2435,27 @@ def __init__(self, env: Env, dbi: MDBXDBI): """ self._dbi=dbi self._env=env - self._dependents = [] def __del__(self): - """ - Just here for documentation. - Don't do anything because doing so automatically can lead to DB corruption! - """ - self._invalidate() pass + def __enter__(self): + return self + + def __exit__(self, _1, _2, _3): + pass + def __repr__(self): return str("%s: %s" % (self._env, self._dbi)) - def _invalidate(self): - if self._dbi: - for i in self._dependents: - try: - i.abort() - except: - pass - try: - i.close() - except: - pass - - def close(self): """ - Thin wrapper around mdbx_dbi_close - - Not valid for DBI 0 - - Raises MDBXErrorExc or OSError - + Do nothing and delegate mdbx_env_close to close all handles. + + MDBX will reuse handles (closing one will invalidate other aliased handles) and it is + tedious to track the duplicate handle aliases by our side. """ - self._invalidate() - ret=_lib.mdbx_dbi_close(self._env._env, self._dbi) - if ret != MDBXError.MDBX_SUCCESS.value: - raise make_exception(ret) + pass def get(self, txn: TXN, key: bytes) -> bytes: """ @@ -2465,7 +2477,7 @@ def get(self, txn: TXN, key: bytes) -> bytes: if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) - return bytes(ctypes.cast(data.iov_base, ctypes.POINTER(ctypes.c_ubyte))[:data.iov_len]) + return data.to_bytes() def put(self, txn: TXN, key: bytes, value: bytes, flags: MDBXPutFlags=0): """ @@ -2565,25 +2577,23 @@ def __init__(self, db: DBI=None, txn: TXN=None, ctx=None): :type ctx: Object """ self._db = db - if db: - db._dependents.append(self) self._txn = txn - if txn: - txn._dependents.append(self) self._ctx = ctx self._started=False self._cursor=ctypes.POINTER(MDBXCursor)() ret=None if db and txn: ret=_lib.mdbx_cursor_open(txn._txn, db._dbi, ctypes.pointer(self._cursor)) - if not ctypes.cast(self._cursor, ctypes.c_void_p): - raise ValueError() if ret and ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) else: + if not ctypes.cast(self._cursor, ctypes.c_void_p): + raise ValueError(f"The returned cursor pointer {self._cursor} is invalid?!") self._cursor.value=_lib.mdbx_cursor_create(ctypes.c_void_p()) if not self._cursor.value: raise MemoryError() + if txn: + txn._dependents.append(weakref.ref(self)) def dbi(self): """ @@ -2606,13 +2616,16 @@ def bind(self, txn: TXN, db: DBI=None): if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) + def __enter__(self): + return self + + def __exit__(self, _1, _2, _3): + logging.getLogger(__name__).debug(f"Cursor {self._cursor} exits") + self.__del__() + def __del__(self): + logging.getLogger(__name__).debug(f"Cursor {self._cursor} being deleted") self.close() - if self._db: - self._db._dependents.remove(self) - if self._txn: - self._txn._dependents.remove(self) - self._cursor=None def __iter__(self): return self @@ -2647,9 +2660,13 @@ def close(self): Sets self._cursor to None, invalidating the reference to it """ + logging.getLogger(__name__).debug(f"Cursor {self._cursor} being closed") if self._cursor: _lib.mdbx_cursor_close(self._cursor) + # This is important to release the strong references self._cursor = None + self._txn = None + self._db = None def set_user_ctx(self, val): """ @@ -2717,7 +2734,26 @@ def copy(self, dest: Cursor): if ret != MDBXError.MDBX_SUCCESS.value: raise make_exception(ret) - def get(self, key: bytes, cursor_op: MDBXCursorOp=MDBXCursorOp.MDBX_FIRST) -> bytes: + def dup(self) -> Cursor: + cursor = Cursor(self._db, self._txn, self._ctx) + self.copy(cursor) + return cursor + + def get_full(self, key: Optional[bytes], cursor_op: MDBXCursorOp) -> Tuple[Optional[bytes], Optional[bytes]]: + if self._cursor: + io_key = Iovec(key) + io_data = Iovec(None, 1) + + ret=_lib.mdbx_cursor_get(self._cursor, ctypes.byref(io_key), ctypes.byref(io_data), cursor_op) + if ret == MDBXError.MDBX_NOTFOUND or ret == MDBXError.MDBX_ENODATA: + return (None, None) + if ret != MDBXError.MDBX_SUCCESS.value: + raise make_exception(ret) + out_key = io_key.to_bytes() + out_value = io_data.to_bytes() + return (out_key, out_value) + + def get(self, key: Optional[bytes], cursor_op: MDBXCursorOp=MDBXCursorOp.MDBX_FIRST) -> Optional[bytes]: """ Wrapper around mdbx_cursor_get @@ -2729,17 +2765,8 @@ def get(self, key: bytes, cursor_op: MDBXCursorOp=MDBXCursorOp.MDBX_FIRST) -> by :returns value of the key :rtype bytes """ - if self._cursor: - key = Iovec(key) - data = Iovec(None, 1) - - ret=_lib.mdbx_cursor_get(self._cursor, ctypes.byref(key), ctypes.byref(data), cursor_op) - if ret == MDBXError.MDBX_NOTFOUND: - return None - if ret != MDBXError.MDBX_SUCCESS.value: - raise make_exception(ret) - - return bytes(ctypes.cast(data.iov_base, ctypes.POINTER(ctypes.c_ubyte))[:data.iov_len]) + _, v = self.get_full(key, cursor_op) + return v def put(self, key: bytes, val: bytes, flags: MDBXPutFlags=MDBXPutFlags.MDBX_UPSERT): """ @@ -2858,6 +2885,98 @@ def renew(self, txn: TXN): raise make_exception(ret) return False + def iter(self, + start_key: Optional[bytes] = None, + from_next: bool = False, + copy_cursor: bool = False + ) -> Iterator[Tuple[bytes, bytes]]: + if start_key is not None and from_next: + raise RuntimeError("start_key and from_next can not be used at the same time") + if copy_cursor: + cursor = self.dup() + else: + cursor = self + if from_next: + first_op = MDBXCursorOp.MDBX_NEXT + else: + first_op = MDBXCursorOp.MDBX_FIRST + if start_key is None: + return DBIter(cursor, first_op, MDBXCursorOp.MDBX_NEXT) + else: + self.get_full(start_key, MDBXCursorOp.MDBX_SET_RANGE) + return DBIter(cursor, MDBXCursorOp.MDBX_GET_CURRENT, MDBXCursorOp.MDBX_NEXT) + + def iter_dupsort(self, + start_key: Optional[bytes] = None, + from_next: bool = False, + copy_cursor: bool = False + ) -> Iterator[Tuple[bytes, bytes]]: + its = self.iter_dupsort_rows(start_key, from_next, copy_cursor) + return itertools.chain.from_iterable(its) + + def iter_dupsort_rows(self, + start_key: Optional[bytes] = None, + from_next: bool = False, + copy_cursor: bool = False, + ) -> Iterator[Iterator[Tuple[bytes, bytes]]]: + if start_key is not None and from_next: + raise RuntimeError("start_key and from_next can not be used at the same time") + if copy_cursor: + cursor = self.dup() + else: + cursor = self + if from_next: + first_op = MDBXCursorOp.MDBX_NEXT + else: + first_op = MDBXCursorOp.MDBX_FIRST + if start_key is None: + return DBDupIter(cursor, first_op) + else: + self.get_full(start_key, MDBXCursorOp.MDBX_SET_RANGE) + return DBDupIter(cursor, MDBXCursorOp.MDBX_GET_CURRENT) + +class DBIter(object): + + def __init__(self, cur: Cursor, first_op: MDBXCursorOp, second_op: Optional[MDBXCursorOp]): + self.cur = cur # Strong reference! + self.first_op = first_op + self.second_op = second_op + + self.op = first_op + + def __iter__(self): + return self + + def __next__(self): + op = self.first_op + if self.second_op is not None: + self.first_op = self.second_op + out_key, out_data = self.cur.get_full(None, op) + if out_data is None: + raise StopIteration + else: + return (out_key, out_data) + + +class DBDupIter(object): + + def __init__(self, cur: Cursor, op: MDBXCursorOp): + self.cur = cur + self.op = op + + def __iter__(self): + return self + + def __next__(self): + op = self.op + self.op = MDBXCursorOp.MDBX_NEXT_NODUP + + k, v = self.cur.get_full(None, op) + if k is None or v is None: + raise StopIteration + + return DBIter(self.cur.dup(), MDBXCursorOp.MDBX_GET_CURRENT, MDBXCursorOp.MDBX_NEXT_DUP) + def get_build_info(): """ :returns mdbx_build struct embedded in lib @@ -2881,6 +3000,7 @@ def make_exception(errno: int): return MDBXErrorExc(errno, err) return OSError(errno, os.strerror(errno)) + _lib.mdbx_strerror_r.argtypes = [ ctypes.c_int, ctypes.c_char_p, ctypes.c_size_t ] _lib.mdbx_strerror_r.restype = ctypes.c_char_p _lib.mdbx_liberr2str.argtypes = [ ctypes.c_int ] diff --git a/tests/test_iters.py b/tests/test_iters.py new file mode 100644 index 0000000..edd86e4 --- /dev/null +++ b/tests/test_iters.py @@ -0,0 +1,80 @@ +import unittest +import tempfile +import shutil +from pathlib import Path +from mdbx import * +import struct + +class MDBXIterTest(unittest.TestCase): + + def setUp(self): + self._folder = tempfile.TemporaryDirectory() + self._folder_path = Path(self._folder.name) + return super().setUp() + + def test_iters(self): + expected = [] + for i in range(10): + if i == 1: + continue + expected.append((struct.pack(">I", i), struct.pack(">I", 10 - i))) + with Env(self._folder_path.absolute()) as env: + with env.rw_transaction() as txn: + with txn.open_map() as dbi: + for k, v in expected: + dbi.put(txn, k, v) + txn.commit() + + with env.ro_transaction() as txn: + with txn.cursor() as cur: + vals = [(k,v) for k, v in cur.iter()] + self.assertEqual(vals, expected) + + with txn.cursor() as cur: + vals = [(k, v) for k, v in cur.iter(start_key=struct.pack(">I", 4))] + self.assertEqual(vals, expected[3:]) + + def test_iters_dup(self): + expected = [] + for i in range(10): + if i == 1: + continue + dups = [struct.pack(">I", x) for x in range(5)] + expected.append((struct.pack(">I", i), tuple(dups))) + + with Env(self._folder_path.absolute(), maxdbs=2) as env: + with env.rw_transaction() as txn: + with txn.create_map("test", MDBXDBFlags.MDBX_DUPSORT) as dbi: + for k, dups in expected: + for d in dups: + dbi.put(txn, k, d) + txn.commit() + + with env.ro_transaction() as txn: + with txn.cursor("test") as cur: + vals = [] + for row in cur.iter_dupsort_rows(): + row_vals = {} + for k, v in row: + if k not in row_vals: + row_vals[k] = [] + row_vals[k].append(v) + self.assertEqual(len(row_vals), 1) + vals.append(( + list(row_vals.keys())[0], + tuple(list(row_vals.values())[0]) + )) + self.assertEqual(vals, expected) + + with txn.cursor("test") as cur: + vals = [(k, v) for k, v in cur.iter_dupsort()] + expected = [ (x, dup) for x, dups in expected for dup in dups] + self.assertEqual(vals, expected) + + def tearDown(self): + del self._folder + shutil.rmtree(self._folder_path, ignore_errors=True) + return super().tearDown() + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_mdbx.py b/tests/test_mdbx.py index e5e1302..9dee638 100644 --- a/tests/test_mdbx.py +++ b/tests/test_mdbx.py @@ -22,7 +22,10 @@ import time import tempfile import mdbx as libmdbx +import logging + +logging.getLogger("mdbx").setLevel("DEBUG") MDBX_TEST_DIR="%s/MDBX_TEST" % tempfile.gettempdir() MDBX_TEST_DB_NAME="MDBX_TEST_DB_NAME" MDBX_TEST_MAP_NAME="MDBX_TEST_MAP_NAME" @@ -92,16 +95,6 @@ def test_db_iter(self): self.assertEqual(dbi.get(txn, key.encode("utf-8")).decode("utf-8"), val) db.close() - def test_fail_close_empty_map(self): - MDBX_TEST_DB_DIR="%s/%s" % (MDBX_TEST_DIR, inspect.stack()[0][3]) - db=libmdbx.Env(MDBX_TEST_DB_DIR, maxdbs=1024) - txn=db.start_transaction() - opened_map=txn.create_map(MDBX_TEST_DB_NAME) - with self.assertRaises(libmdbx.MDBXErrorExc) as cm: - opened_map.close() - self.assertEqual(cm.exception.errno, libmdbx.MDBXError.MDBX_BAD_DBI.value) - db.close() - def test_success_close_written_map(self): MDBX_TEST_DB_DIR="%s/%s" % (MDBX_TEST_DIR, inspect.stack()[0][3]) db=libmdbx.Env(MDBX_TEST_DB_DIR, maxdbs=1024) @@ -376,8 +369,11 @@ def test_cursor_open(self): cursor.put(a, b) txn.commit() + logging.getLogger("mdbx").debug(f"Status, txn={txn._txn}, cursor={cursor._cursor}, dbi={dbi._dbi}") txn=env.start_transaction() + logging.getLogger("mdbx").debug(f"New dbi") dbi=txn.open_map(MDBX_TEST_DB_NAME) + logging.getLogger("mdbx").debug(f"New Cursor, dbi = {dbi._dbi}") cursor=libmdbx.Cursor(dbi, txn) self.assertEqual(MDBX_TEST_VAL_UTF8, dbi.get(txn, MDBX_TEST_KEY)) self.assertEqual(b, cursor.get(a, cursor_op=libmdbx.MDBXCursorOp.MDBX_SET))