Skip to content

Commit

Permalink
Merge pull request #136 from jeromekelleher/distributed-encode
Browse files Browse the repository at this point in the history
Distributed encode
  • Loading branch information
jeromekelleher authored Apr 24, 2024
2 parents fa2162d + 7835d3f commit e6b9a19
Show file tree
Hide file tree
Showing 7 changed files with 706 additions and 297 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 0.0.6 2024-04-xx
# 0.0.6 2024-04-24

- Only use NOSHUFFLE by default on ``call_genotype`` and bool arrays.
- Add initial implementation of distributed encode

# 0.0.5 2024-04-17

Expand Down
149 changes: 125 additions & 24 deletions bio2zarr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import click
import coloredlogs
import humanfriendly
import numcodecs
import tabulate

Expand Down Expand Up @@ -39,6 +40,14 @@ def list_commands(self, ctx):
"zarr_path", type=click.Path(file_okay=False, dir_okay=True)
)

zarr_path = click.argument(
"zarr_path", type=click.Path(exists=True, file_okay=False, dir_okay=True)
)

num_partitions = click.argument("num_partitions", type=click.IntRange(min=1))

partition = click.argument("partition", type=click.IntRange(min=0))

verbose = click.option("-v", "--verbose", count=True, help="Increase verbosity")

force = click.option(
Expand Down Expand Up @@ -92,6 +101,27 @@ def list_commands(self, ctx):
help="Chunk size in the samples dimension",
)

schema = click.option("-s", "--schema", default=None, type=click.Path(exists=True))

max_variant_chunks = click.option(
"-V",
"--max-variant-chunks",
type=int,
default=None,
help=(
"Truncate the output in the variants dimension to have "
"this number of chunks. Mainly intended to help with "
"schema tuning."
),
)

max_memory = click.option(
"-M",
"--max-memory",
default=None,
help="An approximate bound on overall memory usage (e.g. 10G),",
)


