From d26e6ce6dab256f8c1584ae211ebfe6a26011db9 Mon Sep 17 00:00:00 2001
From: Holger Stitz
Date: Fri, 18 Aug 2023 15:00:07 +0200
Subject: [PATCH] feat: add tracer to db.py and dbview.py

---
 tdp_core/db.py     | 449 ++++++++++++++++++++++++---------------------
 tdp_core/dbview.py |  28 +--
 2 files changed, 255 insertions(+), 222 deletions(-)

diff --git a/tdp_core/db.py b/tdp_core/db.py
index fe9674e03..9b666ba29 100644
--- a/tdp_core/db.py
+++ b/tdp_core/db.py
@@ -10,6 +10,9 @@
 from .sql_filter import filter_logic
 from .utils import clean_query, secure_replacements
 
+from opentelemetry import trace
+
+tracer = trace.get_tracer(__name__)
 _log = logging.getLogger(__name__)
 
 
@@ -107,18 +110,18 @@ def __init__(self, engine):
         session wrapper of sql alchemy with auto cleanup
         :param engine:
         """
-        self._engine = engine
-        import uuid
-
-        self._name = uuid.uuid4()
-        _log.debug("%s - engine status before: %s", self._name, engine.pool.status())
-        _log.debug("%s - creating session", self._name)
-        # add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW
-        # https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g
-        self._session: Session = manager.db.create_session(engine)
-        _log.debug("%s - session created", self._name)
-        self._supports_array_parameter = _supports_sql_parameters(engine.name)
-        _log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter)
+        with tracer.start_as_current_span("WrappedSession.__init__", attributes={"db.pool_status": engine.pool.status()}):
+            self._engine = engine
+            import uuid
+            self._name = uuid.uuid4()
+            _log.debug("%s - engine status before: %s", self._name, engine.pool.status())
+            _log.debug("%s - creating session", self._name)
+            # add connection count and session count with SQLALCHEMY_POOL_SIZE and SQLALCHEMY_MAX_OVERFLOW
+            # https://stackoverflow.com/questions/34775501/how-could-i-check-the-number-of-active-sqlalchemy-connections-in-a-pool-at-any-g
+            self._session: Session = manager.db.create_session(engine)
+            _log.debug("%s - session created", self._name)
+            self._supports_array_parameter = _supports_sql_parameters(engine.name)
+            _log.debug("%s - supports array parameter: %s", self._name, self._supports_array_parameter)
 
     def execute(self, sql, **kwargs):
         """
@@ -127,17 +130,18 @@ def execute(self, sql, **kwargs):
         :param kwargs: additional args to replace
         :return: the session result
         """
-        _log.debug("%s - replace array parameter in sql query: %s", self._name, sql)
-        parsed = to_query(sql, self._supports_array_parameter, kwargs)
-        _log.debug("%s - execute the given query with the given args: %s", self._name, sql)
-        _log.debug("%s (%s)", parsed, kwargs)
-        try:
-            return self._session.execute(parsed, kwargs)
-        except OperationalError as error:
-            _log.error("OperationalError: %s", error)
-            abort(408, error)
-        except SQLAlchemyError as error:
-            _log.error("SQLAlchemyError: %s", error)
+        with tracer.start_as_current_span("WrappedSession.execute", attributes={"db.pool_status": self._engine.pool.status()}):
+            _log.debug("%s - replace array parameter in sql query: %s", self._name, sql)
+            parsed = to_query(sql, self._supports_array_parameter, kwargs)
+            _log.debug("%s - execute the given query with the given args: %s", self._name, sql)
+            _log.debug("%s (%s)", parsed, kwargs)
+            try:
+                return self._session.execute(parsed, kwargs)
+            except OperationalError as error:
+                _log.error("OperationalError: %s", error)
+                abort(408, error)
+            except SQLAlchemyError as error:
+                _log.error("SQLAlchemyError: %s", error)
 
     def run(self, sql, **kwargs):
         """
