Skip to content

Commit

Permalink
refactor: iready asset/sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Jul 10, 2024
1 parent 8e9e85a commit b54b66a
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 159 deletions.
76 changes: 19 additions & 57 deletions src/teamster/code_locations/kippmiami/iready/assets.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,49 @@
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition

from teamster.code_locations.kippmiami import CODE_LOCATION
from teamster.code_locations.kippmiami import CODE_LOCATION, CURRENT_FISCAL_YEAR
from teamster.code_locations.kippmiami.iready.schema import (
DIAGNOSTIC_AND_INSTRUCTION_SCHEMA,
DIAGNOSTIC_RESULTS_SCHEMA,
INSTRUCTIONAL_USAGE_DATA_SCHEMA,
PERSONALIZED_INSTRUCTION_BY_LESSON_SCHEMA,
)
from teamster.libraries.sftp.assets import build_sftp_file_asset
from teamster.libraries.iready.assets import build_iready_sftp_asset

ssh_resource_key = "ssh_iready"
remote_dir_regex = r"/exports/fl-kipp_miami/(?P<academic_year>\w+)"
slugify_replacements = [["%", "percent"]]
region_subfolder = "fl-kipp_miami"
key_prefix = [CODE_LOCATION, "iready"]

subject_partitions_def = StaticPartitionsDefinition(["ela", "math"])

