Skip to content

Commit

Permalink
Add top level resources field to GrapheneQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
gibsondan committed Nov 13, 2024
1 parent 0857c31 commit fa024e4
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 23 deletions.

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 39 additions & 2 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from dagster._core.snap import ResourceDefSnap

from dagster_graphql.schema.config_types import GrapheneConfigTypeField
from dagster_graphql.schema.util import ResolveInfo
from dagster_graphql.schema.errors import (
GrapheneInvalidSubsetError,
GraphenePipelineNotFoundError,
GraphenePythonError,
)
from dagster_graphql.schema.util import ResolveInfo, non_null_list


class GrapheneResource(graphene.ObjectType):
Expand Down Expand Up @@ -44,3 +49,21 @@ def resolve_configField(self, _graphene_info: ResolveInfo):
)

return None


class GrapheneResourceConnection(graphene.ObjectType):
class Meta:
name = "ResourceConnection"

resources = non_null_list(GrapheneResource)


class GrapheneResourcesOrError(graphene.Union):
class Meta:
types = (
GrapheneResourceConnection,
GraphenePipelineNotFoundError,
GrapheneInvalidSubsetError,
GraphenePythonError,
)
name = "ResourcesOrError"
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,4 @@ class Meta:
class GrapheneResourceDetailsListOrError(graphene.Union):
class Meta:
types = (GrapheneResourceDetailsList, GrapheneRepositoryNotFoundError, GraphenePythonError)
name = "ResourcesOrError"
name = "ResourceDetailsListOrError"
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
from dagster_graphql.implementation.loader import StaleStatusLoader
from dagster_graphql.implementation.run_config_schema import resolve_run_config_schema_or_error
from dagster_graphql.implementation.utils import (
UserFacingGraphQLError,
capture_error,
graph_selector_from_graphql,
pipeline_selector_from_graphql,
Expand Down Expand Up @@ -164,6 +165,11 @@
GraphenePipeline,
GrapheneRunOrError,
)
from dagster_graphql.schema.pipelines.resource import (
GrapheneResource,
GrapheneResourceConnection,
GrapheneResourcesOrError,
)
from dagster_graphql.schema.pipelines.snapshot import GraphenePipelineSnapshotOrError
from dagster_graphql.schema.resources import (
GrapheneResourceDetailsListOrError,
Expand Down Expand Up @@ -244,6 +250,12 @@ class Meta:
description="Retrieve a job by its location name, repository name, and job name.",
)

resourcesOrError = graphene.Field(
graphene.NonNull(GrapheneResourcesOrError),
pipelineSelector=graphene.NonNull(GraphenePipelineSelector),
description="Retrieve the list of resources for a given job.",
)

pipelineSnapshotOrError = graphene.Field(
graphene.NonNull(GraphenePipelineSnapshotOrError),
snapshotId=graphene.String(),
Expand Down Expand Up @@ -807,6 +819,32 @@ def resolve_pipelineOrError(self, graphene_info: ResolveInfo, params: GraphenePi
get_remote_job_or_raise(graphene_info, pipeline_selector_from_graphql(params))
)

@capture_error
def resolve_resourcesOrError(
self, graphene_info: ResolveInfo, pipelineSelector: GraphenePipelineSelector
) -> Sequence[GrapheneResource]:
from dagster_graphql.schema.errors import GraphenePipelineNotFoundError

job_selector = pipeline_selector_from_graphql(pipelineSelector)

if not graphene_info.context.has_job(job_selector):
raise UserFacingGraphQLError(GraphenePipelineNotFoundError(selector=job_selector))

check.invariant(
not job_selector.is_subset_selection,
"resourcesOrError only accepts non-subsetted selectors",
)

def _get_config_type(key: str):
return graphene_info.context.get_config_type(job_selector, key)

return GrapheneResourceConnection(
resources=[
GrapheneResource(_get_config_type, resource_snap)
for resource_snap in graphene_info.context.get_resources(job_selector)
]
)

def resolve_pipelineRunsOrError(
self,
_graphene_info: ResolveInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@
}),
]),
}),
'resourcesOrError': dict({
'__typename': 'ResourceConnection',
'resources': list([
dict({
'configField': dict({
'configType': dict({
'key': 'Int',
}),
}),
'description': None,
'name': 'R1',
}),
dict({
'configField': dict({
'configType': dict({
'key': 'Any',
}),
}),
'description': 'Built-in filesystem IO manager that stores and retrieves values using pickling.',
'name': 'io_manager',
}),
]),
}),
})
# ---
# name: test_required_resources[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,32 @@
modes {
name
resources {
...ResourceInfo
}
}
}
}
resourcesOrError(pipelineSelector: $selector) {
__typename
... on ResourceConnection {
resources {
...ResourceInfo
}
}
}
}
fragment ResourceInfo on Resource {
name
description
configField {
configType {
key
... on CompositeConfigType {
fields {
name
description
configField {
configType {
key
... on CompositeConfigType {
fields {
name
configType {
key
}
}
}
}
configType {
key
}
}
}
Expand Down Expand Up @@ -62,9 +74,12 @@ def test_mode_fetch_resources(graphql_context: WorkspaceRequestContext, snapshot
assert not result.errors
assert result.data
assert result.data["pipelineOrError"]
assert result.data["pipelineOrError"]["modes"]
for mode_data in result.data["pipelineOrError"]["modes"]:
assert mode_data["resources"]
assert len(result.data["pipelineOrError"]["modes"]) == 1
assert result.data["pipelineOrError"]["modes"][0]["resources"]
assert (
result.data["pipelineOrError"]["modes"][0]["resources"]
== result.data["resourcesOrError"]["resources"]
)

snapshot.assert_match(result.data)

Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/workspace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
ManagedGrpcPythonEnvCodeLocationOrigin,
)
from dagster._core.snap.dagster_types import DagsterTypeSnap
from dagster._core.snap.mode import ResourceDefSnap
from dagster._core.snap.node import GraphDefSnap, OpDefSnap
from dagster._core.workspace.load_target import WorkspaceLoadTarget
from dagster._core.workspace.permissions import (
Expand Down Expand Up @@ -413,6 +414,15 @@ def get_dagster_type(
job = self.get_full_job(job_selector)
return job.job_snapshot.dagster_type_namespace_snapshot.get_dagster_type_snap(type_key)

def get_resources(
self,
job_selector: Union[JobSubsetSelector, JobSelector],
) -> Sequence[ResourceDefSnap]:
job = self.get_full_job(job_selector)
if not job.mode_def_snaps:
return []
return job.mode_def_snaps[0].resource_def_snaps

def get_dagster_library_versions(self, location_name: str) -> Optional[Mapping[str, str]]:
return self.get_code_location(location_name).get_dagster_library_versions()

Expand Down

0 comments on commit fa024e4

Please sign in to comment.