Skip to content

Commit

Permalink
dir assets
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Jul 30, 2023
1 parent 874734e commit 0b92ac0
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 31 deletions.
59 changes: 57 additions & 2 deletions src/teamster/core/google/directory/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,67 @@ def build_google_directory_assets(code_location):
key=[code_location, "google", "directory", "users"],
io_manager_key="gcs_avro_io",
)
def _asset(
def users(
context: AssetExecutionContext, google_directory: GoogleDirectoryResource
):
data = google_directory.list_users(projection="full")
schema = get_avro_record_schema(name="users", fields=ASSET_FIELDS["users"])

yield Output(value=(data, schema), metadata={"record_count": len(data)})

return _asset
@asset(
    key=[code_location, "google", "directory", "groups"],
    io_manager_key="gcs_avro_io",
)
def groups(
    context: AssetExecutionContext, google_directory: GoogleDirectoryResource
):
    """Fetch the Google Directory groups and emit them with their schema.

    Yields a single ``Output`` whose value is a ``(records, schema)`` tuple
    and whose metadata reports the number of records.
    """
    group_records = google_directory.list_groups()
    avro_schema = get_avro_record_schema(
        name="groups", fields=ASSET_FIELDS["groups"]
    )

    yield Output(
        value=(group_records, avro_schema),
        metadata={"record_count": len(group_records)},
    )

@asset(
    key=[code_location, "google", "directory", "roles"],
    io_manager_key="gcs_avro_io",
)
def roles(
    context: AssetExecutionContext, google_directory: GoogleDirectoryResource
):
    """Fetch the Google Directory roles and emit them with their schema.

    Yields a single ``Output`` whose value is a ``(records, schema)`` tuple
    and whose metadata reports the number of records.
    """
    role_records = google_directory.list_roles()
    avro_schema = get_avro_record_schema(
        name="roles", fields=ASSET_FIELDS["roles"]
    )

    yield Output(
        value=(role_records, avro_schema),
        metadata={"record_count": len(role_records)},
    )

@asset(
    key=[code_location, "google", "directory", "role_assignments"],
    io_manager_key="gcs_avro_io",
)
def role_assignments(
    context: AssetExecutionContext, google_directory: GoogleDirectoryResource
):
    """Fetch the Google Directory role assignments and emit them with their schema.

    Yields a single ``Output`` whose value is a ``(records, schema)`` tuple
    and whose metadata reports the number of records.
    """
    assignment_records = google_directory.list_role_assignments()
    avro_schema = get_avro_record_schema(
        name="role_assignments",
        fields=ASSET_FIELDS["role_assignments"],
    )

    yield Output(
        value=(assignment_records, avro_schema),
        metadata={"record_count": len(assignment_records)},
    )

return [users, groups, roles, role_assignments]


def build_google_directory_partitioned_assets(code_location, partitions_def):
    """Build the partitioned Google Directory assets for a code location.

    Args:
        code_location: First segment of the asset key.
        partitions_def: Partitions definition attached to the asset; each
            partition key is passed to ``list_members`` as the group key.

    Returns:
        A one-element list containing the ``members`` asset.
    """

    @asset(
        key=[code_location, "google", "directory", "members"],
        io_manager_key="gcs_avro_io",
        partitions_def=partitions_def,
    )
    def members(
        context: AssetExecutionContext, google_directory: GoogleDirectoryResource
    ):
        """Fetch the members of the group named by the current partition key."""
        member_records = google_directory.list_members(
            group_key=context.partition_key
        )
        avro_schema = get_avro_record_schema(
            name="members", fields=ASSET_FIELDS["members"]
        )

        yield Output(
            value=(member_records, avro_schema),
            metadata={"record_count": len(member_records)},
        )

    return [members]
79 changes: 79 additions & 0 deletions src/teamster/core/google/directory/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,85 @@
},
]

# Avro field definitions for a directory group record. Every field is a
# nullable union that defaults to null; field order is significant.
GROUP_FIELDS = [
    *(
        {"name": field_name, "type": ["null", "string"], "default": None}
        for field_name in ("id", "email", "name", "description")
    ),
    {"name": "adminCreated", "type": ["null", "boolean"], "default": None},
    *(
        {"name": field_name, "type": ["null", "string"], "default": None}
        for field_name in ("directMembersCount", "kind", "etag")
    ),
    # Alias lists are nullable arrays of strings.
    *(
        {
            "name": field_name,
            "type": ["null", {"type": "array", "items": "string", "default": []}],
            "default": None,
        }
        for field_name in ("aliases", "nonEditableAliases")
    ),
]

# Avro field definitions for a group member record: every field is a
# nullable string that defaults to null.
MEMBER_FIELDS = [
    {"name": field_name, "type": ["null", "string"], "default": None}
    for field_name in (
        "kind",
        "email",
        "role",
        "etag",
        "type",
        "status",
        "delivery_settings",
        "id",
    )
]

# Avro field definitions for a single role privilege entry: both fields are
# nullable strings that default to null.
ROLE_PRIVILEGE_FIELDS = [
    {"name": field_name, "type": ["null", "string"], "default": None}
    for field_name in ("serviceId", "privilegeName")
]

