diff --git a/tests/test_array_field.py b/tests/test_array_field.py
index 5ae359d3..85cd1b14 100644
--- a/tests/test_array_field.py
+++ b/tests/test_array_field.py
@@ -2,6 +2,7 @@
 from tempfile import NamedTemporaryFile
 from collections import defaultdict
 from assertpy.assertpy import assert_that
+from multiprocessing import cpu_count
 
 import torch as ch
 from assertpy import assert_that
@@ -66,7 +67,7 @@ def run_test(n_samples, shape, is_ch=False):
 
         writer.from_indexed_dataset(dataset)
 
-        loader = Loader(name, batch_size=3, num_workers=5)
+        loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()))
         for ixes, activations in loader:
             for ix, activation in zip(ixes, activations):
                 d = dataset[ix][1]
@@ -98,7 +99,7 @@ def test_multi_fields():
 
         writer.from_indexed_dataset(dataset)
 
-        loader = Loader(name, batch_size=3, num_workers=5)
+        loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()))
         page_size_l2 = int(np.log2(loader.reader.page_size))
         sample_ids = loader.reader.alloc_table['sample_id']
         pointers = loader.reader.alloc_table['ptr']
diff --git a/tests/test_basic_pipeline.py b/tests/test_basic_pipeline.py
index 107a7866..67702f30 100644
--- a/tests/test_basic_pipeline.py
+++ b/tests/test_basic_pipeline.py
@@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile
 
 from ffcv.pipeline.operation import Operation
 from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count
 
 from ffcv.writer import DatasetWriter
 from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()]
                         })
@@ -63,7 +64,7 @@ def test_basic_simple():
                 np.arange(batch_size))).is_true()
             assert_that(np.allclose(2 * np.sin(np.arange(batch_size)),
                                     values.squeeze().numpy())).is_true()
-    
+
 def test_multiple_iterators_success():
     length = 60
     batch_size = 8
@@ -79,7 +80,7 @@ def test_multiple_iterators_success():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()]
                         })
@@ -102,7 +103,7 @@ def test_multiple_epoch_doesnt_recompile():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()]
                         })
@@ -128,7 +129,7 @@ def test_multiple_epoch_does_recompile():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         recompile=True,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()]
diff --git a/tests/test_cuda_nonblocking.py b/tests/test_cuda_nonblocking.py
index c8e64866..106c5def 100644
--- a/tests/test_cuda_nonblocking.py
+++ b/tests/test_cuda_nonblocking.py
@@ -2,6 +2,7 @@
 import torch as ch
 from tqdm import tqdm
 import time
+from multiprocessing import cpu_count
 
 from assertpy import assert_that
 import numpy as np
@@ -15,11 +16,15 @@
 from ffcv.transforms import ToDevice, ToTensor, Squeeze
 import time
 
+BATCH = 256
+SIZE = 25_000
+WORKERS = min(10, cpu_count())
+
 class DummyArrayDataset(Dataset):
     def __init__(self, n_samples, shape):
         self.n_samples = n_samples
         self.shape = shape
-    
+
     def __len__(self):
         return self.n_samples
 
@@ -27,11 +32,11 @@ def __getitem__(self, index):
         if index >= self.n_samples:
             raise IndexError()
         np.random.seed(index)
-        return (np.random.rand(50000) > 0.5).astype('bool'), np.random.rand(50000).astype('float32'), index
+        return (np.random.rand(SIZE) > 0.5).astype('bool'), np.random.rand(SIZE).astype('float32'), index
 
 def run_experiment_cuda(weight, loader, sync=False):
     total = 0.
-    X = ch.empty(2048, 50_000, device=weight.device)
+    X = ch.empty(BATCH, SIZE, device=weight.device)
     for X_bool, _, __ in tqdm(loader):
         if sync: ch.cuda.synchronize()
         X.copy_(X_bool)
@@ -42,13 +47,13 @@ def run_cuda(weight, sync):
     return total.sum(0)
 
 def run_cuda(weight, sync):
-    n_samples, shape = (2048 * 10, (50000,))
+    n_samples, shape = (BATCH * WORKERS, (SIZE,))
     with NamedTemporaryFile() as handle:
        name = handle.name
        dataset = DummyArrayDataset(n_samples, shape)
        writer = DatasetWriter(name, {
-            'mask': NDArrayField(dtype=np.dtype('bool'), shape=(50_000,)),
-            'targets': NDArrayField(dtype=np.dtype('float32'), shape=(50_000,)),
+            'mask': NDArrayField(dtype=np.dtype('bool'), shape=(SIZE,)),
+            'targets': NDArrayField(dtype=np.dtype('float32'), shape=(SIZE,)),
             'idx': IntField()
         })
 
@@ -56,8 +61,8 @@ def run_cuda(weight, sync):
 
         loader = Loader(
                 name,
-                batch_size=2048,
-                num_workers=10,
+                batch_size=BATCH,
+                num_workers=WORKERS,
                 order=OrderOption.QUASI_RANDOM,
                 indices=np.arange(n_samples),
                 drop_last=False,
@@ -67,11 +72,11 @@ def run_cuda(weight, sync):
                 'targets': [NDArrayDecoder(), ToTensor(), ToDevice(ch.device('cuda:0'), non_blocking=False)],
                 'idx': [IntDecoder(), ToTensor(), Squeeze(), ToDevice(ch.device('cuda:0'), non_blocking=False)]
             })
-        
+
         return run_experiment_cuda(weight, loader, sync)
 
 def test_cuda():
-    weight = ch.randn(50_000, 50_000).cuda()
+    weight = ch.randn(SIZE, SIZE).cuda()
     async_1 = run_cuda(weight, False)
     sync_1 = run_cuda(weight, True)
     sync_2 = run_cuda(weight, True)
@@ -80,7 +85,7 @@ def test_cuda():
     print(sync_2)
     print(ch.abs(sync_1 - sync_2).max())
     print(ch.abs(sync_1 - async_1).max())
-    assert ch.abs(sync_1 - sync_2).max().cpu().item() < 10., 'Sync-sync mismatch'
-    assert ch.abs(async_1 - sync_1).max().cpu().item() < 10., 'Async-sync mismatch'
+    assert ch.abs(sync_1 - sync_2).max().cpu().item() < float(WORKERS), 'Sync-sync mismatch'
+    assert ch.abs(async_1 - sync_1).max().cpu().item() < float(WORKERS), 'Async-sync mismatch'
 
 # test_cuda()
\ No newline at end of file
diff --git a/tests/test_json_field.py b/tests/test_json_field.py
index 4d3c066f..792e91f7 100644
--- a/tests/test_json_field.py
+++ b/tests/test_json_field.py
@@ -2,7 +2,7 @@
 from ctypes import pointer
 from tempfile import NamedTemporaryFile
 from collections import defaultdict
-from assertpy.assertpy import assert_that
+from multiprocessing import cpu_count
 
 from assertpy import assert_that
 import numpy as np
@@ -46,11 +46,11 @@ def run_test(n_samples):
         writer = DatasetWriter(name, {
             'index': IntField(),
             'activations': JSONField()
-        }, num_workers=3)
+        }, num_workers=min(3, cpu_count()))
 
         writer.from_indexed_dataset(dataset)
 
-        loader = Loader(name, batch_size=3, num_workers=5,
+        loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()),
                         pipelines={
                             'activations': [BytesDecoder()],
                             'index': [IntDecoder()]
diff --git a/tests/test_loader_filter.py b/tests/test_loader_filter.py
index d8663858..ba541c0c 100644
--- a/tests/test_loader_filter.py
+++ b/tests/test_loader_filter.py
@@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile
 
 from ffcv.pipeline.operation import Operation
 from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count
 
 from ffcv.writer import DatasetWriter
 from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()],
                         })
diff --git a/tests/test_partial_batches.py b/tests/test_partial_batches.py
index b6a3d7e7..aa2b7d6a 100644
--- a/tests/test_partial_batches.py
+++ b/tests/test_partial_batches.py
@@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile
 
 from ffcv.pipeline.operation import Operation
 from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count
 
 from ffcv.writer import DatasetWriter
 from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def run_test(bs, exp_length, drop_last=True):
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         drop_last=drop_last,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()]
@@ -76,6 +77,4 @@ def test_not_partial_multiple():
     run_test(60, 10, False)
 
 def test_partial_multiple():
-    run_test(60, 10, True)
-
-
\ No newline at end of file
+    run_test(60, 10, True)
\ No newline at end of file
diff --git a/tests/test_partial_pipeline.py b/tests/test_partial_pipeline.py
index 72598a4c..744e347d 100644
--- a/tests/test_partial_pipeline.py
+++ b/tests/test_partial_pipeline.py
@@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile
 
 from ffcv.pipeline.operation import Operation
 from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count
 
 from ffcv.writer import DatasetWriter
 from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():
 
         Compiler.set_enabled(True)
 
-        loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+        loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
                         pipelines={
                             'value': [FloatDecoder(), Doubler(), ToTensor()],
                             'index': None
@@ -64,5 +65,4 @@ def test_basic_simple():
         assert_that(result).is_length(1)
         values = result[0]
         assert_that(np.allclose(2 * np.sin(np.arange(batch_size)),
-                                values.squeeze().numpy())).is_true()
-
\ No newline at end of file
+                                values.squeeze().numpy())).is_true()
\ No newline at end of file
diff --git a/tests/test_writer.py b/tests/test_writer.py
index 71092e74..d1379d24 100644
--- a/tests/test_writer.py
+++ b/tests/test_writer.py
@@ -6,6 +6,7 @@ import os
 
 from assertpy import assert_that
 from tempfile import NamedTemporaryFile
+from multiprocessing import cpu_count
 
 from ffcv.writer import DatasetWriter
 from ffcv.reader import Reader
@@ -91,7 +92,7 @@ def test_multiple_workers():
         writer = DatasetWriter(name, {
             'index': IntField(),
             'value': FloatField()
-        }, num_workers=30)
+        }, num_workers=min(30, cpu_count()))
 
         writer.from_indexed_dataset(dataset, chunksize=10000)
 
@@ -106,7 +107,7 @@ def test_super_long():
         writer = DatasetWriter(name, {
             'index': IntField(),
             'value': FloatField()
-        }, num_workers=30)
+        }, num_workers=min(30, cpu_count()))
 
         writer.from_indexed_dataset(dataset, chunksize=10000)
 
@@ -120,6 +121,6 @@ def test_small_chunks_multiple_workers():
         writer = DatasetWriter(name, {
             'index': IntField(),
             'value': BytesField()
-        }, num_workers=30)
+        }, num_workers=min(30, cpu_count()))
 
         writer.from_indexed_dataset(dataset, chunksize=1)
\ No newline at end of file
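
Note on the recurring pattern in this patch: the tests previously hard-coded worker counts (5, 10, 30) that can oversubscribe machines with fewer cores, so every call site now clamps the request with min(n, cpu_count()). A minimal sketch of the idea, using only the standard library (the helper name capped_workers is illustrative, not part of FFCV's API):

    from multiprocessing import cpu_count

    def capped_workers(requested: int) -> int:
        """Clamp a requested worker count to the number of available cores."""
        # min() leaves small requests unchanged and shrinks large ones,
        # so the same test runs on a big workstation and a 2-core CI runner.
        return min(requested, cpu_count())

    # e.g. Loader(name, batch_size=3, num_workers=capped_workers(5))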