diff --git a/tests/integration/test_materialized_mysql_database/materialized_with_ddl.py b/tests/integration/test_materialized_mysql_database/materialized_with_ddl.py index 8b2943c2b733..389d430622d2 100644 --- a/tests/integration/test_materialized_mysql_database/materialized_with_ddl.py +++ b/tests/integration/test_materialized_mysql_database/materialized_with_ddl.py @@ -13,25 +13,36 @@ from helpers.test_tools import assert_eq_with_retry -def check_query(clickhouse_node, query, result_set, retry_count=10, interval_seconds=3): - lastest_result = "" +def check_query( + clickhouse_node, + query, + result_set, + retry_count=30, + interval_seconds=1, + on_failure=None, +): + latest_result = "" + if "/* expect: " not in query: + query = "/* expect: " + result_set.rstrip("\n") + "*/ " + query for i in range(retry_count): try: - lastest_result = clickhouse_node.query(query) - if result_set == lastest_result: + latest_result = clickhouse_node.query(query) + if result_set == latest_result: return - logging.debug(f"latest_result {lastest_result}") + logging.debug(f"latest_result {latest_result}") time.sleep(interval_seconds) except Exception as e: logging.debug(f"check_query retry {i+1} exception {e}") time.sleep(interval_seconds) else: - result_got = clickhouse_node.query(query) + latest_result = clickhouse_node.query(query) + if on_failure is not None and latest_result != result_set: + on_failure(latest_result, result_set) assert ( - result_got == result_set - ), f"Got result {result_got}, while expected result {result_set}" + latest_result == result_set + ), f"Got result '{latest_result}', expected result '{result_set}'" def dml_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name): diff --git a/tests/integration/test_materialized_mysql_database/test.py b/tests/integration/test_materialized_mysql_database/test.py index c21e04af8dbf..1fd09f733f0b 100644 --- a/tests/integration/test_materialized_mysql_database/test.py +++ b/tests/integration/test_materialized_mysql_database/test.py @@ -52,6 +52,7 @@ def started_cluster(): cluster.start() yield cluster finally: + node_db.stop_clickhouse() # ensures that coverage report is written to disk, even if cluster.shutdown() times out. cluster.shutdown() @@ -86,7 +87,7 @@ def alloc_connection(self): else: self.mysql_connection.ping(reconnect=True) logging.debug( - "MySQL Connection establised: {}:{}".format( + "MySQL Connection established: {}:{}".format( self.ip_address, self.port ) ) @@ -94,7 +95,7 @@ def alloc_connection(self): except Exception as e: errors += [str(e)] time.sleep(1) - raise Exception("Connection not establised, {}".format(errors)) + raise Exception("Connection not established, {}".format(errors)) def query(self, execution_query): with self.alloc_connection().cursor() as cursor: @@ -118,9 +119,9 @@ def result(self, execution_query): if result is not None: print(cursor.fetchall()) - def query_and_get_data(self, executio_query): + def query_and_get_data(self, execution_query): with self.alloc_connection().cursor() as cursor: - cursor.execute(executio_query) + cursor.execute(execution_query) return cursor.fetchall() def close(self):