@@ -146,40 +150,49 @@ def run(self, sql, **kwargs):
         :param kwargs: args for this query
         :return: list of dicts
         """
-        _log.debug("%s - run sql statement: %s", self._name, sql)
-        result = self.execute(sql, **kwargs)
-        _log.debug("%s - ran sql statement: %s", self._name, sql)
-        columns = result.keys()  # type: ignore
-        return [{c: r[c] for c in columns} for r in result]  # type: ignore
+        with tracer.start_as_current_span("WrappedSession.run"):
+            _log.debug("%s - run sql statement: %s", self._name, sql)
+            result = self.execute(sql, **kwargs)
+            _log.debug("%s - ran sql statement: %s", self._name, sql)
+            columns = result.keys()  # type: ignore
+            return [{c: r[c] for c in columns} for r in result]  # type: ignore
 
     def __call__(self, sql, **kwargs):
-        return self.run(sql, **kwargs)
+        with tracer.start_as_current_span("WrappedSession.__call__"):
+            return self.run(sql, **kwargs)
 
     def __enter__(self):
-        return self
+        with tracer.start_as_current_span("WrappedSession.__enter__"):
+            return self
 
     def commit(self):
-        self._session.commit()
+        with tracer.start_as_current_span("WrappedSession.commit"):
+            self._session.commit()
 
     def flush(self):
-        self._session.flush()
+        with tracer.start_as_current_span("WrappedSession.flush"):
+            self._session.flush()
 
     def rollback(self):
-        self._session.rollback()
+        with tracer.start_as_current_span("WrappedSession.rollback"):
+            self._session.rollback()
 
     def _destroy(self):
-        if self._session:
-            _log.debug("%s - removing session", self._name)
-            self._session.close()
-            self._session = None  # type: ignore
-            _log.debug("%s - removed session", self._name)
-        _log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status())
+        with tracer.start_as_current_span("WrappedSession._destroy", attributes={"db.pool_status": self._engine.pool.status()}):
+            if self._session:
+                _log.debug("%s - removing session", self._name)
+                self._session.close()
+                self._session = None  # type: ignore
+                _log.debug("%s - removed session", self._name)
+            _log.debug("%s - engine status after destroy: %s", self._name, self._engine.pool.status())
 
     def __del__(self):
-        self._destroy()
+        with tracer.start_as_current_span("WrappedSession.__del__"):
+            self._destroy()
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self._destroy()
+        with tracer.start_as_current_span("WrappedSession.__exit__"):
+            self._destroy()
 
 
 def session(engine):
@@ -353,84 +366,90 @@ def get_data(
     :param filters: the dict of dynamically build filter
     :return: (r, view) tuple of the resulting rows and the resolved view
     """
-    config, engine, view = resolve_view(database, view_name)
+    with tracer.start_as_current_span("db.get_data"):
+        config, engine, view = resolve_view(database, view_name)
 
-    kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument)
+        kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument)
 
-    query = view.query
+        query = view.query
 
-    if callable(query):
-        _log.debug("GET DATA with callback variant")
-        # callback variant
-        return query(engine, arguments, filters), view
+        if callable(query):
+            with tracer.start_as_current_span("db.get_data with callback"):
+                _log.debug("GET DATA with callback variant")
+                # callback variant
+                return query(engine, arguments, filters), view
 
-    with session(engine) as sess:
-        _log.debug("%s - GET DATA with session", sess._name)
-        if config.statement_timeout and config.statement_timeout_query:
-            _log.debug("set statement_timeout to {}".format(config.statement_timeout))
-            sess.execute(config.statement_timeout_query.format(config.statement_timeout))
-        _log.debug("%s - GET DATA before run", sess._name)
-        r = sess.run(query.format(**replace), **kwargs)
-        _log.debug("%s - GET DATA after run", sess._name)
-        return r, view
+        with session(engine) as sess:
+            _log.debug("%s - GET DATA with session", sess._name)
+            if config.statement_timeout and config.statement_timeout_query:
+                _log.debug("set statement_timeout to {}".format(config.statement_timeout))
+                sess.execute(config.statement_timeout_query.format(config.statement_timeout))
+            _log.debug("%s - GET DATA before run", sess._name)
+            r = sess.run(query.format(**replace), **kwargs)
+            _log.debug("%s - GET DATA after run", sess._name)
+            return r, view
 
 
 def get_query(database, view_name, replacements=None, arguments=None, extra_sql_argument=None):
