Skip to content

Commit

Permalink
#810: some async work
Browse files Browse the repository at this point in the history
  • Loading branch information
muayyad-alsadi committed Feb 2, 2024
1 parent 8d8fa54 commit 82bcbba
Showing 1 changed file with 91 additions and 89 deletions.
180 changes: 91 additions & 89 deletions podman_compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1156,31 +1156,39 @@ def flat_deps(services, with_extends=False):
# podman and compose classes
###################

class Pool:
def __init__(self, parallel):
self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
self.tasks = []

async def join(self):
await asyncio.gather(*self.tasks)



class Podman:
def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)):
def __init__(self, compose, podman_path="podman", dry_run=False):
self.compose = compose
self.semaphore = compose.semaphore
self.podman_path = podman_path
self.dry_run = dry_run
self.semaphore = semaphore

async def output(self, podman_args, cmd="", cmd_args=None):
async with self.semaphore:
cmd_args = cmd_args or []
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(cmd_ls)
p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
cmd_args = cmd_args or []
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(cmd_ls)
p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout_data, stderr_data = await p.communicate()
if p.returncode == 0:
return stdout_data
else:
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
stdout_data, stderr_data = await p.communicate()
if p.returncode == 0:
return stdout_data
else:
raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)

def exec(
self,
Expand All @@ -1192,68 +1200,54 @@ def exec(
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(" ".join([str(i) for i in cmd_ls]))
# this replaces current process no need any async stuff
os.execlp(self.podman_path, *cmd_ls)

async def run(
async def run_bg(
self,
podman_args,
cmd="",
cmd_args=None,
log_formatter=None,
*,
# Intentionally mutable default argument to hold references to tasks
task_reference=set()
) -> int:
async with self.semaphore:
cmd_args = list(map(str, cmd_args or []))
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(" ".join([str(i) for i in cmd_ls]))
if self.dry_run:
return None

if log_formatter is not None:

async def format_out(stdout):
while True:
l = await stdout.readline()
if l:
print(log_formatter, l.decode('utf-8'), end='')
if stdout.at_eof():
break

p = await asyncio.subprocess.create_subprocess_exec(
*cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
) # pylint: disable=consider-using-with

# This is hacky to make the tasks not get garbage collected
# https://github.com/python/cpython/issues/91887
out_t = asyncio.create_task(format_out(p.stdout))
task_reference.add(out_t)
out_t.add_done_callback(task_reference.discard)

err_t = asyncio.create_task(format_out(p.stderr))
task_reference.add(err_t)
err_t.add_done_callback(task_reference.discard)
return self.run(podman_args, cmd, cmd_args, task_reference=set())

else:
p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
async def run(
self,
podman_args,
cmd="",
cmd_args=None,
*,
# Intentionally mutable default argument to hold references to tasks
task_reference=set()
) -> int:
cmd_args = list(map(str, cmd_args or []))
xargs = self.compose.get_podman_args(cmd) if cmd else []
cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
log(" ".join([str(i) for i in cmd_ls]))
if self.dry_run:
return None

p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with

try:
exit_code = await p.wait()
except asyncio.CancelledError as e:
log(f"Sending termination signal")
p.terminate()
try:
exit_code = await p.wait()
except asyncio.CancelledError as e:
log(f"Sending termination signal")
p.terminate()
try:
async with asyncio.timeout(10):
exit_code = await p.wait()
except TimeoutError:
log(f"container did not shut down after 10 seconds, killing")
p.kill()
async with asyncio.timeout(10):
exit_code = await p.wait()
except TimeoutError:
log(f"container did not shut down after 10 seconds, killing")
p.kill()
exit_code = await p.wait()

log(f"exit code: {exit_code}")
return exit_code
log(f"exit code: {exit_code}")
return exit_code

async def volume_ls(self, proj=None):
if not proj:
Expand Down Expand Up @@ -1468,6 +1462,7 @@ def dotenv_to_dict(dotenv_path):

class PodmanCompose:
def __init__(self):
self.semaphore = None
self.podman = None
self.podman_version = None
self.environ = {}
Expand Down Expand Up @@ -1529,7 +1524,8 @@ async def run(self):
if args.dry_run is False:
log(f"Binary {podman_path} has not been found.")
sys.exit(1)
self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel))
self.semaphore = asyncio.Semaphore(args.parallel)
self.podman = Podman(self, podman_path, args.dry_run)

if not args.dry_run:
# just to make sure podman is running
Expand Down Expand Up @@ -1994,7 +1990,7 @@ async def compose_version(compose, args):
print(json.dumps(res))
return
print("podman-compose version", __version__)
await compose.podman.run(["--version"], "", [])
await compose.podman.exec(["--version"], "", [])


def is_local(container: dict) -> bool:
Expand Down Expand Up @@ -2242,19 +2238,13 @@ def get_excluded(compose, args):
log("** excluding: ", excluded)
return excluded


@cmd_run(
podman_compose, "up", "Create and start the entire stack or some of its services"
podman_compose, "create", "Create the containers but do not start them"
)
async def compose_up(compose: PodmanCompose, args):
async def compose_create(compose: PodmanCompose, args):
proj_name = compose.project_name
excluded = get_excluded(compose, args)
if not args.no_build:
# `podman build` does not cache, so don't always build
build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__)
if await compose.commands["build"](compose, build_args) != 0:
log("Build command failed")

