Skip to content

Commit

Permalink
Add top level resources field to GrapheneQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
gibsondan committed Nov 13, 2024
1 parent 42d3814 commit 5d52166
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 45 deletions.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 32 additions & 21 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from dagster._core.snap import ResourceDefSnap

from dagster_graphql.schema.config_types import GrapheneConfigTypeField
from dagster_graphql.schema.util import ResolveInfo
from dagster_graphql.schema.errors import (
GrapheneInvalidSubsetError,
GraphenePipelineNotFoundError,
GraphenePythonError,
)
from dagster_graphql.schema.util import ResolveInfo, non_null_list


class GrapheneResource(graphene.ObjectType):
Expand Down Expand Up @@ -44,3 +49,21 @@ def resolve_configField(self, _graphene_info: ResolveInfo):
)

return None


class GrapheneResourceConnection(graphene.ObjectType):
class Meta:
name = "ResourceConnection"

resources = non_null_list(GrapheneResource)


class GrapheneResourcesOrError(graphene.Union):
class Meta:
types = (
GrapheneResourceConnection,
GraphenePipelineNotFoundError,
GrapheneInvalidSubsetError,
GraphenePythonError,
)
name = "ResourcesOrError"
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph
from dagster._core.definitions.selector import (
InstigatorSelector,
JobSelector,
RepositorySelector,
ResourceSelector,
ScheduleSelector,
Expand Down Expand Up @@ -92,6 +93,7 @@
from dagster_graphql.implementation.loader import StaleStatusLoader
from dagster_graphql.implementation.run_config_schema import resolve_run_config_schema_or_error
from dagster_graphql.implementation.utils import (
UserFacingGraphQLError,
capture_error,
graph_selector_from_graphql,
pipeline_selector_from_graphql,
Expand Down Expand Up @@ -164,6 +166,11 @@
GraphenePipeline,
GrapheneRunOrError,
)
from dagster_graphql.schema.pipelines.resource import (
GrapheneResource,
GrapheneResourceConnection,
GrapheneResourcesOrError,
)
from dagster_graphql.schema.pipelines.snapshot import GraphenePipelineSnapshotOrError
from dagster_graphql.schema.resources import (
GrapheneResourceDetailsListOrError,
Expand Down Expand Up @@ -244,6 +251,12 @@ class Meta:
description="Retrieve a job by its location name, repository name, and job name.",
)

resourcesOrError = graphene.Field(
graphene.NonNull(GrapheneResourcesOrError),
pipelineSelector=graphene.NonNull(GraphenePipelineSelector),
description="Retrieve the list of resources for a given job.",
)

pipelineSnapshotOrError = graphene.Field(
graphene.NonNull(GraphenePipelineSnapshotOrError),
snapshotId=graphene.String(),
Expand Down Expand Up @@ -807,6 +820,31 @@ def resolve_pipelineOrError(self, graphene_info: ResolveInfo, params: GraphenePi
get_remote_job_or_raise(graphene_info, pipeline_selector_from_graphql(params))
)

@capture_error
def resolve_resourcesOrError(
self, graphene_info: ResolveInfo, pipelineSelector: GraphenePipelineSelector
) -> Sequence[GrapheneResource]:
from dagster_graphql.schema.errors import GraphenePipelineNotFoundError

job_selector = JobSelector(
location_name=pipelineSelector.repositoryLocationName,
repository_name=pipelineSelector.repositoryName,
job_name=pipelineSelector.pipelineName,
)

if not graphene_info.context.has_job(job_selector):
raise UserFacingGraphQLError(GraphenePipelineNotFoundError(selector=job_selector))

def _get_config_type(key: str):
return graphene_info.context.get_config_type(job_selector, key)

return GrapheneResourceConnection(
resources=[
GrapheneResource(_get_config_type, resource_snap)
for resource_snap in graphene_info.context.get_resources(job_selector)
]
)

def resolve_pipelineRunsOrError(
self,
_graphene_info: ResolveInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@
}),
]),
}),
'resourcesOrError': dict({
'__typename': 'ResourceConnection',
'resources': list([
dict({
'configField': dict({
'configType': dict({
'key': 'Int',
}),
}),
'description': None,
'name': 'R1',
}),
dict({
'configField': dict({
'configType': dict({
'key': 'Any',
}),
}),
'description': 'Built-in filesystem IO manager that stores and retrieves values using pickling.',
'name': 'io_manager',
}),
]),
}),
})
# ---
# name: test_required_resources[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,32 @@
modes {
name
resources {
...ResourceInfo
}
}
}
}
resourcesOrError(pipelineSelector: $selector) {
__typename
... on ResourceConnection {
resources {
...ResourceInfo
}
}
}
}
fragment ResourceInfo on Resource {
name
description
configField {
configType {
key
... on CompositeConfigType {
fields {
name
description
configField {
configType {
key
... on CompositeConfigType {
fields {
name
configType {
key
}
}
}
}
configType {
key
}
}
}
Expand Down Expand Up @@ -62,9 +74,12 @@ def test_mode_fetch_resources(graphql_context: WorkspaceRequestContext, snapshot
assert not result.errors
assert result.data
assert result.data["pipelineOrError"]
assert result.data["pipelineOrError"]["modes"]
for mode_data in result.data["pipelineOrError"]["modes"]:
assert mode_data["resources"]
assert len(result.data["pipelineOrError"]["modes"]) == 1
assert result.data["pipelineOrError"]["modes"][0]["resources"]
assert (
result.data["pipelineOrError"]["modes"][0]["resources"]
== result.data["resourcesOrError"]["resources"]
)

snapshot.assert_match(result.data)

Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/workspace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
ManagedGrpcPythonEnvCodeLocationOrigin,
)
from dagster._core.snap.dagster_types import DagsterTypeSnap
from dagster._core.snap.mode import ResourceDefSnap
from dagster._core.snap.node import GraphDefSnap, OpDefSnap
from dagster._core.workspace.load_target import WorkspaceLoadTarget
from dagster._core.workspace.permissions import (
Expand Down Expand Up @@ -413,6 +414,15 @@ def get_dagster_type(
job = self.get_full_job(job_selector)
return job.job_snapshot.dagster_type_namespace_snapshot.get_dagster_type_snap(type_key)

def get_resources(
self,
job_selector: Union[JobSubsetSelector, JobSelector],
) -> Sequence[ResourceDefSnap]:
job = self.get_full_job(job_selector)
if not job.mode_def_snaps:
return []
return job.mode_def_snaps[0].resource_def_snaps

def get_dagster_library_versions(self, location_name: str) -> Optional[Mapping[str, str]]:
return self.get_code_location(location_name).get_dagster_library_versions()

Expand Down

0 comments on commit 5d52166

Please sign in to comment.