fix: update dependencies (#5)

johanneskoester authored Oct 27, 2023
1 parent a31cf75 commit 5914f33

Showing 4 changed files with 16 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml

@@ -90,7 +90,7 @@ jobs:
         uses: medyagh/setup-minikube@v0.0.14

       - name: Run pytest
-        run: poetry run coverage run -m pytest tests/tests.py
+        run: poetry run coverage run -m pytest tests/tests.py -sv

       - name: Run Coverage
         run: poetry run coverage report -m
6 changes: 3 additions & 3 deletions pyproject.toml

@@ -7,8 +7,8 @@ readme = "README.md"

 [tool.poetry.dependencies]
 python = "^3.11"
-snakemake-interface-common = "^1.13.0"
-snakemake-interface-executor-plugins = "^7.0.0"
+snakemake-interface-common = "^1.14.1"
+snakemake-interface-executor-plugins = "^7.0.3"
 kubernetes = "^27.2.0"

 [tool.poetry.group.dev.dependencies]
@@ -17,7 +17,7 @@ flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^7.4.1"
 snakemake = {git = "https://github.com/snakemake/snakemake.git"}
-snakemake-storage-plugin-s3 = "^0.2.0"
+snakemake-storage-plugin-s3 = "^0.2.4"

 [tool.coverage.run]
 omit = [".*", "*/site-packages/*", "Snakefile"]
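For reference, Poetry's caret constraints keep the major version fixed while allowing newer minor and patch releases, so "^1.14.1" admits any 1.x release at or above 1.14.1. A minimal sketch of the same rule, expressed with the third-party packaging library (an assumption made for illustration; Poetry resolves these constraints internally):

    # Hedged sketch: Poetry's ^1.14.1 is equivalent to >=1.14.1,<2.0.0.
    from packaging.specifiers import SpecifierSet

    caret = SpecifierSet(">=1.14.1,<2.0.0")
    assert "1.14.1" in caret     # lower bound is included
    assert "1.15.0" in caret     # newer minors are allowed
    assert "2.0.0" not in caret  # the next major is excluded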
111 changes: 11 additions & 100 deletions snakemake_executor_plugin_kubernetes/__init__.py

@@ -1,6 +1,5 @@
 import base64
 from dataclasses import dataclass, field
-import os
 import shlex
 import subprocess
 import time
@@ -93,7 +92,7 @@ def __post_init__(self):
         self.kubeapi = kubernetes.client.CoreV1Api()
         self.batchapi = kubernetes.client.BatchV1Api()
         self.namespace = self.workflow.executor_settings.namespace
-        self.envvars = self.workflow.envvars
+        self.envvars = self.workflow.spawned_job_args_factory.envvars()
         self.secret_files = {}
         self.run_namespace = str(uuid.uuid4())
         self.secret_envvars = {}
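The replaced attribute held only variable names, to be resolved against the executor's own environment; the factory method returns a ready-made name-to-value mapping instead. A minimal sketch of the difference, using a hypothetical variable name:

    import os

    # Old style: a collection of names, looked up in os.environ at use time.
    names = ["SNAKEMAKE_STORAGE_S3_SECRET_KEY"]  # hypothetical example
    resolved = {n: os.environ[n] for n in names if n in os.environ}

    # New style: self.workflow.spawned_job_args_factory.envvars() already
    # returns such a {name: value} dict, so no os.environ lookups remain here.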
@@ -113,6 +112,8 @@ def run_job(self, job: JobExecutorInterface):
         # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.

         exec_job = self.format_job_exec(job)
+        exec_job = "echo $SNAKEMAKE_STORAGE_S3_SECRET_KEY && " + exec_job
+        self.logger.debug(f"Executing job: {exec_job}")

         # Kubernetes silently does not submit a job if the name is too long
         # therefore, we ensure that it is not longer than snakejob+uuid.
@@ -133,7 +134,6 @@ def run_job(self, job: JobExecutorInterface):
         container.working_dir = "/workdir"
         container.volume_mounts = [
             kubernetes.client.V1VolumeMount(name="workdir", mount_path="/workdir"),
-            kubernetes.client.V1VolumeMount(name="source", mount_path="/source"),
         ]

         node_selector = {}
@@ -153,33 +153,9 @@ def run_job(self, job: JobExecutorInterface):
         # fail on first error
         body.spec.restart_policy = "Never"

-        # source files as a secret volume
-        # we copy these files to the workdir before executing Snakemake
-        too_large = [
-            path
-            for path in self.secret_files.values()
-            if os.path.getsize(path) > 1000000
-        ]
-        if too_large:
-            raise WorkflowError(
-                "The following source files exceed the maximum "
-                "file size (1MB) that can be passed from host to "
-                "kubernetes. These are likely not source code "
-                "files. Consider adding them to your "
-                "remote storage instead or (if software) use "
-                "Conda packages or container images:\n{}".format("\n".join(too_large))
-            )
-        secret_volume = kubernetes.client.V1Volume(name="source")
-        secret_volume.secret = kubernetes.client.V1SecretVolumeSource()
-        secret_volume.secret.secret_name = self.run_namespace
-        secret_volume.secret.items = [
-            kubernetes.client.V1KeyToPath(key=key, path=path)
-            for key, path in self.secret_files.items()
-        ]
         # workdir as an emptyDir volume of undefined size
         workdir_volume = kubernetes.client.V1Volume(name="workdir")
         workdir_volume.empty_dir = kubernetes.client.V1EmptyDirVolumeSource()
-        body.spec.volumes = [secret_volume, workdir_volume]
+        body.spec.volumes = [workdir_volume]

         # env vars
         container.env = []
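With the source secret volume gone, the pod spec carries only the emptyDir working directory. A minimal, self-contained sketch of that shape with the kubernetes client (the container name and image below are placeholders, not the plugin's actual values):

    import kubernetes.client

    # An emptyDir volume mounted at /workdir, matching the mount above.
    workdir_volume = kubernetes.client.V1Volume(
        name="workdir",
        empty_dir=kubernetes.client.V1EmptyDirVolumeSource(),
    )
    container = kubernetes.client.V1Container(
        name="snakejob",                     # placeholder name
        image="snakemake/snakemake:latest",  # placeholder image
        working_dir="/workdir",
        volume_mounts=[
            kubernetes.client.V1VolumeMount(name="workdir", mount_path="/workdir")
        ],
    )
    pod_spec = kubernetes.client.V1PodSpec(
        containers=[container],
        volumes=[workdir_volume],
        restart_policy="Never",  # fail on first error, as above
    )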
@@ -262,8 +238,8 @@ async def check_active_jobs(
         #
         # async with self.status_rate_limiter:
         #     # query remote middleware here
-        import kubernetes

+        self.logger.debug(f"Checking status of {len(active_jobs)} jobs")
         for j in active_jobs:
             async with self.status_rate_limiter:
                 try:
@@ -328,9 +304,6 @@ def shutdown(self):
         self.unregister_secret()
         super().shutdown()

-    def get_job_exec_prefix(self, job: JobExecutorInterface):
-        return "cp -rf /source/. ."
-
     def register_secret(self):
         import kubernetes.client

@@ -340,84 +313,22 @@ def register_secret(self):
         secret.metadata.name = self.run_namespace
         secret.type = "Opaque"
         secret.data = {}
-        for i, f in enumerate(self.dag.get_sources()):
-            if f.startswith(".."):
-                self.logger.warning(
-                    "Ignoring source file {}. Only files relative "
-                    "to the working directory are allowed.".format(f)
-                )
-                continue
-
-            # The kubernetes API can't create secret files larger than 1MB.
-            source_file_size = os.path.getsize(f)
-            max_file_size = 1048576
-            if source_file_size > max_file_size:
-                self.logger.warning(
-                    "Skipping the source file {f}. Its size {source_file_size} exceeds "
-                    "the maximum file size (1MB) that can be passed "
-                    "from host to kubernetes.".format(
-                        f=f, source_file_size=source_file_size
-                    )
-                )
-                continue
-
-            with open(f, "br") as content:
-                key = f"f{i}"
-
-                # Some files are smaller than 1MB, but grows larger after being
-                # base64 encoded.
-                # We should exclude them as well, otherwise Kubernetes APIs will
-                # complain.
-                encoded_contents = base64.b64encode(content.read()).decode()
-                encoded_size = len(encoded_contents)
-                if encoded_size > 1048576:
-                    self.logger.warning(
-                        "Skipping the source file {f} for secret key {key}. "
-                        "Its base64 encoded size {encoded_size} exceeds "
-                        "the maximum file size (1MB) that can be passed "
-                        "from host to kubernetes.".format(
-                            f=f,
-                            key=key,
-                            encoded_size=encoded_size,
-                        )
-                    )
-                    continue
-
-                self.secret_files[key] = f
-                secret.data[key] = encoded_contents
-
-        for e in self.envvars:
-            try:
-                key = e.lower()
-                secret.data[key] = base64.b64encode(os.environ[e].encode()).decode()
-                self.secret_envvars[key] = e
-            except KeyError:
-                continue
+        for name, value in self.envvars.items():
+            key = name.lower()
+            secret.data[key] = base64.b64encode(value.encode()).decode()
+            self.secret_envvars[key] = name

         # Test if the total size of the configMap exceeds 1MB
         config_map_size = sum(
             [len(base64.b64decode(v)) for k, v in secret.data.items()]
         )
         if config_map_size > 1048576:
-            self.logger.warning(
+            raise WorkflowError(
                 "The total size of the included files and other Kubernetes secrets "
-                "is {}, exceeding the 1MB limit.\n".format(config_map_size)
-            )
-            self.logger.warning(
-                "The following are the largest files. Consider removing some of them "
-                "(you need remove at least {} bytes):".format(config_map_size - 1048576)
+                f"is {config_map_size}, exceeding the 1MB limit.\n"
             )

-            entry_sizes = {
-                self.secret_files[k]: len(base64.b64decode(v))
-                for k, v in secret.data.items()
-                if k in self.secret_files
-            }
-            for k, v in sorted(entry_sizes.items(), key=lambda item: item[1])[:-6:-1]:
-                self.logger.warning(f" * File: {k}, original size: {v}")
-
-            raise WorkflowError("ConfigMap too large")

         self.kubeapi.create_namespaced_secret(self.namespace, secret)

     def unregister_secret(self):
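The secret registered here is what job pods later read their environment variables from. A minimal sketch of the round trip with the kubernetes client, using a hypothetical variable MY_TOKEN; the secretKeyRef wiring happens elsewhere in the plugin and is shown only for illustration:

    import base64
    import kubernetes.client

    envvars = {"MY_TOKEN": "s3cr3t"}  # hypothetical name and value
    secret = kubernetes.client.V1Secret(
        metadata=kubernetes.client.V1ObjectMeta(name="example-run-id"),
        type="Opaque",
        # V1Secret.data expects base64-encoded string values.
        data={
            name.lower(): base64.b64encode(value.encode()).decode()
            for name, value in envvars.items()
        },
    )

    # Exposing a secret key back to a container as an environment variable:
    env = [
        kubernetes.client.V1EnvVar(
            name=name,
            value_from=kubernetes.client.V1EnvVarSource(
                secret_key_ref=kubernetes.client.V1SecretKeySelector(
                    name=secret.metadata.name, key=name.lower()
                )
            ),
        )
        for name in envvars
    ]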
2 changes: 1 addition & 1 deletion tests/tests.py

@@ -28,7 +28,7 @@ def get_remote_execution_settings(
         self,
     ) -> snakemake.settings.RemoteExecutionSettings:
         return snakemake.settings.RemoteExecutionSettings(
-            seconds_between_status_checks=1,
+            seconds_between_status_checks=10,
             envvars=self.get_envvars(),
             # TODO remove once we have switched to stable snakemake for dev
             container_image="snakemake/snakemake:latest",
