Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
topherinternational committed Jan 3, 2024
1 parent f8e7fc9 commit aabb034
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 12 deletions.
8 changes: 7 additions & 1 deletion src/pylint_airflow/checkers/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Set, Dict, Tuple

import astroid
from astroid import AttributeInferenceError
from pylint import checkers
from pylint.checkers import utils

Expand Down Expand Up @@ -77,7 +78,12 @@ def get_xcoms_from_tasks(
if callable_func_name == "<lambda>": # TODO support lambdas
continue

callable_func = node.getattr(callable_func_name)[0]
try:
module_attribute = node.getattr(callable_func_name)
except AttributeInferenceError:
continue
else:
callable_func = module_attribute[0]

if not isinstance(callable_func, astroid.FunctionDef):
continue # Callable_func is str not FunctionDef when imported
Expand Down
84 changes: 73 additions & 11 deletions tests/pylint_airflow/checkers/test_xcom.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# pylint: disable=missing-function-docstring
# pylint: disable=missing-function-docstring,use-implicit-booleaness-not-comparison
"""Tests for the XCom checker and its helper functions."""
from unittest.mock import Mock

Expand Down Expand Up @@ -65,21 +65,53 @@ def test_should_detect_builtin_callable(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
builtin_task = PythonOperator(task_id="builtin_task", python_callable=super)
builtin_task = PythonOperator(task_id="builtin_task", python_callable=list)
"""
ast = astroid.parse(test_code)

result = get_task_ids_to_python_callable_specs(ast)

expected_result = {
"builtin_task": PythonOperatorSpec(ast.body[1].value, "super"),
"builtin_task": PythonOperatorSpec(ast.body[1].value, "list"),
}

assert result == expected_result

@pytest.mark.xfail(reason="Test not yet written", raises=AssertionError, strict=True)
def test_should_detect_imported_callable(self):
assert False # TODO: write this test
@pytest.mark.xfail(reason="Not yet implemented", raises=AssertionError, strict=True)
def test_should_detect_imported_callable_as_attribute(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
from datetime import date
builtin_task = PythonOperator(task_id="builtin_task", python_callable=date.today)
"""
ast = astroid.parse(test_code)

result = get_task_ids_to_python_callable_specs(ast)

expected_result = {
"builtin_task": PythonOperatorSpec(ast.body[2].value, "date.today"),
}

assert result == expected_result

@pytest.mark.xfail(reason="Not yet implemented", raises=AssertionError, strict=True)
def test_should_detect_imported_callable_as_name(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
from datetime.date import today
builtin_task = PythonOperator(task_id="builtin_task", python_callable=today)
"""
ast = astroid.parse(test_code)

result = get_task_ids_to_python_callable_specs(ast)

expected_result = {
"builtin_task": PythonOperatorSpec(ast.body[2].value, "today"),
}

assert result == expected_result

def test_should_detect_local_function_callables(self):
test_code = """
Expand Down Expand Up @@ -134,21 +166,51 @@ def test_should_skip_builtin_callable(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
builtin_task = PythonOperator(task_id="builtin_task", python_callable=super)
builtin_task = PythonOperator(task_id="builtin_task", python_callable=list)
"""
ast = astroid.parse(test_code)

test_task_ids_to_python_callable_specs = {
"builtin_task": PythonOperatorSpec(ast.body[1].value, "super"),
"builtin_task": PythonOperatorSpec(ast.body[1].value, "list"),
}

result = get_xcoms_from_tasks(ast, test_task_ids_to_python_callable_specs)

assert result == ({}, set())

@pytest.mark.xfail(reason="Test not yet written", raises=AssertionError, strict=True)
def test_should_skip_imported_callable(self):
assert False # TODO: write this test
def test_should_skip_imported_callable_as_attribute(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
from datetime import date
builtin_task = PythonOperator(task_id="builtin_task", python_callable=date.today)
"""
ast = astroid.parse(test_code)

test_task_ids_to_python_callable_specs = {
"builtin_task": PythonOperatorSpec(ast.body[2].value, "date.today"),
}

result = get_xcoms_from_tasks(ast, test_task_ids_to_python_callable_specs)

assert result == ({}, set())

def test_should_skip_imported_callable_as_name(self):
test_code = """
from airflow.operators.python_operator import PythonOperator
from datetime.date import today
builtin_task = PythonOperator(task_id="builtin_task", python_callable=today)
"""
ast = astroid.parse(test_code)

test_task_ids_to_python_callable_specs = {
"builtin_task": PythonOperatorSpec(ast.body[2].value, "today"),
}

result = get_xcoms_from_tasks(ast, test_task_ids_to_python_callable_specs)

assert result == ({}, set())

def test_should_detect_xcom_push_tasks(self):
test_code = """
Expand Down

0 comments on commit aabb034

Please sign in to comment.