From f5633deb950c039d3016099c7dca914c584efa86 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Thu, 25 Apr 2024 22:17:14 +0100 Subject: [PATCH 1/5] Simplify VcfWriterMetadata and test version mismatch --- bio2zarr/vcf.py | 50 ++++++++++++++++++++--------------------------- tests/test_vcf.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index 89aa9d2..d823a12 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -1548,10 +1548,8 @@ def parse_max_memory(max_memory): @dataclasses.dataclass class VcfZarrPartition: - start_index: int - stop_index: int - start_chunk: int - stop_chunk: int + start: int + stop: int @staticmethod def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None): @@ -1565,9 +1563,7 @@ def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None stop_chunk = int(chunk_slice[-1]) + 1 start_index = start_chunk * chunk_size stop_index = min(stop_chunk * chunk_size, num_records) - partitions.append( - VcfZarrPartition(start_index, stop_index, start_chunk, stop_chunk) - ) + partitions.append(VcfZarrPartition(start_index, stop_index)) return partitions @@ -1590,7 +1586,7 @@ def asdict(self): def fromdict(d): if d["format_version"] != VZW_METADATA_FORMAT_VERSION: raise ValueError( - "VcfZarrWriter format version mismatch: " + "VcfZarrWriter format version mismatch: " f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}" ) ret = VcfZarrWriterMetadata(**d) @@ -1675,7 +1671,7 @@ def init( root = zarr.group(store=store) for column in self.schema.columns.values(): - self.init_array(root, column, partitions[-1].stop_index) + self.init_array(root, column, partitions[-1].stop) logger.info("Writing WIP metadata") with open(self.wip_path / "metadata.json", "w") as f: @@ -1809,13 +1805,11 @@ def encode_array_partition(self, column, partition_index): array = self.init_partition_array(partition_index, column.name) partition = self.metadata.partitions[partition_index] - ba = core.BufferedArray(array, partition.start_index) + ba = core.BufferedArray(array, partition.start) source_col = self.icf.columns[column.vcf_field] sanitiser = source_col.sanitiser_factory(ba.buff.shape) - for value in source_col.iter_values( - partition.start_index, partition.stop_index - ): + for value in source_col.iter_values(partition.start, partition.stop): # We write directly into the buffer in the sanitiser function # to make it easier to reason about dimension padding j = ba.next_buffer_row() @@ -1831,14 +1825,12 @@ def encode_genotypes_partition(self, partition_index): ) partition = self.metadata.partitions[partition_index] - gt = core.BufferedArray(gt_array, partition.start_index) - gt_mask = core.BufferedArray(gt_mask_array, partition.start_index) - gt_phased = core.BufferedArray(gt_phased_array, partition.start_index) + gt = core.BufferedArray(gt_array, partition.start) + gt_mask = core.BufferedArray(gt_mask_array, partition.start) + gt_phased = core.BufferedArray(gt_phased_array, partition.start) source_col = self.icf.columns["FORMAT/GT"] - for value in source_col.iter_values( - partition.start_index, partition.stop_index - ): + for value in source_col.iter_values(partition.start, partition.stop): j = gt.next_buffer_row() sanitise_value_int_2d(gt.buff, j, value[:, :-1]) j = gt_phased.next_buffer_row() @@ -1859,13 +1851,13 @@ def encode_alleles_partition(self, partition_index): array_name = "variant_allele" alleles_array = self.init_partition_array(partition_index, array_name) partition = self.metadata.partitions[partition_index] - alleles = core.BufferedArray(alleles_array, partition.start_index) + alleles = core.BufferedArray(alleles_array, partition.start) ref_col = self.icf.columns["REF"] alt_col = self.icf.columns["ALT"] for ref, alt in zip( - ref_col.iter_values(partition.start_index, partition.stop_index), - alt_col.iter_values(partition.start_index, partition.stop_index), + ref_col.iter_values(partition.start, partition.stop), + alt_col.iter_values(partition.start, partition.stop), ): j = alleles.next_buffer_row() alleles.buff[j, :] = STR_FILL @@ -1879,11 +1871,11 @@ def encode_id_partition(self, partition_index): vid_array = self.init_partition_array(partition_index, "variant_id") vid_mask_array = self.init_partition_array(partition_index, "variant_id_mask") partition = self.metadata.partitions[partition_index] - vid = core.BufferedArray(vid_array, partition.start_index) - vid_mask = core.BufferedArray(vid_mask_array, partition.start_index) + vid = core.BufferedArray(vid_array, partition.start) + vid_mask = core.BufferedArray(vid_mask_array, partition.start) col = self.icf.columns["ID"] - for value in col.iter_values(partition.start_index, partition.stop_index): + for value in col.iter_values(partition.start, partition.stop): j = vid.next_buffer_row() k = vid_mask.next_buffer_row() assert j == k @@ -1904,10 +1896,10 @@ def encode_filters_partition(self, partition_index): array_name = "variant_filter" array = self.init_partition_array(partition_index, array_name) partition = self.metadata.partitions[partition_index] - var_filter = core.BufferedArray(array, partition.start_index) + var_filter = core.BufferedArray(array, partition.start) col = self.icf.columns["FILTERS"] - for value in col.iter_values(partition.start_index, partition.stop_index): + for value in col.iter_values(partition.start, partition.stop): j = var_filter.next_buffer_row() var_filter.buff[j] = False for f in value: @@ -1926,10 +1918,10 @@ def encode_contig_partition(self, partition_index): array_name = "variant_contig" array = self.init_partition_array(partition_index, array_name) partition = self.metadata.partitions[partition_index] - contig = core.BufferedArray(array, partition.start_index) + contig = core.BufferedArray(array, partition.start) col = self.icf.columns["CHROM"] - for value in col.iter_values(partition.start_index, partition.stop_index): + for value in col.iter_values(partition.start, partition.stop): j = contig.next_buffer_row() # Note: because we are using the indexes to define the lookups # and we always have an index, it seems that we the contig lookup diff --git a/tests/test_vcf.py b/tests/test_vcf.py index dfd690e..c5c1837 100644 --- a/tests/test_vcf.py +++ b/tests/test_vcf.py @@ -98,6 +98,16 @@ def test_exploded_metadata_mismatch(self, tmpdir, icf_path, version): ): vcf.IcfMetadata.fromdict(d) + @pytest.mark.parametrize("version", ["0.0", "1.0", "xxxxx", 0.1]) + def test_encode_metadata_mismatch(self, tmpdir, icf_path, version): + zarr_path = tmpdir / "zarr" + vcf.encode_init(icf_path, zarr_path, 1) + with open(zarr_path / "wip" / "metadata.json") as f: + d = json.load(f) + d["format_version"] = version + with pytest.raises(ValueError, match="VcfZarrWriter format version mismatch"): + vcf.VcfZarrWriterMetadata.fromdict(d) + class TestEncodeDimensionSeparator: @pytest.mark.parametrize("dimension_separator", [None, "/"]) From 9d1e35b3950f72c18447b98b61826095aec945e4 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Thu, 25 Apr 2024 23:31:39 +0100 Subject: [PATCH 2/5] Simplify distributed encode Closes #138 Closes #139 --- CHANGELOG.md | 5 ++ bio2zarr/vcf.py | 40 +++++++++++----- tests/test_vcf.py | 116 +++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 148 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42f79f4..3dbf094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 0.0.7 2024-04-xx +- Change on-disk format of distributed encode and simplify +- Check for all partitions nominally completed encoding before doing + anything destructive in dencode-finalise + # 0.0.6 2024-04-24 - Only use NOSHUFFLE by default on ``call_genotype`` and bool arrays. diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index d823a12..fcca227 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -1758,28 +1758,42 @@ def load_metadata(self): def partition_path(self, partition_index): return self.partitions_path / f"p{partition_index}" + def wip_partition_path(self, partition_index): + return self.partitions_path / f"wip_p{partition_index}" + def wip_partition_array_path(self, partition_index, name): - return self.partition_path(partition_index) / f"wip_{name}" + return self.wip_partition_path(partition_index) / name def partition_array_path(self, partition_index, name): return self.partition_path(partition_index) / name def encode_partition(self, partition_index): self.load_metadata() - partition_path = self.partition_path(partition_index) + if partition_index < 0 or partition_index >= self.num_partitions: + raise ValueError( + "Partition index must be in the range 0 <= index < num_partitions" + ) + partition_path = self.wip_partition_path(partition_index) partition_path.mkdir(exist_ok=True) logger.info(f"Encoding partition {partition_index} to {partition_path}") - self.encode_alleles_partition(partition_index) self.encode_id_partition(partition_index) self.encode_filters_partition(partition_index) self.encode_contig_partition(partition_index) + self.encode_alleles_partition(partition_index) for col in self.schema.columns.values(): if col.vcf_field is not None: self.encode_array_partition(col, partition_index) if "call_genotype" in self.schema.columns: self.encode_genotypes_partition(partition_index) + final_path = self.partition_path(partition_index) + logger.info(f"Finalising {partition_index} at {final_path}") + if final_path.exists(): + logger.warning("Removing existing partition at {final_path}") + shutil.rmtree(final_path) + os.rename(partition_path, final_path) + def init_partition_array(self, partition_index, name): wip_path = self.wip_partition_array_path(partition_index, name) # Create an empty array like the definition @@ -1791,14 +1805,6 @@ def init_partition_array(self, partition_index, name): return array def finalise_partition_array(self, partition_index, name): - wip_path = self.wip_partition_array_path(partition_index, name) - final_path = self.partition_array_path(partition_index, name) - if final_path.exists(): - # NEEDS TEST - logger.warning(f"Removing existing {final_path}") - shutil.rmtree(final_path) - # Atomic swap - os.rename(wip_path, final_path) logger.debug(f"Encoded {name} partition {partition_index}") def encode_array_partition(self, column, partition_index): @@ -1942,7 +1948,7 @@ def finalise_array(self, name): if final_path.exists(): # NEEDS TEST raise ValueError(f"Array {name} already exists") - for partition in range(len(self.metadata.partitions)): + for partition in range(self.num_partitions): # Move all the files in partition dir to dest dir src = self.partition_array_path(partition, name) if not src.exists(): @@ -1969,6 +1975,13 @@ def finalise_array(self, name): def finalise(self, show_progress=False): self.load_metadata() + missing = [] + for partition_id in range(self.num_partitions): + if not self.partition_path(partition_id).exists(): + missing.append(partition_id) + if len(missing) > 0: + raise FileNotFoundError(f"Partitions not encoded: {missing}") + progress_config = core.ProgressConfig( total=len(self.schema.columns), title="Finalise", @@ -1986,6 +1999,9 @@ def finalise(self, show_progress=False): with core.ParallelWorkManager(0, progress_config) as pwm: for name in self.schema.columns: pwm.submit(self.finalise_array, name) + logger.debug(f"Removing {self.wip_path}") + shutil.rmtree(self.wip_path) + logger.info("Consolidating Zarr metadata") zarr.consolidate_metadata(self.path) ###################### diff --git a/tests/test_vcf.py b/tests/test_vcf.py index c5c1837..60fa1ae 100644 --- a/tests/test_vcf.py +++ b/tests/test_vcf.py @@ -5,7 +5,7 @@ import xarray.testing as xt import zarr -from bio2zarr import vcf, vcf_utils +from bio2zarr import core, vcf, vcf_utils @pytest.fixture(scope="module") @@ -383,3 +383,117 @@ def test_fields(self, schema, field, description): # ]) # def test_filters(self, schema, filt, description): # assert schema["filters"][field]["description"] == description + + +class TestVcfZarrWriterExample: + arrays = ( + "variant_contig", + "variant_filter", + "variant_id", + "variant_AA", + "variant_AC", + "variant_AF", + "variant_AN", + "variant_DB", + "variant_DP", + "variant_H2", + "variant_NS", + "variant_position", + "variant_quality", + "variant_allele", + "call_DP", + "call_GQ", + "call_genotype", + "call_genotype_phased", + "call_genotype_mask", + "call_HQ", + ) + + def test_init_paths(self, icf_path, tmp_path): + zarr_path = tmp_path / "x.zarr" + assert not zarr_path.exists() + num_partitions, _ = vcf.encode_init( + icf_path, zarr_path, 7, variants_chunk_size=3 + ) + assert num_partitions == 3 + assert zarr_path.exists() + wip_path = zarr_path / "wip" + assert wip_path.exists() + wip_partitions_path = wip_path / "partitions" + assert wip_partitions_path.exists() + wip_arrays_path = wip_path / "arrays" + assert wip_arrays_path.exists() + for name in self.arrays: + array_path = wip_arrays_path / name + assert array_path.exists() + with open(wip_path / "metadata.json") as f: + d = json.loads(f.read()) + # Basic test + assert len(d["partitions"]) == 3 + + def test_finalise_paths(self, icf_path, tmp_path): + zarr_path = tmp_path / "x.zarr" + assert not zarr_path.exists() + num_partitions, _ = vcf.encode_init( + icf_path, zarr_path, 7, variants_chunk_size=3 + ) + wip_path = zarr_path / "wip" + assert wip_path.exists() + for j in range(num_partitions): + vcf.encode_partition(zarr_path, j) + assert (wip_path / "partitions" / f"p{j}").exists() + vcf.encode_finalise(zarr_path) + assert zarr_path.exists() + assert not wip_path.exists() + + def test_finalise_no_partitions_fails(self, icf_path, tmp_path): + zarr_path = tmp_path / "x.zarr" + vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3) + with pytest.raises( + FileNotFoundError, match="Partitions not encoded: \\[0, 1, 2\\]" + ): + vcf.encode_finalise(zarr_path) + + @pytest.mark.parametrize("partition", [0, 1, 2]) + def test_finalise_missing_partition_fails(self, icf_path, tmp_path, partition): + zarr_path = tmp_path / "x.zarr" + vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3) + for j in range(3): + if j != partition: + vcf.encode_partition(zarr_path, j) + with pytest.raises( + FileNotFoundError, match=f"Partitions not encoded: \\[{partition}\\]" + ): + vcf.encode_finalise(zarr_path) + + @pytest.mark.parametrize("partition", [0, 1, 2]) + def test_encode_partition(self, icf_path, tmp_path, partition): + zarr_path = tmp_path / "x.zarr" + vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3) + partition_path = zarr_path / "wip" / "partitions" / f"p{partition}" + assert not partition_path.exists() + vcf.encode_partition(zarr_path, partition) + assert partition_path.exists() + + def test_double_encode_partition(self, icf_path, tmp_path, caplog): + partition = 1 + zarr_path = tmp_path / "x.zarr" + vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3) + partition_path = zarr_path / "wip" / "partitions" / f"p{partition}" + assert not partition_path.exists() + vcf.encode_partition(zarr_path, partition) + assert partition_path.exists() + size = core.du(partition_path) + assert size > 0 + with caplog.at_level("WARNING"): + vcf.encode_partition(zarr_path, partition) + assert "Removing existing partition at" in caplog.text + assert partition_path.exists() + assert core.du(partition_path) == size + + @pytest.mark.parametrize("partition", [-1, 3, 100]) + def test_encode_partition_out_of_range(self, icf_path, tmp_path, partition): + zarr_path = tmp_path / "x.zarr" + vcf.encode_init(icf_path, zarr_path, 3, variants_chunk_size=3) + with pytest.raises(ValueError, match="Partition index must be in the range"): + vcf.encode_partition(zarr_path, partition) From b22f2da7fd07d2449e3852f7a9cf4b36021caadc Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 26 Apr 2024 11:43:22 +0100 Subject: [PATCH 3/5] Add some logging --- bio2zarr/vcf.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index fcca227..ba0e2c3 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -1975,7 +1975,9 @@ def finalise_array(self, name): def finalise(self, show_progress=False): self.load_metadata() + logger.info("Scanning {self.num_partitions} partitions") missing = [] + # TODO may need a progress bar here for partition_id in range(self.num_partitions): if not self.partition_path(partition_id).exists(): missing.append(partition_id) From 117ce93322004c8d95be3a47c17d4a78914d85e1 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Fri, 26 Apr 2024 11:57:05 +0100 Subject: [PATCH 4/5] Workaround for issue #146 --- bio2zarr/vcf.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/bio2zarr/vcf.py b/bio2zarr/vcf.py index ba0e2c3..1961cc8 100644 --- a/bio2zarr/vcf.py +++ b/bio2zarr/vcf.py @@ -294,17 +294,18 @@ def scan_vcf(path, target_num_partitions): def check_overlap(partitions): for i in range(1, len(partitions)): - prev_partition = partitions[i - 1] - current_partition = partitions[i] - if ( - prev_partition.region.contig == current_partition.region.contig - and prev_partition.region.end > current_partition.region.start - ): - raise ValueError( - f"Multiple VCFs have the region " - f"{prev_partition.region.contig}:{prev_partition.region.start}-" - f"{current_partition.region.end}" - ) + prev_region = partitions[i - 1].region + current_region = partitions[i].region + if prev_region.contig == current_region.contig: + if prev_region.end is None: + logger.warning("Cannot check overlaps; issue #146") + continue + if prev_region.end > current_region.start: + raise ValueError( + f"Multiple VCFs have the region " + f"{prev_region.contig}:{prev_region.start}-" + f"{current_region.end}" + ) def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1): @@ -453,7 +454,7 @@ def sanitise_value_float_2d(buff, j, value): def sanitise_int_array(value, ndmin, dtype): if isinstance(value, tuple): - value = [VCF_INT_MISSING if x is None else x for x in value] # NEEDS TEST + value = [VCF_INT_MISSING if x is None else x for x in value] # NEEDS TEST value = np.array(value, ndmin=ndmin, copy=False) value[value == VCF_INT_MISSING] = -1 value[value == VCF_INT_FILL] = -2 From 3faa3c87c3fe1547ede608c1ef1e23529729f5d3 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Tue, 30 Apr 2024 15:53:00 +0100 Subject: [PATCH 5/5] Update changelog for 0.0.7 release --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dbf094..35a7a47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.0.7 2024-04-xx +# 0.0.7 2024-04-30 - Change on-disk format of distributed encode and simplify - Check for all partitions nominally completed encoding before doing anything destructive in dencode-finalise