Skip to content

Commit

Permalink
Fix error file not found. tmp file is deleted before inserting rows t…
Browse files Browse the repository at this point in the history
…o DB in VerticaToMySQLOperator bulk (#44028)

* Fix error file not found. tmp file is deleted before inserting rows to DB in VerticaToMySQLOperator bulk .

* fix the mock_get_conn function , so it will really mock the data and will return empty results

* fix ruff-format

---------

Co-authored-by: Amir.Ba <amir.bareket@solaredge.com>
  • Loading branch information
bareketamir and Amir.Ba authored Nov 19, 2024
1 parent 3b941ef commit 8c91fce
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
29 changes: 14 additions & 15 deletions providers/src/airflow/providers/mysql/transfers/vertica_to_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,20 @@ def _bulk_load_transfer(self, mysql, vertica):
count += 1

tmpfile.flush()
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
tmpfile.close()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise
self._run_preoperator(mysql)
try:
self.log.info("Bulk inserting rows into MySQL...")
with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor:
cursor.execute(
f"LOAD DATA LOCAL INFILE '{tmpfile.name}' "
f"INTO TABLE {self.mysql_table} "
f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})"
)
conn.commit()
self.log.info("Inserted rows into MySQL %s", count)
except (MySQLdb.Error, MySQLdb.Warning):
self.log.info("Inserted rows into MySQL 0")
raise

def _run_preoperator(self, mysql):
if self.mysql_preoperator:
Expand Down
26 changes: 16 additions & 10 deletions providers/tests/mysql/transfers/test_vertica_to_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@


def mock_get_conn():
class MockCol:
def __init__(self, name):
self.name = name

col_a = MockCol(name="a")
col_b = MockCol(name="b")
col_c = MockCol(name="c")

commit_mock = mock.MagicMock()
cursor_mock = mock.MagicMock(
execute=[],
fetchall=[["1", "2", "3"]],
description=["a", "b", "c"],
iterate=[["1", "2", "3"]],
)
conn_mock = mock.MagicMock(
commit=commit_mock,
cursor=cursor_mock,
)
cursor_mock = mock.MagicMock(description=[col_a, col_b, col_c])
cursor_mock.execute.return_value = []
cursor_mock.fetchall.return_value = [["1", "2", "3"]]
cursor_mock.iterate.return_value = [["1", "2", "3"]]
conn_mock = mock.MagicMock()
conn_mock.commit.return_value = commit_mock
conn_mock.cursor.return_value = cursor_mock

return conn_mock


Expand Down

0 comments on commit 8c91fce

Please sign in to comment.