Skip to content

Commit

Permalink
Merge pull request #1113 from phenobarbital/refactor-2.7-scylladb
Browse files Browse the repository at this point in the history
fixing rethinkdb library
  • Loading branch information
phenobarbital committed May 28, 2024
2 parents 14f803d + e11711d commit 1380b5a
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 23 deletions.
73 changes: 51 additions & 22 deletions asyncdb/drivers/rethink.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ async def sync(self, table: str):
async def createindex(
self,
table: str,
field: str,
name: str,
name: str = None,
field: str = None,
fields: list = None,
multi: bool = True
):
Expand All @@ -225,10 +225,10 @@ async def createindex(
await self.valid_operation(table)
if table in await self._engine.db(self._db).table_list().run(self._connection):
# check for a single index
if isinstance(fields, list) and len(fields) > 0:
if isinstance(fields, (list, tuple)) and len(fields) > 0:
idx = []
for f in fields:
idx.append(self._engine.row(f))
idx.append(self._engine.row[f])
try:
return await self._engine.table(table).index_create(name, idx).run(self._connection)
except (ReqlDriverError, ReqlRuntimeError) as ex:
Expand Down Expand Up @@ -354,7 +354,10 @@ async def query(
# table:
tbl = self._engine.db(self._db).table(table)
if not columns:
self._columns = await tbl.nth(0).default(None).keys().run(self._connection)
try:
self._columns = await tbl.nth(0).default(None).keys().run(self._connection)
except rethinkdb.errors.ReqlQueryLogicError:
self._columns = []
else:
self._columns = columns
tbl = tbl.with_fields(*columns)
Expand All @@ -378,9 +381,10 @@ async def query(
self._result = data
else:
raise NoDataFound(
message=f"RethinkDB: Empty Result on {table!s}",
code=404
message=f"RethinkDB: Empty Result on {table!s}"
)
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -409,7 +413,10 @@ async def fetch_all(self, table: str, **kwargs): # pylint: disable=W0221,W0237
try:
self.start_timing()
_filter = kwargs.get("filter", kwargs)
self._columns = await self._engine.table(table).nth(0).default(None).keys().run(self._connection)
try:
self._columns = await self._engine.table(table).nth(0).default(None).keys().run(self._connection)
except rethinkdb.errors.ReqlQueryLogicError:
self._columns = []
if not _filter:
cursor = await self._engine.db(self._db).table(table).run(self._connection)
else:
Expand All @@ -421,9 +428,10 @@ async def fetch_all(self, table: str, **kwargs): # pylint: disable=W0221,W0237
return data
else:
raise NoDataFound(
message=f"RethinkDB: Empty Result on {table!s}",
code=404
message=f"RethinkDB: Empty Result on {table!s}"
)
except NoDataFound:
raise
except ReqlResourceLimitError as err:
raise StatementError(f"Query Limit Error: {err!s}") from err
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -459,7 +467,10 @@ async def queryrow(
self.start_timing()
_filter = kwargs.get("filter", kwargs)
if not columns:
self._columns = await self._engine.table(table).nth(0).default(None).keys().run(self._connection)
try:
self._columns = await self._engine.table(table).nth(0).default(None).keys().run(self._connection)
except rethinkdb.errors.ReqlQueryLogicError:
self._columns = []
else:
self._columns = columns
# table:
Expand All @@ -475,9 +486,10 @@ async def queryrow(
self._result = data
else:
raise NoDataFound(
message=f"RethinkDB: Empty Row Result on {table!s}",
code=404
message=f"RethinkDB: Empty Row Result on {table!s}"
)
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -523,21 +535,28 @@ async def fetch_one(
code=404
)
return data
except NoDataFound:
raise
except ReqlNonExistenceError as err:
raise NoDataFound(
f"Object doesn't exist {table}: {err!s}"
)
except ReqlResourceLimitError as err:
raise StatementError(f"Query Limit Error: {err!s}") from err
except ReqlOpIndeterminateError as err:
raise StatementError(f"Operation indeterminated: {err!s}") from err
except ReqlNonExistenceError as err:
raise DriverError(f"Object doesn't exist {table}: {err!s}") from err
except rethinkdb.errors.ReqlPermissionError as err:
raise DataError(f"Permission error over {table}: {err}") from err
except ReqlRuntimeError as err:
raise DriverError(f"Runtime Error: {err}") from err
except Exception as err: # pylint: disable=W0703
raise DriverError(f"Unknown RT error: {err}") from err


fetch_row = fetch_one

### New Methods
async def get(self, table: str, idx: int = 0):
async def get(self, table: str, idx: int = 0, **kwargs):
"""
get
get only one row based on primary key or filtering,
Expand All @@ -546,15 +565,20 @@ async def get(self, table: str, idx: int = 0):
"""
error = None
await self.valid_operation(table)
_filter = kwargs.get("filter", kwargs)
try:
data = await self._engine.table(table).get(idx).run(self._connection)
if _filter:
data = await self._engine.table(table).get_all(_filter).get(idx).run(self._connection)
else:
data = await self._engine.table(table).get(idx).run(self._connection)
if data:
self._result = data
else:
raise NoDataFound(
message=f"RethinkDB: Empty Row Result on {table!s}",
code=404
message=f"RethinkDB: Empty Row Result on {table!s}"
)
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -594,9 +618,10 @@ async def get_all(self, table: str, index: str = None, **kwargs):
self._result = data
else:
raise NoDataFound(
message=f"RethinkDB: Empty Row Result on {table!s}",
code=404
message=f"RethinkDB: Empty Row Result on {table!s}"
)
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -626,7 +651,9 @@ async def match(self, table: str, field: str = "id", regexp="(?i)^[a-z]+$"):
if data:
self._result = data
else:
raise NoDataFound(message=f"RethinkDB: Empty Row Result on {table!s}", code=404)
raise NoDataFound(message=f"RethinkDB: Empty Row Result on {table!s}")
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down Expand Up @@ -882,6 +909,8 @@ async def between(self, table: str, min: int = None, max: int = None, idx: str =
self._result = data
else:
raise NoDataFound(message=f"RethinkDB: Empty Row Result on {table!s}")
except NoDataFound:
raise
except ReqlResourceLimitError as err:
error = f"Query Limit Error: {err!s}"
except ReqlOpIndeterminateError as err:
Expand Down
2 changes: 1 addition & 1 deletion asyncdb/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__title__ = "asyncdb"
__description__ = "Library for Asynchronous data source connections \
Collection of asyncio drivers."
__version__ = "2.7.7"
__version__ = "2.7.8"
__author__ = "Jesus Lara"
__author_email__ = "jesuslarag@gmail.com"
__license__ = "BSD"
18 changes: 18 additions & 0 deletions examples/test_rethink.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,29 @@ async def test_connect(event_loop):
await conn.drop_database('testing')


async def create_rethink_table(event_loop):
rt = AsyncDB('rethink', params=params, loop=event_loop)
print(rt)
async with await rt.connection() as conn: #pylint: disable=E1101
await conn.create_database('navigator', use=True)
await conn.create_table('chatbots_usage')
exists = await conn.list_tables()
print('Exists? ', 'chatbots_usage' in exists)
# chatbot loaders:
await conn.create_table('chatbots_data')
created = await conn.create_index(
table='chatbots_data', field=None, name='data_version_control', fields=['chatbot_id', 'source_type', 'version']
)
print('CREATED > ', created)
exists = await conn.list_tables()
print('Exists? ', 'chatbots_data' in exists)

if __name__ == '__main__':
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
loop.run_until_complete(test_connect(loop))
loop.run_until_complete(create_rethink_table(loop))
finally:
loop.stop()
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ def readme():
"psycopg-binary>=3.1.8",
"cassandra-driver==3.29.1",
"scylla_driver==3.26.8",
"acsylla==0.1.8b0",
"cqlsh==6.1.2",
"influxdb==5.3.1",
"influxdb-client==1.39.0",
"aioodbc==0.5.0",
Expand Down

0 comments on commit 1380b5a

Please sign in to comment.