diff --git a/tests/test_array_field.py b/tests/test_array_field.py index 3f6692dd..6cfe57a7 100644 --- a/tests/test_array_field.py +++ b/tests/test_array_field.py @@ -2,6 +2,7 @@ from tempfile import NamedTemporaryFile from collections import defaultdict from assertpy.assertpy import assert_that +from multiprocessing import cpu_count from assertpy import assert_that import numpy as np @@ -15,7 +16,7 @@ class DummyActivationsDataset(Dataset): def __init__(self, n_samples, shape): self.n_samples = n_samples self.shape = shape - + def __len__(self): return self.n_samples @@ -50,11 +51,11 @@ def run_test(n_samples, shape): writer = DatasetWriter(name, { 'index': IntField(), 'activations': NDArrayField(np.dtype('= self.n_samples: raise IndexError() np.random.seed(index) - return (np.random.rand(50000) > 0.5).astype('bool'), np.random.rand(50000).astype('float32'), index + return (np.random.rand(SIZE) > 0.5).astype('bool'), np.random.rand(SIZE).astype('float32'), index def run_experiment_cuda(weight, loader, sync=False): total = 0. - X = ch.empty(2048, 50_000, device=weight.device) + X = ch.empty(BATCH, SIZE, device=weight.device) for X_bool, _, __ in tqdm(loader): if sync: ch.cuda.synchronize() X.copy_(X_bool) @@ -42,13 +47,13 @@ def run_experiment_cuda(weight, loader, sync=False): return total.sum(0) def run_cuda(weight, sync): - n_samples, shape = (2048 * 10, (50000,)) + n_samples, shape = (BATCH * WORKERS, (SIZE,)) with NamedTemporaryFile() as handle: name = handle.name dataset = DummyArrayDataset(n_samples, shape) writer = DatasetWriter(name, { - 'mask': NDArrayField(dtype=np.dtype('bool'), shape=(50_000,)), - 'targets': NDArrayField(dtype=np.dtype('float32'), shape=(50_000,)), + 'mask': NDArrayField(dtype=np.dtype('bool'), shape=(SIZE,)), + 'targets': NDArrayField(dtype=np.dtype('float32'), shape=(SIZE,)), 'idx': IntField() }) @@ -56,8 +61,8 @@ def run_cuda(weight, sync): loader = Loader( name, - batch_size=2048, - num_workers=10, + batch_size=BATCH, + num_workers=WORKERS, order=OrderOption.QUASI_RANDOM, indices=np.arange(n_samples), drop_last=False, @@ -67,11 +72,11 @@ def run_cuda(weight, sync): 'targets': [NDArrayDecoder(), ToTensor(), ToDevice(ch.device('cuda:0'), non_blocking=False)], 'idx': [IntDecoder(), ToTensor(), Squeeze(), ToDevice(ch.device('cuda:0'), non_blocking=False)] }) - + return run_experiment_cuda(weight, loader, sync) def test_cuda(): - weight = ch.randn(50_000, 50_000).cuda() + weight = ch.randn(SIZE, SIZE).cuda() async_1 = run_cuda(weight, False) sync_1 = run_cuda(weight, True) sync_2 = run_cuda(weight, True) @@ -80,7 +85,7 @@ def test_cuda(): print(sync_2) print(ch.abs(sync_1 - sync_2).max()) print(ch.abs(sync_1 - async_1).max()) - assert ch.abs(sync_1 - sync_2).max().cpu().item() < 10., 'Sync-sync mismatch' - assert ch.abs(async_1 - sync_1).max().cpu().item() < 10., 'Async-sync mismatch' + assert ch.abs(sync_1 - sync_2).max().cpu().item() < float(WORKERS), 'Sync-sync mismatch' + assert ch.abs(async_1 - sync_1).max().cpu().item() < float(WORKERS), 'Async-sync mismatch' # test_cuda() \ No newline at end of file diff --git a/tests/test_json_field.py b/tests/test_json_field.py index ee3165c9..895173dd 100644 --- a/tests/test_json_field.py +++ b/tests/test_json_field.py @@ -2,7 +2,7 @@ from ctypes import pointer from tempfile import NamedTemporaryFile from collections import defaultdict -from assertpy.assertpy import assert_that +from multiprocessing import cpu_count from assertpy import assert_that import numpy as np @@ -46,11 +46,11 @@ def run_test(n_samples): writer = DatasetWriter(name, { 'index': IntField(), 'activations': JSONField() - }, num_workers=3) + }, num_workers=min(3, cpu_count())) writer.from_indexed_dataset(dataset) - loader = Loader(name, batch_size=3, num_workers=5, + loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()), pipelines={ 'activation': [BytesDecoder()], 'index': [IntDecoder()] diff --git a/tests/test_loader_filter.py b/tests/test_loader_filter.py index d8663858..ba541c0c 100644 --- a/tests/test_loader_filter.py +++ b/tests/test_loader_filter.py @@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile from ffcv.pipeline.operation import Operation from ffcv.transforms.ops import ToTensor +from multiprocessing import cpu_count from ffcv.writer import DatasetWriter from ffcv.reader import Reader @@ -52,7 +53,7 @@ def test_basic_simple(): Compiler.set_enabled(True) - loader = Loader(file_name, batch_size, num_workers=5, seed=17, + loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17, pipelines={ 'value': [FloatDecoder(), Doubler(), ToTensor()], }) diff --git a/tests/test_partial_batches.py b/tests/test_partial_batches.py index b6a3d7e7..aa2b7d6a 100644 --- a/tests/test_partial_batches.py +++ b/tests/test_partial_batches.py @@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile from ffcv.pipeline.operation import Operation from ffcv.transforms.ops import ToTensor +from multiprocessing import cpu_count from ffcv.writer import DatasetWriter from ffcv.reader import Reader @@ -52,7 +53,7 @@ def run_test(bs, exp_length, drop_last=True): Compiler.set_enabled(True) - loader = Loader(file_name, batch_size, num_workers=5, seed=17, + loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17, drop_last=drop_last, pipelines={ 'value': [FloatDecoder(), Doubler(), ToTensor()] @@ -76,6 +77,4 @@ def test_not_partial_multiple(): run_test(60, 10, False) def test_partial_multiple(): - run_test(60, 10, True) - - \ No newline at end of file + run_test(60, 10, True) \ No newline at end of file diff --git a/tests/test_partial_pipeline.py b/tests/test_partial_pipeline.py index 72598a4c..744e347d 100644 --- a/tests/test_partial_pipeline.py +++ b/tests/test_partial_pipeline.py @@ -12,6 +12,7 @@ from tempfile import NamedTemporaryFile from ffcv.pipeline.operation import Operation from ffcv.transforms.ops import ToTensor +from multiprocessing import cpu_count from ffcv.writer import DatasetWriter from ffcv.reader import Reader @@ -52,7 +53,7 @@ def test_basic_simple(): Compiler.set_enabled(True) - loader = Loader(file_name, batch_size, num_workers=5, seed=17, + loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17, pipelines={ 'value': [FloatDecoder(), Doubler(), ToTensor()], 'index': None @@ -64,5 +65,4 @@ def test_basic_simple(): assert_that(result).is_length(1) values = result[0] assert_that(np.allclose(2 * np.sin(np.arange(batch_size)), - values.squeeze().numpy())).is_true() - \ No newline at end of file + values.squeeze().numpy())).is_true() \ No newline at end of file diff --git a/tests/test_writer.py b/tests/test_writer.py index 71092e74..d1379d24 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -6,6 +6,7 @@ import os from assertpy import assert_that from tempfile import NamedTemporaryFile +from multiprocessing import cpu_count from ffcv.writer import DatasetWriter from ffcv.reader import Reader @@ -91,7 +92,7 @@ def test_multiple_workers(): writer = DatasetWriter(name, { 'index': IntField(), 'value': FloatField() - }, num_workers=30) + }, num_workers=min(30, cpu_count())) writer.from_indexed_dataset(dataset, chunksize=10000) @@ -106,7 +107,7 @@ def test_super_long(): writer = DatasetWriter(name, { 'index': IntField(), 'value': FloatField() - }, num_workers=30) + }, num_workers=min(30, cpu_count())) writer.from_indexed_dataset(dataset, chunksize=10000) @@ -120,6 +121,6 @@ def test_small_chunks_multiple_workers(): writer = DatasetWriter(name, { 'index': IntField(), 'value': BytesField() - }, num_workers=30) + }, num_workers=min(30, cpu_count())) writer.from_indexed_dataset(dataset, chunksize=1) \ No newline at end of file