-    config, engine, view = resolve_view(database, view_name)
+    with tracer.start_as_current_span("db.get_query"):
+        config, engine, view = resolve_view(database, view_name)
 
-    kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument)
+        kwargs, replace = prepare_arguments(view, config, replacements, arguments, extra_sql_argument)
 
-    query = view.query
+        query = view.query
 
-    if callable(query):
-        return {"query": "custom function", "args": kwargs}
+        if callable(query):
+            return {"query": "custom function", "args": kwargs}
 
-    return {"query": clean_query(query.format(**replace)), "args": kwargs}
+        return {"query": clean_query(query.format(**replace)), "args": kwargs}
 
 
 def get_filtered_data(database, view_name, args):
-    config, _, view = resolve_view(database, view_name)
-    # convert to index lookup
-    # row id start with 1
-    try:
-        replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
-    except RuntimeError as error:
-        abort(400, error)
+    with tracer.start_as_current_span("db.get_filtered_data"):
+        config, _, view = resolve_view(database, view_name)
+        # convert to index lookup
+        # row id start with 1
+        try:
+            replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
+        except RuntimeError as error:
+            abort(400, error)
 
-    return get_data(database, view_name, replacements, processed_args, extra_args, where_clause)
+        return get_data(database, view_name, replacements, processed_args, extra_args, where_clause)
 
 
 def get_filtered_query(database, view_name, args):
-    config, _, view = resolve_view(database, view_name)
-    # convert to index lookup
-    # row id start with 1
-    try:
-        replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
-    except RuntimeError as error:
-        abort(400, error)
+    with tracer.start_as_current_span("db.get_filtered_query"):
+        config, _, view = resolve_view(database, view_name)
+        # convert to index lookup
+        # row id start with 1
+        try:
+            replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
+        except RuntimeError as error:
+            abort(400, error)
 
-    return get_query(database, view_name, replacements, processed_args, extra_args)
+        return get_query(database, view_name, replacements, processed_args, extra_args)
 
 
 def _get_count(database, view_name, args):
-    config, engine, view = resolve_view(database, view_name)
+    with tracer.start_as_current_span("db._get_count"):
+        config, engine, view = resolve_view(database, view_name)
 
-    try:
-        replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
-    except RuntimeError as error:
-        abort(400, error)
+        try:
+            replacements, processed_args, extra_args, where_clause = filter_logic(view, args)
+        except RuntimeError as error:
+            abort(400, error)
 
-    kwargs, replace = prepare_arguments(view, config, replacements, processed_args, extra_args)
+        kwargs, replace = prepare_arguments(view, config, replacements, processed_args, extra_args)
 
-    if "count" in view.queries:
-        count_query = view.queries["count"]
-    elif view.table:
-        count_query = "SELECT count(d.*) as count FROM {table} d {{joins}} {{where}}".format(table=view.table)
-    else:
-        count_query = None
-        abort(500, "invalid view configuration, missing count query and cannot derive it")
+        if "count" in view.queries:
+            count_query = view.queries["count"]
+        elif view.table:
+            count_query = "SELECT count(d.*) as count FROM {table} d {{joins}} {{where}}".format(table=view.table)
+        else:
+            count_query = None
+            abort(500, "invalid view configuration, missing count query and cannot derive it")
 
-    return config, engine, count_query, processed_args, where_clause, replace, kwargs
+        return config, engine, count_query, processed_args, where_clause, replace, kwargs
 
 
 def get_count(database, view_name, args):