# Avro field definitions for a directory role record. Every field is a
# nullable union that defaults to null; field order is significant.
ROLE_FIELDS = [
    *(
        {"name": field_name, "type": ["null", "string"], "default": None}
        for field_name in ("roleId", "roleName", "roleDescription")
    ),
    *(
        {"name": field_name, "type": ["null", "boolean"], "default": None}
        for field_name in ("isSystemRole", "isSuperAdminRole")
    ),
    *(
        {"name": field_name, "type": ["null", "string"], "default": None}
        for field_name in ("kind", "etag")
    ),
    # Privileges are a nullable array of nested records built from
    # ROLE_PRIVILEGE_FIELDS.
    {
        "name": "rolePrivileges",
        "type": [
            "null",
            {
                "type": "array",
                "items": get_avro_record_schema(
                    name="role_privileges",
                    fields=ROLE_PRIVILEGE_FIELDS,
                    namespace="role",
                ),
                "default": [],
            },
        ],
        "default": None,
    },
]

# Avro field definitions for a role assignment record: every field is a
# nullable string that defaults to null.
ROLE_ASSIGNMENT_FIELDS = [
    {"name": field_name, "type": ["null", "string"], "default": None}
    for field_name in (
        "roleAssignmentId",
        "roleId",
        "kind",
        "etag",
        "assignedTo",
        "assigneeType",
        "scopeType",
        "orgUnitId",
        "condition",
    )
]

# Lookup from asset name to the Avro field list used to build its record
# schema.
ASSET_FIELDS = dict(
    users=USER_FIELDS,
    groups=GROUP_FIELDS,
    members=MEMBER_FIELDS,
    roles=ROLE_FIELDS,
    role_assignments=ROLE_ASSIGNMENT_FIELDS,
)
21 changes: 19 additions & 2 deletions src/teamster/kipptaf/google/assets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from dagster import StaticPartitionsDefinition, config_from_files

from teamster.core.google.directory.assets import build_google_directory_assets
from teamster.core.google.directory.assets import (
build_google_directory_assets,
build_google_directory_partitioned_assets,
)
from teamster.core.google.forms.assets import build_google_forms_assets
from teamster.core.google.sheets.assets import build_gsheet_asset
from teamster.kipptaf import CODE_LOCATION
Expand All @@ -21,7 +24,21 @@
partitions_def=StaticPartitionsDefinition(FORM_IDS),
)

google_directory_assets = [build_google_directory_assets(code_location=CODE_LOCATION)]
# Google group email addresses; used below as the static partition keys for
# the partitioned directory assets.
GROUP_KEYS = [
"group-students-camden@teamstudents.org",
"group-students-miami@teamstudents.org",
"group-students-newark@teamstudents.org",
]

# Unpartitioned directory assets for this code location.
google_directory_assets = build_google_directory_assets(code_location=CODE_LOCATION)
# Partitioned directory assets, with one static partition per entry in
# GROUP_KEYS.
google_directory_partitioned_assets = build_google_directory_partitioned_assets(
code_location=CODE_LOCATION, partitions_def=StaticPartitionsDefinition(GROUP_KEYS)
)

# Rebind the name to the combined list of both asset groups.
google_directory_assets = [
*google_directory_assets,
*google_directory_partitioned_assets,
]

__all__ = [
*google_sheets_assets,
Expand Down
65 changes: 38 additions & 27 deletions tests/google/directory/test_schema_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,52 @@
from teamster.core.google.directory.schema import ASSET_FIELDS
from teamster.core.utils.functions import get_avro_record_schema

TESTS = ["users"]

def _test_schema(asset_name):
print(asset_name)

def test_schema():
for asset_name in TESTS:
print(asset_name)
with open(file=f"env/{asset_name}.json", mode="r") as fp:
records = json.load(fp=fp)

with open(file=f"env/{asset_name}.json", mode="r") as fp:
records = json.load(fp=fp)
schema = get_avro_record_schema(name=asset_name, fields=ASSET_FIELDS[asset_name])
# print(schema)

schema = get_avro_record_schema(
name=asset_name, fields=ASSET_FIELDS[asset_name]
)
# print(schema)
parsed_schema = parse_schema(schema)

parsed_schema = parse_schema(schema)
count = len(records)

count = len(records)
sample_record = records[random.randint(a=0, b=(count - 1))]
# print(sample_record)

sample_record = records[random.randint(a=0, b=(count - 1))]
# print(sample_record)
assert validation.validate(datum=sample_record, schema=parsed_schema, strict=True)

assert validation.validate(
datum=sample_record, schema=parsed_schema, strict=True
)
assert validation.validate_many(records=records, schema=parsed_schema, strict=True)

assert validation.validate_many(
records=records, schema=parsed_schema, strict=True
with open(file="/dev/null", mode="wb") as fo:
writer(
fo=fo,
schema=parsed_schema,
records=records,
codec="snappy",
strict_allow_default=True,
)

with open(file="/dev/null", mode="wb") as fo:
writer(
fo=fo,
schema=parsed_schema,
records=records,
codec="snappy",
strict_allow_default=True,
)

def test_users():
    """Run the shared schema check for the ``users`` asset."""
    _test_schema(asset_name="users")


def test_groups():
    """Run the shared schema check for the ``groups`` asset."""
    _test_schema(asset_name="groups")


def test_members():
    """Run the shared schema check for the ``members`` asset."""
    _test_schema(asset_name="members")


def test_roles():
    """Run the shared schema check for the ``roles`` asset."""
    _test_schema(asset_name="roles")


def test_role_assignments():
    """Run the shared schema check for the ``role_assignments`` asset."""
    _test_schema(asset_name="role_assignments")

0 comments on commit 0b92ac0

Please sign in to comment.