def setup_logging(verbosity):
level = "WARNING"
Expand Down Expand Up @@ -158,7 +188,7 @@ def explode(
@click.command
@vcfs
@new_icf_path
@click.argument("num_partitions", type=click.IntRange(min=1))
@num_partitions
@force
@column_chunk_size
@compressor
Expand Down Expand Up @@ -194,7 +224,7 @@ def dexplode_init(

@click.command
@icf_path
@click.argument("partition", type=click.IntRange(min=0))
@partition
@verbose
def dexplode_partition(icf_path, partition, verbose):
"""
Expand All @@ -207,14 +237,14 @@ def dexplode_partition(icf_path, partition, verbose):


@click.command
@click.argument("path", type=click.Path(), required=True)
@icf_path
@verbose
def dexplode_finalise(path, verbose):
def dexplode_finalise(icf_path, verbose):
"""
Final step for distributed conversion of VCF(s) to intermediate columnar format.
"""
setup_logging(verbose)
vcf.explode_finalise(path)
vcf.explode_finalise(icf_path)


@click.command
Expand Down Expand Up @@ -244,26 +274,11 @@ def mkschema(icf_path):
@new_zarr_path
@force
@verbose
@click.option("-s", "--schema", default=None, type=click.Path(exists=True))
@schema
@variants_chunk_size
@samples_chunk_size
@click.option(
"-V",
"--max-variant-chunks",
type=int,
default=None,
help=(
"Truncate the output in the variants dimension to have "
"this number of chunks. Mainly intended to help with "
"schema tuning."
),
)
@click.option(
"-M",
"--max-memory",
default=None,
help="An approximate bound on overall memory usage (e.g. 10G),",
)
@max_variant_chunks
@max_memory
@worker_processes
def encode(
icf_path,
Expand All @@ -288,13 +303,96 @@ def encode(
schema_path=schema,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
max_v_chunks=max_variant_chunks,
max_variant_chunks=max_variant_chunks,
worker_processes=worker_processes,
max_memory=max_memory,
show_progress=True,
)


@click.command
@icf_path
@new_zarr_path
@num_partitions
@force
@schema
@variants_chunk_size
@samples_chunk_size
@max_variant_chunks
@verbose
def dencode_init(
icf_path,
zarr_path,
num_partitions,
force,
schema,
variants_chunk_size,
samples_chunk_size,
max_variant_chunks,
verbose,
):
"""
Initialise conversion of intermediate format to VCF Zarr. This will
set up the specified ZARR_PATH to perform this conversion over
NUM_PARTITIONS.
The output of this commmand is the actual number of partitions generated
(which may be less then the requested number, if there is not sufficient
chunks in the variants dimension) and a rough lower-bound on the amount
of memory required to encode a partition.
NOTE: the format of this output will likely change in subsequent releases;
it should not be considered machine-readable for now.
"""
setup_logging(verbose)
check_overwrite_dir(zarr_path, force)
num_partitions, max_memory = vcf.encode_init(
icf_path,
zarr_path,
target_num_partitions=num_partitions,
schema_path=schema,
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
max_variant_chunks=max_variant_chunks,
show_progress=True,
)
formatted_size = humanfriendly.format_size(max_memory, binary=True)
# NOTE adding the size to the stdout here so that users can parse it
# and use in their submission scripts. This is a first pass, and
# will most likely change as we see what works and doesn't.
# NOTE we probably want to format this as a table, which lists
# some other properties, line by line
# NOTE This size number is also not quite enough, you need a bit of
# headroom with it (probably 10% or so). We should include this.
click.echo(f"{num_partitions}\t{formatted_size}")


@click.command
@zarr_path
@partition
@verbose
def dencode_partition(zarr_path, partition, verbose):
"""
Convert a partition from intermediate columnar format to VCF Zarr.
Must be called *after* the Zarr path has been initialised with dencode_init.
Partition indexes must be from 0 (inclusive) to the number of paritions
returned by dencode_init (exclusive).
"""
setup_logging(verbose)
vcf.encode_partition(zarr_path, partition)


@click.command
@zarr_path
@verbose
def dencode_finalise(zarr_path, verbose):
"""
Final step for distributed conversion of ICF to VCF Zarr.
"""
setup_logging(verbose)
vcf.encode_finalise(zarr_path, show_progress=True)


@click.command(name="convert")
@vcfs
@new_zarr_path
Expand Down Expand Up @@ -382,6 +480,9 @@ def vcf2zarr():
vcf2zarr.add_command(dexplode_init)
vcf2zarr.add_command(dexplode_partition)
vcf2zarr.add_command(dexplode_finalise)
vcf2zarr.add_command(dencode_init)
vcf2zarr.add_command(dencode_partition)
vcf2zarr.add_command(dencode_finalise)


@click.command(name="convert")
Expand Down
6 changes: 3 additions & 3 deletions bio2zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def flush(self):
sync_flush_2d_array(
self.buff[: self.buffer_row], self.array, self.array_offset
)
# FIXME the array.name doesn't seem to be working here for some reason
logger.debug(
f"Flushed <{self.array.name} {self.array.shape} "
f"{self.array.dtype}> "
Expand All @@ -131,8 +132,7 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
# encoder implementations.
s = slice(offset, offset + np_buffer.shape[0])
samples_chunk_size = zarr_array.chunks[1]
# TODO use zarr chunks here to support non-uniform chunking later
# and for simplicity
# TODO use zarr chunks here for simplicity
zarr_array_width = zarr_array.shape[1]
start = 0
while start < zarr_array_width:
Expand Down Expand Up @@ -192,7 +192,7 @@ def __init__(self, worker_processes=1, progress_config=None):
self.progress_config = progress_config
self.progress_bar = tqdm.tqdm(
total=progress_config.total,
desc=f"{progress_config.title:>7}",
desc=f"{progress_config.title:>8}",
unit_scale=True,
unit=progress_config.units,
smoothing=0.1,
Expand Down
Loading

0 comments on commit e6b9a19

Please sign in to comment.