Skip to content

Commit

Permalink
Merge pull request #4 from openearthplatforminitiative/feat/kubernetes
Browse files Browse the repository at this point in the history
feat: add dagster-values with example image
  • Loading branch information
Erik Dymbe authored Mar 20, 2024
2 parents 83d6ba3 + a14be67 commit d9f4819
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 285 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,4 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/
2 changes: 1 addition & 1 deletion data_pipelines/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
region_name=settings.aws_region,
scheduler_task_definition_arn=settings.dask_scheduler_task_definition_arn,
worker_task_definition_arn=settings.dask_worker_task_definition_arn,
cluster_arn=settings.dask_cluster_arn,
cluster_arn=settings.dask_ecs_cluster_arn,
execution_role_arn=settings.dask_execution_role_arn,
security_groups=settings.dask_security_groups,
task_role_arn=settings.dask_task_role_arn,
Expand Down
2 changes: 1 addition & 1 deletion data_pipelines/resources/dask_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _provision_cluster(self, context: InitResourceContext):
) as cluster:
yield cluster
else:
context.log.warn(
context.log.warning(
"Dask cluster ARN config could not be found. Launching ephemeral Dask cluster on AWS Fargate."
)
with FargateCluster(
Expand Down
25 changes: 19 additions & 6 deletions data_pipelines/resources/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ def __init__(self, base_path: UPath):
def dump_to_path(
    self, context: OutputContext, obj: xr.DataArray, path: UPath
) -> None:
    """Write *obj* to ``path`` as a Zarr store, replacing any existing store.

    The UPath is converted to a URI string and its storage options are
    forwarded explicitly so remote (e.g. S3) targets resolve correctly.
    """
    # This scraped diff shows the old single-line call and the new call
    # concatenated; only the new (added) call is kept here.
    obj.to_zarr(
        path.as_uri(),
        # mode="w" overwrites an existing store rather than appending.
        mode="w",
        # dict() makes a plain copy so the writer never mutates the
        # mapping owned by the UPath instance.
        storage_options=dict(path.storage_options),
    )

def load_from_path(self, context: InputContext, path: UPath) -> xr.DataArray:
    """Open and return the DataArray stored at *path*.

    NOTE(review): the matching dump_to_path writes via ``path.as_uri()``
    with explicit storage_options, while this reads by passing the UPath
    object directly — confirm ``xr.open_dataarray`` resolves remote
    (e.g. S3) paths the same way, or mirror the as_uri() handling here.
    """
    return xr.open_dataarray(path)
Expand All @@ -70,15 +74,24 @@ def __init__(self, base_path: UPath):
def dump_to_path(
self, context: OutputContext, obj: pd.DataFrame | dd.DataFrame, path: UPath
):
if isinstance(obj, pd.DataFrame):
obj.to_parquet(path, storage_options=path.storage_options)
else:
obj.to_parquet(path, overwrite=True, storage_options=path.storage_options)
match obj:
case pd.DataFrame:
obj.to_parquet(
path.as_uri(),
storage_options=path.storage_options,
)
case dd.DataFrame:
obj.to_parquet(
path.as_uri(),
storage_options=path.storage_options,
)
case _:
raise TypeError("Must be either a Pandas or Dask DataFrame.")

def load_from_path(
    self, context: InputContext, path: UPath | Sequence[UPath]
) -> dd.DataFrame:
    """Read Parquet data from a single UPath or a sequence of UPaths.

    Calling ``path.as_uri()`` unconditionally would raise AttributeError
    for the ``Sequence[UPath]`` case the signature allows, so sequences
    are converted to URI strings element-wise.
    """
    if isinstance(path, UPath):
        return dd.read_parquet(path.as_uri())
    return dd.read_parquet([p.as_uri() for p in path])

def load_input(self, context: InputContext) -> dd.DataFrame:
if not context.has_asset_partitions:
Expand Down
3 changes: 2 additions & 1 deletion data_pipelines/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Settings(BaseSettings):

dask_scheduler_task_definition_arn: str | None = None
dask_worker_task_definition_arn: str | None = None
dask_cluster_arn: str | None = None
dask_ecs_cluster_arn: str | None = None
dask_execution_role_arn: str | None = None
dask_task_role_arn: str | None = None
dask_security_group_id: str | None = None
Expand All @@ -30,6 +30,7 @@ def base_data_upath(self) -> UPath:
self.base_data_path,
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
client_kwargs={"region_name": self.aws_region},
)


Expand Down
15 changes: 15 additions & 0 deletions deployment/dagster/routing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Traefik IngressRoute exposing the Dagster webserver under the /dagster
# path prefix (matches dagsterWebserver.pathPrefix in the Helm values).
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: dagster-webserver
  namespace: dagster
spec:
  entryPoints:
    # HTTPS-only entrypoint; no plain-HTTP route is defined here.
    - websecure
  routes:
    - kind: Rule
      match: PathPrefix(`/dagster`)
      services:
        - kind: Service
          # Service name as created by the "dagster" Helm release.
          name: dagster-dagster-webserver
          port: 80
76 changes: 76 additions & 0 deletions deployment/dagster/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Helm values for the Dagster chart: one user-code deployment, webserver
# behind the /dagster path prefix, queued run coordination, Kubernetes
# run launcher, and an external PostgreSQL database.

dagster-user-deployments:
  enabled: true
  deployments:
    - name: "data-pipelines"
      image:
        repository: "ghcr.io/openearthplatforminitiative/data-pipelines"
        # NOTE(review): "latest" + Always pulls a new image on every pod
        # start; consider pinning a tag or digest for reproducible deploys.
        tag: latest
        pullPolicy: Always
      # gRPC server exposing the code location's definitions.
      dagsterApiGrpcArgs:
        - "--module-name"
        - "data_pipelines"
      port: 3030
      envSecrets:
        - name: "dagster-aws-credentials"
      envConfigMaps:
        - name: "dagster-config"

dagsterWebserver:
  # Serve the UI under /dagster (matches the Traefik IngressRoute rule).
  pathPrefix: "/dagster"
  readinessProbe:
    httpGet:
      path: "/dagster/server_info"
      port: 80
    periodSeconds: 20
    timeoutSeconds: 10
    successThreshold: 1
    failureThreshold: 3
  # requests == limits gives the pod a Guaranteed QoS class.
  resources:
    requests:
      memory: 256Mi
      cpu: 250m
    limits:
      memory: 256Mi
      cpu: 250m

dagsterDaemon:
  resources:
    requests:
      memory: 256Mi
      cpu: 250m
    limits:
      memory: 256Mi
      cpu: 250m

runCoordinator:
  enabled: true
  type: QueuedRunCoordinator
  config:
    queuedRunCoordinator:
      # At most 3 pipeline runs execute concurrently; extras are queued.
      maxConcurrentRuns: 3


runLauncher:
  # Each run executes in its own Kubernetes Job pod.
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      runK8sConfig:
        containerConfig:
          resources:
            requests:
              memory: 1024Mi
              cpu: 250m
            limits:
              memory: 1024Mi
              cpu: 250m
      # Same AWS credentials/config the user-code deployment receives.
      envSecrets:
        - name: "dagster-aws-credentials"
      envConfigMaps:
        - name: "dagster-config"

postgresql:
  # set postgresql.enabled to be false to disable deploy of a PostgreSQL database and use an
  # existing external PostgreSQL database
  enabled: false

  generatePostgresqlPasswordSecret: false
Loading

0 comments on commit d9f4819

Please sign in to comment.