Modify tests to work on consumer hardware
warner-benjamin committed Mar 2, 2023
1 parent e32280b commit 695c563
Showing 8 changed files with 43 additions and 35 deletions.
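
The pattern repeated across all eight files: every hard-coded `num_workers` value is capped at the host's CPU count, so the tests no longer assume a many-core CI machine. A minimal sketch of the idea (the `capped` helper is illustrative, not part of the commit):

```python
# Sketch of the recurring change: never request more worker processes
# than the machine actually has.
from multiprocessing import cpu_count

def capped(requested: int) -> int:
    # cpu_count() reports the number of logical CPUs on the host
    return min(requested, cpu_count())

# A loader that previously asked for 5 workers unconditionally:
#   Loader(name, batch_size=3, num_workers=5)
# now degrades gracefully on small machines:
#   Loader(name, batch_size=3, num_workers=capped(5))
```

In the diff hunks below, removed lines are prefixed `-` and added lines `+`.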
9 changes: 5 additions & 4 deletions tests/test_array_field.py
@@ -2,6 +2,7 @@
from tempfile import NamedTemporaryFile
from collections import defaultdict
from assertpy.assertpy import assert_that
+from multiprocessing import cpu_count

from assertpy import assert_that
import numpy as np
@@ -15,7 +16,7 @@ class DummyActivationsDataset(Dataset):
def __init__(self, n_samples, shape):
self.n_samples = n_samples
self.shape = shape

def __len__(self):
return self.n_samples

@@ -50,11 +51,11 @@ def run_test(n_samples, shape):
writer = DatasetWriter(name, {
'index': IntField(),
'activations': NDArrayField(np.dtype('<f4'), shape)
-}, num_workers=3)
+}, num_workers=min(3, cpu_count()))

writer.from_indexed_dataset(dataset)

-loader = Loader(name, batch_size=3, num_workers=5)
+loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()))
for ixes, activations in loader:
for ix, activation in zip(ixes, activations):
assert_that(np.all(dataset[ix][1] == activation.numpy())).is_true()
@@ -80,7 +81,7 @@ def test_multi_fields():

writer.from_indexed_dataset(dataset)

-loader = Loader(name, batch_size=3, num_workers=5)
+loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()))
page_size_l2 = int(np.log2(loader.reader.page_size))
sample_ids = loader.reader.alloc_table['sample_id']
pointers = loader.reader.alloc_table['ptr']
11 changes: 6 additions & 5 deletions tests/test_basic_pipeline.py
@@ -12,6 +12,7 @@
from tempfile import NamedTemporaryFile
from ffcv.pipeline.operation import Operation
from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count

from ffcv.writer import DatasetWriter
from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()]
})
@@ -63,7 +64,7 @@ def test_basic_simple():
np.arange(batch_size))).is_true()
assert_that(np.allclose(2 * np.sin(np.arange(batch_size)),
values.squeeze().numpy())).is_true()

def test_multiple_iterators_success():
length = 60
batch_size = 8
@@ -79,7 +80,7 @@ def test_multiple_iterators_success():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()]
})
@@ -102,7 +103,7 @@ def test_multiple_epoch_doesnt_recompile():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()]
})
@@ -128,7 +129,7 @@ def test_multiple_epoch_does_recompile():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
recompile=True,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()]
29 changes: 17 additions & 12 deletions tests/test_cuda_nonblocking.py
@@ -2,6 +2,7 @@
import torch as ch
from tqdm import tqdm
import time
+from multiprocessing import cpu_count

from assertpy import assert_that
import numpy as np
@@ -15,23 +16,27 @@
from ffcv.transforms import ToDevice, ToTensor, Squeeze
import time

+BATCH = 256
+SIZE = 25_000
+WORKERS = min(10, cpu_count())

class DummyArrayDataset(Dataset):
def __init__(self, n_samples, shape):
self.n_samples = n_samples
self.shape = shape

def __len__(self):
return self.n_samples

def __getitem__(self, index):
if index >= self.n_samples:
raise IndexError()
np.random.seed(index)
-return (np.random.rand(50000) > 0.5).astype('bool'), np.random.rand(50000).astype('float32'), index
+return (np.random.rand(SIZE) > 0.5).astype('bool'), np.random.rand(SIZE).astype('float32'), index

def run_experiment_cuda(weight, loader, sync=False):
total = 0.
-X = ch.empty(2048, 50_000, device=weight.device)
+X = ch.empty(BATCH, SIZE, device=weight.device)
for X_bool, _, __ in tqdm(loader):
if sync: ch.cuda.synchronize()
X.copy_(X_bool)
@@ -42,22 +47,22 @@ def run_experiment_cuda(weight, loader, sync=False):
return total.sum(0)

def run_cuda(weight, sync):
-n_samples, shape = (2048 * 10, (50000,))
+n_samples, shape = (BATCH * WORKERS, (SIZE,))
with NamedTemporaryFile() as handle:
name = handle.name
dataset = DummyArrayDataset(n_samples, shape)
writer = DatasetWriter(name, {
-'mask': NDArrayField(dtype=np.dtype('bool'), shape=(50_000,)),
-'targets': NDArrayField(dtype=np.dtype('float32'), shape=(50_000,)),
+'mask': NDArrayField(dtype=np.dtype('bool'), shape=(SIZE,)),
+'targets': NDArrayField(dtype=np.dtype('float32'), shape=(SIZE,)),
'idx': IntField()
})

writer.from_indexed_dataset(dataset)

loader = Loader(
name,
-batch_size=2048,
-num_workers=10,
+batch_size=BATCH,
+num_workers=WORKERS,
order=OrderOption.QUASI_RANDOM,
indices=np.arange(n_samples),
drop_last=False,
@@ -67,11 +72,11 @@ def run_cuda(weight, sync):
'targets': [NDArrayDecoder(), ToTensor(), ToDevice(ch.device('cuda:0'), non_blocking=False)],
'idx': [IntDecoder(), ToTensor(), Squeeze(), ToDevice(ch.device('cuda:0'), non_blocking=False)]
})

return run_experiment_cuda(weight, loader, sync)

def test_cuda():
-weight = ch.randn(50_000, 50_000).cuda()
+weight = ch.randn(SIZE, SIZE).cuda()
async_1 = run_cuda(weight, False)
sync_1 = run_cuda(weight, True)
sync_2 = run_cuda(weight, True)
@@ -80,7 +85,7 @@ def test_cuda():
print(sync_2)
print(ch.abs(sync_1 - sync_2).max())
print(ch.abs(sync_1 - async_1).max())
-assert ch.abs(sync_1 - sync_2).max().cpu().item() < 10., 'Sync-sync mismatch'
-assert ch.abs(async_1 - sync_1).max().cpu().item() < 10., 'Async-sync mismatch'
+assert ch.abs(sync_1 - sync_2).max().cpu().item() < float(WORKERS), 'Sync-sync mismatch'
+assert ch.abs(async_1 - sync_1).max().cpu().item() < float(WORKERS), 'Async-sync mismatch'

# test_cuda()
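
Two changes make this file runnable on consumer hardware: the workload constants shrink (`weight` drops from a 50,000×50,000 float32 matrix, roughly 10 GB by itself, to 25,000×25,000, roughly 2.5 GB), and the mismatch tolerance scales with `WORKERS` instead of the fixed 10, shrinking alongside the dataset of `BATCH * WORKERS` samples. A rough illustration (an assumption, not code from this commit) of why a float32 reduction over fewer terms tolerates a tighter bound:

```python
# Rough illustration: round-off in a float32 reduction grows with the
# number of terms summed, so a smaller dataset justifies a smaller
# mismatch tolerance.
import numpy as np

rng = np.random.default_rng(0)
for n in (10_000, 10_000_000):
    x = rng.random(n, dtype=np.float32)
    exact = np.sum(x, dtype=np.float64)  # high-precision reference
    err = abs(float(x.sum()) - exact)    # float32 accumulation error
    print(f"n={n:>10,}  abs error = {err:.6f}")  # error grows with n
```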
6 changes: 3 additions & 3 deletions tests/test_json_field.py
@@ -2,7 +2,7 @@
from ctypes import pointer
from tempfile import NamedTemporaryFile
from collections import defaultdict
-from assertpy.assertpy import assert_that
+from multiprocessing import cpu_count

from assertpy import assert_that
import numpy as np
@@ -46,11 +46,11 @@ def run_test(n_samples):
writer = DatasetWriter(name, {
'index': IntField(),
'activations': JSONField()
-}, num_workers=3)
+}, num_workers=min(3, cpu_count()))

writer.from_indexed_dataset(dataset)

-loader = Loader(name, batch_size=3, num_workers=5,
+loader = Loader(name, batch_size=3, num_workers=min(5, cpu_count()),
pipelines={
'activation': [BytesDecoder()],
'index': [IntDecoder()]
3 changes: 2 additions & 1 deletion tests/test_loader_filter.py
@@ -12,6 +12,7 @@
from tempfile import NamedTemporaryFile
from ffcv.pipeline.operation import Operation
from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count

from ffcv.writer import DatasetWriter
from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()],
})
7 changes: 3 additions & 4 deletions tests/test_partial_batches.py
@@ -12,6 +12,7 @@
from tempfile import NamedTemporaryFile
from ffcv.pipeline.operation import Operation
from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count

from ffcv.writer import DatasetWriter
from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def run_test(bs, exp_length, drop_last=True):

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
drop_last=drop_last,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()]
@@ -76,6 +77,4 @@ def test_not_partial_multiple():
run_test(60, 10, False)

def test_partial_multiple():
-run_test(60, 10, True)
-
-
+run_test(60, 10, True)
6 changes: 3 additions & 3 deletions tests/test_partial_pipeline.py
@@ -12,6 +12,7 @@
from tempfile import NamedTemporaryFile
from ffcv.pipeline.operation import Operation
from ffcv.transforms.ops import ToTensor
+from multiprocessing import cpu_count

from ffcv.writer import DatasetWriter
from ffcv.reader import Reader
@@ -52,7 +53,7 @@ def test_basic_simple():

Compiler.set_enabled(True)

-loader = Loader(file_name, batch_size, num_workers=5, seed=17,
+loader = Loader(file_name, batch_size, num_workers=min(5, cpu_count()), seed=17,
pipelines={
'value': [FloatDecoder(), Doubler(), ToTensor()],
'index': None
@@ -64,5 +65,4 @@ def test_basic_simple():
assert_that(result).is_length(1)
values = result[0]
assert_that(np.allclose(2 * np.sin(np.arange(batch_size)),
-values.squeeze().numpy())).is_true()
-
+values.squeeze().numpy())).is_true()
7 changes: 4 additions & 3 deletions tests/test_writer.py
@@ -6,6 +6,7 @@
import os
from assertpy import assert_that
from tempfile import NamedTemporaryFile
+from multiprocessing import cpu_count

from ffcv.writer import DatasetWriter
from ffcv.reader import Reader
@@ -91,7 +92,7 @@ def test_multiple_workers():
writer = DatasetWriter(name, {
'index': IntField(),
'value': FloatField()
-}, num_workers=30)
+}, num_workers=min(30, cpu_count()))

writer.from_indexed_dataset(dataset, chunksize=10000)

@@ -106,7 +107,7 @@ def test_super_long():
writer = DatasetWriter(name, {
'index': IntField(),
'value': FloatField()
-}, num_workers=30)
+}, num_workers=min(30, cpu_count()))

writer.from_indexed_dataset(dataset, chunksize=10000)

@@ -120,6 +121,6 @@ def test_small_chunks_multiple_workers():
writer = DatasetWriter(name, {
'index': IntField(),
'value': BytesField()
-}, num_workers=30)
+}, num_workers=min(30, cpu_count()))

writer.from_indexed_dataset(dataset, chunksize=1)
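
Taken together, a writer-side test now sizes its pool as in the self-contained sketch below; the dummy dataset, sizes, and chunksize are illustrative assumptions mirroring the tests above, not code from this commit:

```python
# Self-contained sketch in the style of the updated tests.
from multiprocessing import cpu_count
from tempfile import NamedTemporaryFile

import numpy as np
from ffcv.writer import DatasetWriter
from ffcv.fields import IntField, FloatField

class DummyDataset:
    def __len__(self):
        return 100

    def __getitem__(self, index):
        if index >= len(self):
            raise IndexError()
        # (index, value) tuple matching the two fields declared below
        return (index, float(np.sin(index)))

with NamedTemporaryFile() as handle:
    writer = DatasetWriter(handle.name, {
        'index': IntField(),
        'value': FloatField(),
    }, num_workers=min(30, cpu_count()))  # capped, as in test_writer.py
    writer.from_indexed_dataset(DummyDataset(), chunksize=10)
```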