@@ -440,154 +459,162 @@ def get_count(database, view_name, args):
     :param view_name: view name
     :return: the count of results
     """
+    with tracer.start_as_current_span("db.get_count"):
+        (
+            config,
+            engine,
+            count_query,
+            processed_args,
+            where_clause,
+            replace,
+            kwargs,
+        ) = _get_count(database, view_name, args)
+
+        if callable(count_query):
+            with tracer.start_as_current_span("db.get_count with callback"):
+                # callback variant
+                return count_query(engine, processed_args, where_clause)
 
-    (
-        config,
-        engine,
-        count_query,
-        processed_args,
-        where_clause,
-        replace,
-        kwargs,
-    ) = _get_count(database, view_name, args)
-
-    if callable(count_query):
-        # callback variant
-        return count_query(engine, processed_args, where_clause)
-
-    with session(engine) as sess:
-        _log.debug("%s - GET COUNT with session", sess._name)
-        if config.statement_timeout and config.statement_timeout_query:
-            _log.debug("set statement_timeout to {}".format(config.statement_timeout))
-            sess.execute(config.statement_timeout_query.format(config.statement_timeout))
-        _log.debug("%s - GET COUNT before run", sess._name)
-        r = sess.run(count_query.format(**replace), **kwargs)
-        _log.debug("%s - GET COUNT after run", sess._name)
-        if r:
-            return r[0]["count"]
-        return 0
+        with session(engine) as sess:
+            _log.debug("%s - GET COUNT with session", sess._name)
+            if config.statement_timeout and config.statement_timeout_query:
+                _log.debug("set statement_timeout to {}".format(config.statement_timeout))
+                sess.execute(config.statement_timeout_query.format(config.statement_timeout))
+            _log.debug("%s - GET COUNT before run", sess._name)
+            r = sess.run(count_query.format(**replace), **kwargs)
+            _log.debug("%s - GET COUNT after run", sess._name)
+            if r:
+                return r[0]["count"]
+            return 0
 
 
 def get_count_query(database, view_name, args):
-    (
-        config,
-        engine,
-        count_query,
-        processed_args,
-        where_clause,
-        replace,
-        kwargs,
-    ) = _get_count(database, view_name, args)
+    with tracer.start_as_current_span("db.get_count_query"):
+        (
+            config,
+            engine,
+            count_query,
+            processed_args,
+            where_clause,
+            replace,
+            kwargs,
+        ) = _get_count(database, view_name, args)
 
-    if callable(count_query):
-        return {"query": "custom function", "args": kwargs}
+        if callable(count_query):
+            return {"query": "custom function", "args": kwargs}
 
-    return {"query": count_query.format(**replace), "args": kwargs}
+        return {"query": count_query.format(**replace), "args": kwargs}
 
 
 def derive_columns(table_name, engine, columns=None):
     """
     helper function to derive the columns of a table
    """
-    columns = columns or {}
-
-    for col in get_columns(engine, table_name):
-        name = col["column"]
-        if name in columns:
-            # merge
-            old = columns[name]
-            for k, v in col.items():
-                if k not in old:
-                    old[k] = v
-        else:
-            columns[name] = col
-
-    # derive the missing domains and categories
-    number_columns = [k for k, col in columns.items() if col["type"] == "number" and ("min" not in col or "max" not in col)]
-    categorical_columns = [
-        k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col
-    ]
-    if number_columns or categorical_columns:
-        with session(engine) as sess:
-            _log.debug("%s - DERIVE COLUMNS with session", sess._name)
-            if number_columns:
-                template = "min({col}) as {col}_min, max({col}) as {col}_max"
-                minmax = ", ".join(template.format(col=col) for col in number_columns)
-                _log.debug("%s - DERIVE COLUMNS number columns before run", sess._name)
-                row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax))))  # type: ignore
-                _log.debug("%s - DERIVE COLUMNS number columns after run", sess._name)
-                for num_col in number_columns:
-                    columns[num_col]["min"] = row[num_col + "_min"]
-                    columns[num_col]["max"] = row[num_col + "_max"]
-            for col in categorical_columns:
-                template = """SELECT distinct {col} as cat FROM {table} WHERE {col} is not NULL"""
-                if _differentiates_empty_string_and_null(engine.name):
-                    template += """ AND {col} <> ''"""
-                template += """ ORDER BY {col} ASC"""
-                _log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col)
-                cats = sess.execute(template.format(col=col, table=table_name))
-                _log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col)
-                categories = [str(r["cat"]) for r in cats if r["cat"] is not None]  # type: ignore
-                if columns[col]["type"] == "set":
-                    separator = getattr(columns[col], "separator", ";")
-                    separated_categories = [category.split(separator) for category in categories]
-                    # flatten array
-                    categories = list({category for sublist in separated_categories for category in sublist})
-                categories.sort()  # sort list to avoid random order with each run
-                columns[col]["categories"] = categories
-            _log.debug("%s - DERIVE COLUMNS done", sess._name)
-
-    return columns
+    with tracer.start_as_current_span("db.derive_columns"):
+        columns = columns or {}
+
+        for col in get_columns(engine, table_name):
+            name = col["column"]
+            if name in columns:
+                # merge
+                old = columns[name]
+                for k, v in col.items():
+                    if k not in old:
+                        old[k] = v
+            else:
+                columns[name] = col
+
+        # derive the missing domains and categories
+        number_columns = [k for k, col in columns.items() if col["type"] == "number" and ("min" not in col or "max" not in col)]
+        categorical_columns = [
+            k for k, col in columns.items() if (col["type"] == "categorical" or col["type"] == "set") and "categories" not in col
+        ]
+        if number_columns or categorical_columns:
+            with tracer.start_as_current_span("db.derive_column", attributes={"db.table_name": table_name, "db.number_columns": number_columns, "db.categorical_columns": categorical_columns}):
+                with session(engine) as sess:
+                    _log.debug("%s - DERIVE COLUMNS with session", sess._name)
+                    if number_columns:
+                        template = "min({col}) as {col}_min, max({col}) as {col}_max"
+                        minmax = ", ".join(template.format(col=col) for col in number_columns)
+                        _log.debug("%s - DERIVE COLUMNS number columns before run", sess._name)
+                        row = next(iter(sess.execute("""SELECT {minmax} FROM {table}""".format(table=table_name, minmax=minmax))))  # type: ignore
+                        _log.debug("%s - DERIVE COLUMNS number columns after run", sess._name)
+                        for num_col in number_columns:
+                            columns[num_col]["min"] = row[num_col + "_min"]
+                            columns[num_col]["max"] = row[num_col + "_max"]
+                    for col in categorical_columns:
+                        template = """SELECT distinct {col} as cat FROM {table} WHERE {col} is not NULL"""
+                        if _differentiates_empty_string_and_null(engine.name):
+                            template += """ AND {col} <> ''"""
+                        template += """ ORDER BY {col} ASC"""
+                        _log.debug("%s - DERIVE COLUMNS categorical columns before run: %s, %s", sess._name, table_name, col)
+                        cats = sess.execute(template.format(col=col, table=table_name))
+                        _log.debug("%s - DERIVE COLUMNS categorical columns after run: %s, %s", sess._name, table_name, col)
+                        categories = [str(r["cat"]) for r in cats if r["cat"] is not None]  # type: ignore
+                        if columns[col]["type"] == "set":
+                            separator = getattr(columns[col], "separator", ";")
+                            separated_categories = [category.split(separator) for category in categories]
+                            # flatten array
+                            categories = list({category for sublist in separated_categories for category in sublist})
+                        categories.sort()  # sort list to avoid random order with each run
+                        columns[col]["categories"] = categories
+                    _log.debug("%s - DERIVE COLUMNS done", sess._name)
+
+        return columns
 
 
 def _fill_up_columns(view, engine):
-    _log.debug("fill up view %s", view)
-    # update the real object
-    view.columns = derive_columns(view.table, engine, view.columns)
-    view.columns_filled_up = True
+    with tracer.start_as_current_span("db._fill_up_columns"):
+        _log.debug("fill up view %s", view)
+        # update the real object
+        view.columns = derive_columns(view.table, engine, view.columns)
+        view.columns_filled_up = True
 
 
 def _lookup(database, view_name, query, page, limit, args):
-    config, engine, view = resolve_view(database, view_name)
+    with tracer.start_as_current_span("db._lookup"):
+        config, engine, view = resolve_view(database, view_name)
 
-    arguments = MultiDict(args)
-    offset = page * limit
-    # replace with wildcard version
-    arguments["query"] = "%{}%".format(query)
-    arguments["query_end"] = "%{}".format(query)
-    arguments["query_start"] = "{}%".format(query)
-    arguments["query_match"] = "{}".format(query)
-    # add 1 for checking if we have more
-    replacements = {"limit": limit + 1, "offset": offset, "offset2": (offset + limit + 1)}
+        arguments = MultiDict(args)
+        offset = page * limit
+        # replace with wildcard version
+        arguments["query"] = "%{}%".format(query)
+        arguments["query_end"] = "%{}".format(query)
+        arguments["query_start"] = "{}%".format(query)
+        arguments["query_match"] = "{}".format(query)
+        # add 1 for checking if we have more
+        replacements = {"limit": limit + 1, "offset": offset, "offset2": (offset + limit + 1)}
 
-    kwargs, replace = prepare_arguments(view, config, replacements, arguments)
+        kwargs, replace = prepare_arguments(view, config, replacements, arguments)
 
-    return engine, view, view.query, replace, kwargs
+        return engine, view, view.query, replace, kwargs
 
 
 def lookup_query(database, view_name, query, page, limit, args):
-    engine, _, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args)
+    with tracer.start_as_current_span("db.lookup_query"):
+        engine, _, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args)
 
-    if callable(sql):
-        return {"query": "custom function", "args": kwargs}
+        if callable(sql):
+            return {"query": "custom function", "args": kwargs}
 
-    return {"query": sql.format(**replace), "args": kwargs}
+        return {"query": sql.format(**replace), "args": kwargs}
 
 
 def lookup(database, view_name, query, page, limit, args):
-    engine, view, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args)
+    with tracer.start_as_current_span("db.lookup"):
+        engine, view, sql, replace, kwargs = _lookup(database, view_name, query, page, limit, args)
 
-    if callable(sql):
-        kwargs.update(replace)
-        # callback variant
-        return sql(engine, kwargs, None)
+        if callable(sql):
+            kwargs.update(replace)
+            # callback variant
+            return sql(engine, kwargs, None)
 
-    with session(engine) as sess:
-        r_items = sess.run(sql.format(**replace), **kwargs)
+        with session(engine) as sess:
+            r_items = sess.run(sql.format(**replace), **kwargs)
 
-    more = len(r_items) > limit
-    if more:
-        # hit the boundary of more remove the artificial one
-        del r_items[-1]
+        more = len(r_items) > limit
+        if more:
+            # hit the boundary of more remove the artificial one
+            del r_items[-1]
 
-    return r_items, more, view
+        return r_items, more, view
diff --git a/tdp_core/dbview.py b/tdp_core/dbview.py
index 7df2e3b32..298156e98 100644
--- a/tdp_core/dbview.py
+++ b/tdp_core/dbview.py
@@ -10,6 +10,9 @@
 
 from .utils import clean_query
 
+from opentelemetry import trace
+
+tracer = trace.get_tracer(__name__)
 _log = logging.getLogger(__name__)
 
 REGEX_TYPE = type(re.compile(""))
@@ -607,14 +610,15 @@ def __init__(self, views=None, agg_score=None, mappings=None):
         :param agg_score: optional specify how aggregation should be handled
         :param mappings: optional database mappings
         """