diagnostic_results = build_sftp_file_asset(
diagnostic_results = build_iready_sftp_asset(
asset_key=[*key_prefix, "diagnostic_results"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"diagnostic_results_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=DIAGNOSTIC_RESULTS_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022", "2021", "2020"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2020,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

personalized_instruction_by_lesson = build_sftp_file_asset(
personalized_instruction_by_lesson = build_iready_sftp_asset(
asset_key=[*key_prefix, "personalized_instruction_by_lesson"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"personalized_instruction_by_lesson_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=PERSONALIZED_INSTRUCTION_BY_LESSON_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2022,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

instructional_usage_data = build_sftp_file_asset(
instructional_usage_data = build_iready_sftp_asset(
asset_key=[*key_prefix, "instructional_usage_data"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"instructional_usage_data_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=INSTRUCTIONAL_USAGE_DATA_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2022,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

diagnostic_and_instruction = build_sftp_file_asset(
diagnostic_and_instruction = build_iready_sftp_asset(
asset_key=[*key_prefix, "diagnostic_and_instruction"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"diagnostic_and_instruction_(?P<subject>\w+)_ytd_window\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=DIAGNOSTIC_AND_INSTRUCTION_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022", "2021"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2021,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

assets = [
Expand Down
9 changes: 7 additions & 2 deletions src/teamster/code_locations/kippmiami/iready/sensors.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from teamster.code_locations.kippmiami import CODE_LOCATION, LOCAL_TIMEZONE
from teamster.code_locations.kippmiami import (
CODE_LOCATION,
CURRENT_FISCAL_YEAR,
LOCAL_TIMEZONE,
)
from teamster.code_locations.kippmiami.iready import assets
from teamster.libraries.iready.sensors import build_iready_sftp_sensor

sftp_sensor = build_iready_sftp_sensor(
code_location=CODE_LOCATION,
asset_defs=assets,
timezone=LOCAL_TIMEZONE,
remote_dir_regex=r"/exports/fl-kipp_miami/Current_Year",
remote_dir_regex=r"/exports/fl-kipp_miami",
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
minimum_interval_seconds=(60 * 10),
)

Expand Down
83 changes: 19 additions & 64 deletions src/teamster/code_locations/kippnewark/iready/assets.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,49 @@
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition

from teamster.code_locations.kippnewark import CODE_LOCATION
from teamster.code_locations.kippnewark import CODE_LOCATION, CURRENT_FISCAL_YEAR
from teamster.code_locations.kippnewark.iready.schema import (
DIAGNOSTIC_AND_INSTRUCTION_SCHEMA,
DIAGNOSTIC_RESULTS_SCHEMA,
INSTRUCTIONAL_USAGE_DATA_SCHEMA,
PERSONALIZED_INSTRUCTION_BY_LESSON_SCHEMA,
)
from teamster.libraries.sftp.assets import build_sftp_file_asset
from teamster.libraries.iready.assets import build_iready_sftp_asset

ssh_resource_key = "ssh_iready"
remote_dir_regex = r"/exports/nj-kipp_nj/(?P<academic_year>\w+)"
slugify_replacements = [["%", "percent"]]
region_subfolder = "nj-kipp_nj"
key_prefix = [CODE_LOCATION, "iready"]

subject_partitions_def = StaticPartitionsDefinition(["ela", "math"])

diagnostic_results = build_sftp_file_asset(
diagnostic_results = build_iready_sftp_asset(
asset_key=[*key_prefix, "diagnostic_results"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"diagnostic_results_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=DIAGNOSTIC_RESULTS_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022", "2021", "2020"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2020,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

personalized_instruction_by_lesson = build_sftp_file_asset(
personalized_instruction_by_lesson = build_iready_sftp_asset(
asset_key=[*key_prefix, "personalized_instruction_by_lesson"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"personalized_instruction_by_lesson_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=PERSONALIZED_INSTRUCTION_BY_LESSON_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022"]
),
}
),
slugify_replacements=slugify_replacements,
op_tags={
"dagster-k8s/config": {
"container_config": {
"resources": {"requests": {"cpu": "750m"}, "limits": {"cpu": "750m"}}
}
}
},
start_fiscal_year=2022,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

instructional_usage_data = build_sftp_file_asset(
instructional_usage_data = build_iready_sftp_asset(
asset_key=[*key_prefix, "instructional_usage_data"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"instructional_usage_data_(?P<subject>\w+)\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=INSTRUCTIONAL_USAGE_DATA_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2022,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

diagnostic_and_instruction = build_sftp_file_asset(
diagnostic_and_instruction = build_iready_sftp_asset(
asset_key=[*key_prefix, "diagnostic_and_instruction"],
remote_dir_regex=remote_dir_regex,
region_subfolder=region_subfolder,
remote_file_regex=r"diagnostic_and_instruction_(?P<subject>\w+)_ytd_window\.csv",
ssh_resource_key=ssh_resource_key,
avro_schema=DIAGNOSTIC_AND_INSTRUCTION_SCHEMA,
partitions_def=MultiPartitionsDefinition(
{
"subject": subject_partitions_def,
"academic_year": StaticPartitionsDefinition(
["Current_Year", "2023", "2022", "2021"]
),
}
),
slugify_replacements=slugify_replacements,
start_fiscal_year=2021,
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
)

assets = [
Expand Down
9 changes: 7 additions & 2 deletions src/teamster/code_locations/kippnewark/iready/sensors.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from teamster.code_locations.kippnewark import CODE_LOCATION, LOCAL_TIMEZONE
from teamster.code_locations.kippnewark import (
CODE_LOCATION,
CURRENT_FISCAL_YEAR,
LOCAL_TIMEZONE,
)
from teamster.code_locations.kippnewark.iready import assets
from teamster.libraries.iready.sensors import build_iready_sftp_sensor

sftp_sensor = build_iready_sftp_sensor(
code_location=CODE_LOCATION,
asset_defs=assets,
timezone=LOCAL_TIMEZONE,
remote_dir_regex=r"/exports/nj-kipp_nj/Current_Year",
remote_dir_regex=r"/exports/nj-kipp_nj",
current_fiscal_year=CURRENT_FISCAL_YEAR.fiscal_year,
minimum_interval_seconds=(60 * 10),
)

Expand Down
2 changes: 1 addition & 1 deletion src/teamster/libraries/core/utils/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


def regex_pattern_replace(pattern: str, replacements: Mapping[str, str]):
for group in re.findall(r"\(\?P<\w+>[\w\[\]\{\}\+\-\\\/\.]*\)", pattern):
for group in re.findall(r"\(\?P<\w+>[\w\+\-\.\[\]\{\}\/\\]*\)", pattern):
match = _check.not_none(
value=re.search(pattern=r"(?<=<)(\w+)(?=>)", string=group)
)
Expand Down
32 changes: 32 additions & 0 deletions src/teamster/libraries/iready/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dagster import MultiPartitionsDefinition, StaticPartitionsDefinition

from teamster.libraries.sftp.assets import build_sftp_file_asset


def build_iready_sftp_asset(
    asset_key,
    region_subfolder,
    remote_file_regex,
    avro_schema,
    start_fiscal_year: int,
    current_fiscal_year: int,
):
    """Build an i-Ready SFTP file asset partitioned by subject and academic year.

    Thin wrapper around ``build_sftp_file_asset`` that fills in the settings
    shared by every i-Ready export: the ``ssh_iready`` resource key, the
    ``iready`` group name, the ``%`` -> ``percent`` slugify replacement, and a
    two-dimensional partitions definition.

    Args:
        asset_key: Asset key, passed through unchanged.
        region_subfolder: Folder name interpolated into the remote directory
            regex directly after ``/exports/``.
        remote_file_regex: File name regex, passed through unchanged.
        avro_schema: Avro schema, passed through unchanged.
        start_fiscal_year: First fiscal year to create a partition for.
        current_fiscal_year: Last fiscal year to create a partition for
            (inclusive).

    Returns:
        Whatever ``build_sftp_file_asset`` returns for these arguments.
    """
    # One "academic_year" key per fiscal year in [start, current], each
    # expressed as the fiscal year minus one (e.g. fiscal 2022 -> "2021").
    academic_years = list(map(str, range(start_fiscal_year - 1, current_fiscal_year)))

    subject_partitions = StaticPartitionsDefinition(["ela", "math"])
    academic_year_partitions = StaticPartitionsDefinition(academic_years)

    partitions_def = MultiPartitionsDefinition(
        {
            "subject": subject_partitions,
            "academic_year": academic_year_partitions,
        }
    )

    # The named group captures the directory below the region folder as the
    # "academic_year" value.
    remote_dir_regex = rf"/exports/{region_subfolder}/(?P<academic_year>\w+)"

    return build_sftp_file_asset(
        asset_key=asset_key,
        remote_dir_regex=remote_dir_regex,
        remote_file_regex=remote_file_regex,
        ssh_resource_key="ssh_iready",
        group_name="iready",
        avro_schema=avro_schema,
        slugify_replacements=[["%", "percent"]],
        partitions_def=partitions_def,
    )
63 changes: 42 additions & 21 deletions src/teamster/libraries/iready/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@


def build_iready_sftp_sensor(
code_location,
code_location: str,
asset_defs: list[AssetsDefinition],
timezone,
remote_dir_regex,
remote_dir_regex: str,
current_fiscal_year: int,
minimum_interval_seconds=None,
):
@sensor(
Expand All @@ -41,34 +42,54 @@ def _sensor(context: SensorEvaluationContext, ssh_iready: SSHResource):
return SensorResult(skip_reason=str(e))

for asset in asset_defs:
asset_metadata = asset.metadata_by_key[asset.key]
metadata_by_key = asset.metadata_by_key[asset.key]
asset_identifier = asset.key.to_python_identifier()

context.log.info(asset_identifier)
last_run = cursor.get(asset_identifier, 0)

for f, _ in files:
match = re.match(
pattern=asset_metadata["remote_file_regex"], string=f.filename
pattern = re.compile(
pattern=(
f"{metadata_by_key["remote_dir_regex"]}/"
f"{metadata_by_key["remote_file_regex"]}"
)
)

if (
match is not None
and f.st_mtime > last_run
and _check.not_none(value=f.st_size) > 0
):
context.log.info(f"{f.filename}: {f.st_mtime} - {f.st_size}")
run_requests.append(
RunRequest(
run_key=f"{asset_identifier}_{f.st_mtime}",
asset_selection=[asset.key],
partition_key=MultiPartitionKey(
{**match.groupdict(), "academic_year": "Current_Year"}
),
)
file_matches = [
(f, path)
for f, path in files
if pattern.match(string=path)
and _check.not_none(value=f.st_mtime) > last_run
and _check.not_none(value=f.st_size) > 0
]

for f, path in file_matches:
match = _check.not_none(value=pattern.match(string=path))

group_dict = match.groupdict()

if group_dict["academic_year"] == "Current_Year":
partition_key = MultiPartitionKey(
{
"academic_year": str(current_fiscal_year - 1),
"subject": group_dict["subject"],
}
)
else:
partition_key = MultiPartitionKey(group_dict)

context.log.info(f"{f.filename}: {partition_key}")
run_requests.append(
RunRequest(
run_key=(
f"{asset_identifier}__{partition_key}__{now.timestamp()}"
),
asset_selection=[asset.key],
partition_key=partition_key,
)
)

cursor[asset_identifier] = now.timestamp()
cursor[asset_identifier] = now.timestamp()

return SensorResult(run_requests=run_requests, cursor=json.dumps(obj=cursor))

Expand Down
Loading

0 comments on commit b54b66a

Please sign in to comment.