# args.no_recreate disables check for changes (which is not implemented)
hashes = (
(await compose.podman.output(
[],
Expand All @@ -2276,9 +2266,7 @@ async def compose_up(compose: PodmanCompose, args):
down_args = argparse.Namespace(**dict(args.__dict__, volumes=False))
await compose.commands["down"](compose, down_args)
log("recreating: done\n\n")
# args.no_recreate disables check for changes (which is not implemented)

podman_command = "run" if args.detach and not args.no_start else "create"
podman_command = "create"

await create_pods(compose, args)
for cnt in compose.containers:
Expand All @@ -2287,10 +2275,32 @@ async def compose_up(compose: PodmanCompose, args):
continue
podman_args = await container_to_args(compose, cnt, detached=args.detach)
subproc = await compose.podman.run([], podman_command, podman_args)
if podman_command == "run" and subproc is not None:
await compose.podman.run([], "start", [cnt["name"]])
if args.no_start or args.detach or args.dry_run:


@cmd_run(
podman_compose, "up", "Create and start the entire stack or some of its services"
)
async def compose_up(compose: PodmanCompose, args):
proj_name = compose.project_name
excluded = get_excluded(compose, args)
if not args.no_build:
# `podman build` does not cache, so don't always build
build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__)
if await compose.commands["build"](compose, build_args) != 0:
log("Build command failed")

await compose.commands["create"](compose, args)

if args.no_start or args.dry_run:
return
if compose.pods:
pod_name = compose.pods[0]["name"]
await compose.podman.run([], "pod", ["start", pod_name])
# TODO: use run and do a loop to capture exit_code_from using podman wait
await compose.podman.run([], "pod", ["logs", "--color", "-n", "-f", pod_name])
await compose.commands["stop"](compose, argparse.Namespace(services=[]))
return 0

# TODO: handle already existing
# TODO: if error creating do not enter loop
# TODO: colors if sys.stdout.isatty()
Expand All @@ -2311,20 +2321,13 @@ async def compose_up(compose: PodmanCompose, args):
loop.add_signal_handler(signal.SIGINT, lambda: [t.cancel("User exit") for t in tasks])

for i, cnt in enumerate(compose.containers):
# Add colored service prefix to output by piping output through sed
color_idx = i % len(compose.console_colors)
color = compose.console_colors[color_idx]
space_suffix = " " * (max_service_length - len(cnt["_service"]) + 1)
log_formatter = "{}[{}]{}|\x1B[0m".format(
color, cnt["_service"], space_suffix
)
if cnt["_service"] in excluded:
log("** skipping: ", cnt["name"])
continue

tasks.add(
asyncio.create_task(
compose.podman.run([], "start", ["-a", cnt["name"]], log_formatter=log_formatter),
compose.podman.run([], "start", ["-a", cnt["name"]]),
name=cnt["_service"]
)
)
Expand Down Expand Up @@ -2469,7 +2472,6 @@ async def compose_run(compose, args):
no_start=False,
no_cache=False,
build_arg=[],
parallel=1,
remove_orphans=True
)
)
Expand Down

0 comments on commit 82bcbba

Please sign in to comment.