diff --git a/podman_compose.py b/podman_compose.py index 3295bc32..211a41a2 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -1156,31 +1156,39 @@ def flat_deps(services, with_extends=False): # podman and compose classes ################### +class Pool: + def __init__(self, parallel): + self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel + self.tasks = [] + + async def join(self): + await asyncio.gather(*self.tasks) + + class Podman: - def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)): + def __init__(self, compose, podman_path="podman", dry_run=False): self.compose = compose + self.semaphore = compose.semaphore self.podman_path = podman_path self.dry_run = dry_run - self.semaphore = semaphore async def output(self, podman_args, cmd="", cmd_args=None): - async with self.semaphore: - cmd_args = cmd_args or [] - xargs = self.compose.get_podman_args(cmd) if cmd else [] - cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args - log(cmd_ls) - p = await asyncio.subprocess.create_subprocess_exec( - *cmd_ls, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + cmd_args = cmd_args or [] + xargs = self.compose.get_podman_args(cmd) if cmd else [] + cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args + log(cmd_ls) + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - stdout_data, stderr_data = await p.communicate() - if p.returncode == 0: - return stdout_data - else: - raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) + stdout_data, stderr_data = await p.communicate() + if p.returncode == 0: + return stdout_data + else: + raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) def exec( self, @@ -1192,68 +1200,54 @@ def exec( xargs = self.compose.get_podman_args(cmd) if cmd else [] cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args log(" ".join([str(i) for i in cmd_ls])) + # this replaces current process no need any async stuff os.execlp(self.podman_path, *cmd_ls) - async def run( + async def run_bg( self, podman_args, cmd="", cmd_args=None, - log_formatter=None, *, # Intentionally mutable default argument to hold references to tasks task_reference=set() ) -> int: async with self.semaphore: - cmd_args = list(map(str, cmd_args or [])) - xargs = self.compose.get_podman_args(cmd) if cmd else [] - cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args - log(" ".join([str(i) for i in cmd_ls])) - if self.dry_run: - return None - - if log_formatter is not None: - - async def format_out(stdout): - while True: - l = await stdout.readline() - if l: - print(log_formatter, l.decode('utf-8'), end='') - if stdout.at_eof(): - break - - p = await asyncio.subprocess.create_subprocess_exec( - *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) # pylint: disable=consider-using-with - - # This is hacky to make the tasks not get garbage collected - # https://github.com/python/cpython/issues/91887 - out_t = asyncio.create_task(format_out(p.stdout)) - task_reference.add(out_t) - out_t.add_done_callback(task_reference.discard) - - err_t = asyncio.create_task(format_out(p.stderr)) - task_reference.add(err_t) - err_t.add_done_callback(task_reference.discard) + return self.run(podman_args, cmd, cmd_args, task_reference=set()) - else: - p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with + async def run( + self, + podman_args, + cmd="", + cmd_args=None, + *, + # Intentionally mutable default argument to hold references to tasks + task_reference=set() + ) -> int: + cmd_args = list(map(str, cmd_args or [])) + xargs = self.compose.get_podman_args(cmd) if cmd else [] + cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args + log(" ".join([str(i) for i in cmd_ls])) + if self.dry_run: + return None + p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with + + try: + exit_code = await p.wait() + except asyncio.CancelledError as e: + log(f"Sending termination signal") + p.terminate() try: - exit_code = await p.wait() - except asyncio.CancelledError as e: - log(f"Sending termination signal") - p.terminate() - try: - async with asyncio.timeout(10): - exit_code = await p.wait() - except TimeoutError: - log(f"container did not shut down after 10 seconds, killing") - p.kill() + async with asyncio.timeout(10): exit_code = await p.wait() + except TimeoutError: + log(f"container did not shut down after 10 seconds, killing") + p.kill() + exit_code = await p.wait() - log(f"exit code: {exit_code}") - return exit_code + log(f"exit code: {exit_code}") + return exit_code async def volume_ls(self, proj=None): if not proj: @@ -1468,6 +1462,7 @@ def dotenv_to_dict(dotenv_path): class PodmanCompose: def __init__(self): + self.semaphore = None self.podman = None self.podman_version = None self.environ = {} @@ -1529,7 +1524,8 @@ async def run(self): if args.dry_run is False: log(f"Binary {podman_path} has not been found.") sys.exit(1) - self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel)) + self.semaphore = asyncio.Semaphore(args.parallel) + self.podman = Podman(self, podman_path, args.dry_run) if not args.dry_run: # just to make sure podman is running @@ -1994,7 +1990,7 @@ async def compose_version(compose, args): print(json.dumps(res)) return print("podman-compose version", __version__) - await compose.podman.run(["--version"], "", []) + await compose.podman.exec(["--version"], "", []) def is_local(container: dict) -> bool: @@ -2242,19 +2238,13 @@ def get_excluded(compose, args): log("** excluding: ", excluded) return excluded - @cmd_run( - podman_compose, "up", "Create and start the entire stack or some of its services" + podman_compose, "create", "Create the containers but do not start them" ) -async def compose_up(compose: PodmanCompose, args): +async def compose_create(compose: PodmanCompose, args): proj_name = compose.project_name excluded = get_excluded(compose, args) - if not args.no_build: - # `podman build` does not cache, so don't always build - build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__) - if await compose.commands["build"](compose, build_args) != 0: - log("Build command failed") - + # args.no_recreate disables check for changes (which is not implemented) hashes = ( (await compose.podman.output( [], @@ -2276,9 +2266,7 @@ async def compose_up(compose: PodmanCompose, args): down_args = argparse.Namespace(**dict(args.__dict__, volumes=False)) await compose.commands["down"](compose, down_args) log("recreating: done\n\n") - # args.no_recreate disables check for changes (which is not implemented) - - podman_command = "run" if args.detach and not args.no_start else "create" + podman_command = "create" await create_pods(compose, args) for cnt in compose.containers: @@ -2287,10 +2275,32 @@ async def compose_up(compose: PodmanCompose, args): continue podman_args = await container_to_args(compose, cnt, detached=args.detach) subproc = await compose.podman.run([], podman_command, podman_args) - if podman_command == "run" and subproc is not None: - await compose.podman.run([], "start", [cnt["name"]]) - if args.no_start or args.detach or args.dry_run: + + +@cmd_run( + podman_compose, "up", "Create and start the entire stack or some of its services" +) +async def compose_up(compose: PodmanCompose, args): + proj_name = compose.project_name + excluded = get_excluded(compose, args) + if not args.no_build: + # `podman build` does not cache, so don't always build + build_args = argparse.Namespace(if_not_exists=(not args.build), **args.__dict__) + if await compose.commands["build"](compose, build_args) != 0: + log("Build command failed") + + await compose.commands["create"](compose, args) + + if args.no_start or args.dry_run: return + if compose.pods: + pod_name = compose.pods[0]["name"] + await compose.podman.run([], "pod", ["start", pod_name]) + # TODO: use run and do a loop to capture exit_code_from using podman wait + await compose.podman.run([], "pod", ["logs", "--color", "-n", "-f", pod_name]) + await compose.commands["stop"](compose, argparse.Namespace(services=[])) + return 0 + # TODO: handle already existing # TODO: if error creating do not enter loop # TODO: colors if sys.stdout.isatty() @@ -2311,20 +2321,13 @@ async def compose_up(compose: PodmanCompose, args): loop.add_signal_handler(signal.SIGINT, lambda: [t.cancel("User exit") for t in tasks]) for i, cnt in enumerate(compose.containers): - # Add colored service prefix to output by piping output through sed - color_idx = i % len(compose.console_colors) - color = compose.console_colors[color_idx] - space_suffix = " " * (max_service_length - len(cnt["_service"]) + 1) - log_formatter = "{}[{}]{}|\x1B[0m".format( - color, cnt["_service"], space_suffix - ) if cnt["_service"] in excluded: log("** skipping: ", cnt["name"]) continue tasks.add( asyncio.create_task( - compose.podman.run([], "start", ["-a", cnt["name"]], log_formatter=log_formatter), + compose.podman.run([], "start", ["-a", cnt["name"]]), name=cnt["_service"] ) ) @@ -2469,7 +2472,6 @@ async def compose_run(compose, args): no_start=False, no_cache=False, build_arg=[], - parallel=1, remove_orphans=True ) )