-        _log.debug("create db connector")
-        self.agg_score = agg_score or default_agg_score
-        self.views = views or {}
-        self.dburl: str = None  # type: ignore
-        self.mappings = mappings
-        self.statement_timeout = None
-        self.statement_timeout_query: str | None = None
-        self.description = ""
+        with tracer.start_as_current_span("DBConnector.init"):
+            _log.debug("create db connector")
+            self.agg_score = agg_score or default_agg_score
+            self.views = views or {}
+            self.dburl: str = None  # type: ignore
+            self.mappings = mappings
+            self.statement_timeout = None
+            self.statement_timeout_query: str | None = None
+            self.description = ""
 
     def dump(self, name):
         return OrderedDict(name=name, description=self.description)
@@ -627,8 +631,10 @@ def create_engine(self, config) -> Engine:
         }
         engine_options.update(config.get("engine", {}))
         _log.debug("db connector: create engine with options %s", engine_options)
-        return sqlalchemy.create_engine(self.dburl, **engine_options)
+        with tracer.start_as_current_span("DBConnector.create_engine"):
+            return sqlalchemy.create_engine(self.dburl, **engine_options)
 
     def create_sessionmaker(self, engine) -> sessionmaker:
-        _log.debug("db connector: create_sessionmaker")
-        return sessionmaker(bind=engine)
+        with tracer.start_as_current_span("DBConnector.create_sessionmaker"):
+            _log.debug("db connector: create_sessionmaker")
+            return sessionmaker(bind=engine)