diff --git a/.github/workflows/fpga-ci.yml b/.github/workflows/fpga-ci.yml index 9da22c5157..94699a3910 100644 --- a/.github/workflows/fpga-ci.yml +++ b/.github/workflows/fpga-ci.yml @@ -5,6 +5,8 @@ on: branches: [ master, ci-fix ] pull_request: branches: [ master, ci-fix ] + merge_group: + branches: [ master, ci-fix ] jobs: test-fpga: diff --git a/.github/workflows/general-ci.yml b/.github/workflows/general-ci.yml index 063c1f3e7d..f7b44e6978 100644 --- a/.github/workflows/general-ci.yml +++ b/.github/workflows/general-ci.yml @@ -5,6 +5,8 @@ on: branches: [ master, ci-fix ] pull_request: branches: [ master, ci-fix ] + merge_group: + branches: [ master, ci-fix ] jobs: test: diff --git a/.github/workflows/gpu-ci.yml b/.github/workflows/gpu-ci.yml index 7c3be4e62e..79c9d90246 100644 --- a/.github/workflows/gpu-ci.yml +++ b/.github/workflows/gpu-ci.yml @@ -5,6 +5,8 @@ on: branches: [ master, ci-fix ] pull_request: branches: [ master, ci-fix ] + merge_group: + branches: [ master, ci-fix ] jobs: test-gpu: diff --git a/.github/workflows/heterogeneous-ci.yml b/.github/workflows/heterogeneous-ci.yml index 0c5d246349..f253344d58 100644 --- a/.github/workflows/heterogeneous-ci.yml +++ b/.github/workflows/heterogeneous-ci.yml @@ -5,6 +5,8 @@ on: branches: [ master, ci-fix ] pull_request: branches: [ master, ci-fix ] + merge_group: + branches: [ master, ci-fix ] jobs: test-heterogeneous: diff --git a/dace/codegen/targets/cuda.py b/dace/codegen/targets/cuda.py index b729b34088..5060339e18 100644 --- a/dace/codegen/targets/cuda.py +++ b/dace/codegen/targets/cuda.py @@ -1132,10 +1132,22 @@ def _emit_copy(self, state_id, src_node, src_storage, dst_node, dst_storage, dst func=funcname, type=dst_node.desc(sdfg).dtype.ctype, bdims=', '.join(_topy(self._block_dims)), - is_async='true' if state_dfg.out_degree(dst_node) > 0 else 'true', + is_async='true' if state_dfg.out_degree(dst_node) == 0 else 'false', accum=accum, args=', '.join([src_expr] + _topy(src_strides) + [dst_expr] + custom_reduction + _topy(dst_strides) + _topy(copy_shape))), sdfg, state_id, [src_node, dst_node]) + elif funcname == 'dace::SharedToGlobal1D': + # special case: use a new template struct that provides functions for copy and reduction + callsite_stream.write( + (' {func}<{type}, {bdims}, {copysize}, {is_async}>{accum}({args});').format( + func=funcname, + type=dst_node.desc(sdfg).dtype.ctype, + bdims=', '.join(_topy(self._block_dims)), + copysize=', '.join(_topy(copy_shape)), + is_async='true' if state_dfg.out_degree(dst_node) == 0 else 'false', + accum=accum or '::Copy', + args=', '.join([src_expr] + _topy(src_strides) + [dst_expr] + _topy(dst_strides) + custom_reduction)), sdfg, + state_id, [src_node, dst_node]) else: callsite_stream.write( (' {func}<{type}, {bdims}, {copysize}, ' + @@ -1145,7 +1157,7 @@ def _emit_copy(self, state_id, src_node, src_storage, dst_node, dst_storage, dst bdims=', '.join(_topy(self._block_dims)), copysize=', '.join(_topy(copy_shape)), dststrides=', '.join(_topy(dst_strides)), - is_async='true' if state_dfg.out_degree(dst_node) > 0 else 'true', + is_async='true' if state_dfg.out_degree(dst_node) == 0 else 'false', accum=accum, args=', '.join([src_expr] + _topy(src_strides) + [dst_expr] + custom_reduction)), sdfg, state_id, [src_node, dst_node]) diff --git a/dace/memlet.py b/dace/memlet.py index d448ca1134..e7f0699eb8 100644 --- a/dace/memlet.py +++ b/dace/memlet.py @@ -169,8 +169,7 @@ def to_json(self): attrs['is_data_src'] = self._is_data_src # Fill in legacy (DEPRECATED) values for backwards compatibility - attrs['num_accesses'] = \ - str(self.volume) if not self.dynamic else -1 + attrs['num_accesses'] = str(self.volume) if not self.dynamic else -1 return {"type": "Memlet", "attributes": attrs} @@ -421,13 +420,11 @@ def from_array(dataname, datadesc, wcr=None): return Memlet.simple(dataname, rng, wcr_str=wcr) def __hash__(self): - return hash((self.volume, self.src_subset, self.dst_subset, str(self.wcr))) + return hash((self.data, self.volume, self.src_subset, self.dst_subset, str(self.wcr))) def __eq__(self, other): - return all([ - self.volume == other.volume, self.src_subset == other.src_subset, self.dst_subset == other.dst_subset, - self.wcr == other.wcr - ]) + return all((self.data == other.data, self.volume == other.volume, self.src_subset == other.src_subset, + self.dst_subset == other.dst_subset, self.wcr == other.wcr)) def replace(self, repl_dict): """ diff --git a/dace/runtime/include/dace/cuda/copy.cuh b/dace/runtime/include/dace/cuda/copy.cuh index 52462a906d..db3d715301 100644 --- a/dace/runtime/include/dace/cuda/copy.cuh +++ b/dace/runtime/include/dace/cuda/copy.cuh @@ -736,60 +736,77 @@ namespace dace int COPY_XLEN, bool ASYNC> struct SharedToGlobal1D { - template - static DACE_DFI void Accum(const T *smem, int src_xstride, T *ptr, int DST_XSTRIDE, WCR wcr) + static constexpr int BLOCK_SIZE = BLOCK_WIDTH * BLOCK_HEIGHT * BLOCK_DEPTH; + static constexpr int TOTAL = COPY_XLEN; + static constexpr int WRITES = TOTAL / BLOCK_SIZE; + static constexpr int REM_WRITES = TOTAL % BLOCK_SIZE; + + static DACE_DFI void Copy(const T *smem, int src_xstride, T *ptr, int dst_xstride) { + // Linear thread ID + int ltid = GetLinearTID(); + + #pragma unroll + for (int i = 0; i < WRITES; ++i) { + *(ptr + (ltid + i * BLOCK_SIZE) * dst_xstride) = + *(smem + (ltid + i * BLOCK_SIZE) * src_xstride); + } + + if (REM_WRITES != 0 && ltid < REM_WRITES) { + *(ptr + (ltid + WRITES*BLOCK_SIZE)* dst_xstride) = + *(smem + (ltid + WRITES * BLOCK_SIZE) * src_xstride); + } + if (!ASYNC) __syncthreads(); + } + template + static DACE_DFI void Accum(const T *smem, int src_xstride, T *ptr, int dst_xstride, WCR wcr) + { // Linear thread ID int ltid = GetLinearTID(); - constexpr int BLOCK_SIZE = BLOCK_WIDTH * BLOCK_HEIGHT * BLOCK_DEPTH; - constexpr int TOTAL = COPY_XLEN; - constexpr int WRITES = TOTAL / BLOCK_SIZE; - constexpr int REM_WRITES = TOTAL % BLOCK_SIZE; #pragma unroll for (int i = 0; i < WRITES; ++i) { wcr_custom::template reduce( - wcr, ptr + (ltid + i * BLOCK_SIZE) * DST_XSTRIDE, + wcr, ptr + (ltid + i * BLOCK_SIZE) * dst_xstride, *(smem + (ltid + i * BLOCK_SIZE) * src_xstride)); } if (REM_WRITES != 0) { if (ltid < REM_WRITES) wcr_custom::template reduce( - ptr + (ltid + WRITES * BLOCK_SIZE)* DST_XSTRIDE, + ptr + (ltid + WRITES * BLOCK_SIZE)* dst_xstride, *(smem + (ltid + WRITES * BLOCK_SIZE) * src_xstride)); } + + if (!ASYNC) + __syncthreads(); } template - static DACE_DFI void Accum(const T *smem, int src_xstride, T *ptr, int DST_XSTRIDE) + static DACE_DFI void Accum(const T *smem, int src_xstride, T *ptr, int dst_xstride) { - if (!ASYNC) - __syncthreads(); - // Linear thread ID int ltid = GetLinearTID(); - constexpr int BLOCK_SIZE = BLOCK_WIDTH * BLOCK_HEIGHT * BLOCK_DEPTH; - constexpr int TOTAL = COPY_XLEN; - constexpr int WRITES = TOTAL / BLOCK_SIZE; - constexpr int REM_WRITES = TOTAL % BLOCK_SIZE; #pragma unroll for (int i = 0; i < WRITES; ++i) { wcr_fixed::template reduce_atomic( - ptr + (ltid + i * BLOCK_SIZE) * DST_XSTRIDE, + ptr + (ltid + i * BLOCK_SIZE) * dst_xstride, *(smem + (ltid + i * BLOCK_SIZE) * src_xstride)); } if (REM_WRITES != 0) { if (ltid < REM_WRITES) wcr_fixed::template reduce_atomic( - ptr + (ltid + WRITES*BLOCK_SIZE)* DST_XSTRIDE, + ptr + (ltid + WRITES*BLOCK_SIZE)* dst_xstride, *(smem + (ltid + WRITES * BLOCK_SIZE) * src_xstride)); } + + if (!ASYNC) + __syncthreads(); } }; diff --git a/dace/transformation/passes/analysis.py b/dace/transformation/passes/analysis.py index 86e1cde062..d6b235a876 100644 --- a/dace/transformation/passes/analysis.py +++ b/dace/transformation/passes/analysis.py @@ -2,7 +2,7 @@ from collections import defaultdict from dace.transformation import pass_pipeline as ppl -from dace import SDFG, SDFGState, properties, InterstateEdge +from dace import SDFG, SDFGState, properties, InterstateEdge, Memlet, data as dt from dace.sdfg.graph import Edge from dace.sdfg import nodes as nd from dace.sdfg.analysis import cfg @@ -505,3 +505,81 @@ def apply_pass(self, top_sdfg: SDFG, pipeline_results: Dict[str, Any]) -> Dict[i del result[desc][write] top_result[sdfg.sdfg_id] = result return top_result + + +@properties.make_properties +class AccessRanges(ppl.Pass): + """ + For each data descriptor, finds all memlets used to access it (read/write ranges). + """ + + CATEGORY: str = 'Analysis' + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Nothing + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return modified & ppl.Modifies.Memlets + + def apply_pass(self, top_sdfg: SDFG, _) -> Dict[int, Dict[str, Set[Memlet]]]: + """ + :return: A dictionary mapping each data descriptor name to a set of memlets. + """ + top_result: Dict[int, Dict[str, Set[Memlet]]] = dict() + + for sdfg in top_sdfg.all_sdfgs_recursive(): + result: Dict[str, Set[Memlet]] = defaultdict(set) + for state in sdfg.states(): + for anode in state.data_nodes(): + for e in state.all_edges(anode): + if e.dst is anode and e.dst_conn == 'set': # Skip reference sets + continue + if e.data.is_empty(): # Skip empty memlets + continue + # Find (hopefully propagated) root memlet + e = state.memlet_tree(e).root().edge + result[anode.data].add(e.data) + top_result[sdfg.sdfg_id] = result + return top_result + + +@properties.make_properties +class FindReferenceSources(ppl.Pass): + """ + For each Reference data descriptor, finds all memlets used to set it. If a Tasklet was used + to set the reference, the Tasklet is given as a source. + """ + + CATEGORY: str = 'Analysis' + + def modifies(self) -> ppl.Modifies: + return ppl.Modifies.Nothing + + def should_reapply(self, modified: ppl.Modifies) -> bool: + return modified & ppl.Modifies.Memlets + + def apply_pass(self, top_sdfg: SDFG, _) -> Dict[int, Dict[str, Set[Union[Memlet, nd.CodeNode]]]]: + """ + :return: A dictionary mapping each data descriptor name to a set of memlets. + """ + top_result: Dict[int, Dict[str, Set[Union[Memlet, nd.CodeNode]]]] = dict() + + for sdfg in top_sdfg.all_sdfgs_recursive(): + result: Dict[str, Set[Memlet]] = defaultdict(set) + reference_descs = set(k for k, v in sdfg.arrays.items() if isinstance(v, dt.Reference)) + for state in sdfg.states(): + for anode in state.data_nodes(): + if anode.data not in reference_descs: + continue + for e in state.in_edges(anode): + if e.dst_conn != 'set': + continue + true_src = state.memlet_path(e)[0].src + if isinstance(true_src, nd.CodeNode): + # Code -> Reference + result[anode.data].add(true_src) + else: + # Array -> Reference + result[anode.data].add(e.data) + top_result[sdfg.sdfg_id] = result + return top_result diff --git a/setup.py b/setup.py index bd635fb3b7..9c8773f9e3 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,7 @@ }, include_package_data=True, install_requires=[ - 'numpy', 'networkx >= 2.5', 'astunparse', 'sympy<=1.9', 'pyyaml', 'ply', 'websockets', 'jinja2', + 'numpy', 'networkx >= 2.5', 'astunparse', 'sympy >= 1.9', 'pyyaml', 'ply', 'websockets', 'jinja2', 'fparser >= 0.1.3', 'aenum >= 3.1', 'dataclasses; python_version < "3.7"', 'dill', 'pyreadline;platform_system=="Windows"', 'typing-compat; python_version < "3.8"' ] + cmake_requires, diff --git a/tests/codegen/cuda_memcopy_test.py b/tests/codegen/cuda_memcopy_test.py new file mode 100644 index 0000000000..a10f57eecd --- /dev/null +++ b/tests/codegen/cuda_memcopy_test.py @@ -0,0 +1,84 @@ +""" Tests code generation for array copy on GPU target. """ +import dace +from dace.transformation.auto import auto_optimize + +import pytest +import re + +# this test requires cupy module +cp = pytest.importorskip("cupy") + +# initialize random number generator +rng = cp.random.default_rng(42) + + +@pytest.mark.gpu +def test_gpu_shared_to_global_1D(): + M = 32 + N = dace.symbol('N') + + @dace.program + def transpose_shared_to_global(A: dace.float64[M, N], B: dace.float64[N, M]): + for i in dace.map[0:N]: + local_gather = dace.define_local([M], A.dtype, storage=dace.StorageType.GPU_Shared) + for j in dace.map[0:M]: + local_gather[j] = A[j, i] + B[i, :] = local_gather + + + sdfg = transpose_shared_to_global.to_sdfg() + auto_optimize.apply_gpu_storage(sdfg) + + size_M = M + size_N = 128 + + A = rng.random((size_M, size_N,)) + B = rng.random((size_N, size_M,)) + + ref = A.transpose() + + sdfg(A, B, N=size_N) + cp.allclose(ref, B) + + code = sdfg.generate_code()[1].clean_code # Get GPU code (second file) + m = re.search('dace::SharedToGlobal1D<.+>::Copy', code) + assert m is not None + + +@pytest.mark.gpu +def test_gpu_shared_to_global_1D_accumulate(): + M = 32 + N = dace.symbol('N') + + @dace.program + def transpose_and_add_shared_to_global(A: dace.float64[M, N], B: dace.float64[N, M]): + for i in dace.map[0:N]: + local_gather = dace.define_local([M], A.dtype, storage=dace.StorageType.GPU_Shared) + for j in dace.map[0:M]: + local_gather[j] = A[j, i] + local_gather[:] >> B(M, lambda x, y: x + y)[i, :] + + + sdfg = transpose_and_add_shared_to_global.to_sdfg() + auto_optimize.apply_gpu_storage(sdfg) + + size_M = M + size_N = 128 + + A = rng.random((size_M, size_N,)) + B = rng.random((size_N, size_M,)) + + ref = A.transpose() + B + + sdfg(A, B, N=size_N) + cp.allclose(ref, B) + + code = sdfg.generate_code()[1].clean_code # Get GPU code (second file) + m = re.search('dace::SharedToGlobal1D<.+>::template Accum', code) + assert m is not None + + +if __name__ == '__main__': + test_gpu_shared_to_global_1D() + test_gpu_shared_to_global_1D_accumulate() + diff --git a/tests/passes/access_ranges_test.py b/tests/passes/access_ranges_test.py new file mode 100644 index 0000000000..263cb2243d --- /dev/null +++ b/tests/passes/access_ranges_test.py @@ -0,0 +1,61 @@ +# Copyright 2019-2023 ETH Zurich and the DaCe authors. All rights reserved. +""" Tests the AccessRanges analysis pass. """ +import dace +from dace.transformation.passes.analysis import AccessRanges +import numpy as np + +N = dace.symbol('N') + + +def test_simple(): + + @dace.program + def tester(A: dace.float64[N, N], B: dace.float64[20, 20]): + for i, j in dace.map[0:20, 0:N]: + A[i, j] = 1 + + sdfg = tester.to_sdfg(simplify=True) + ranges = AccessRanges().apply_pass(sdfg, {}) + assert len(ranges) == 1 # Only one SDFG + ranges = ranges[0] + assert len(ranges) == 1 # Only one array is accessed + + # Construct write memlet + memlet = dace.Memlet('A[0:20, 0:N]') + memlet._is_data_src = False + + assert ranges['A'] == {memlet} + + +def test_simple_ranges(): + + @dace.program + def tester(A: dace.float64[N, N], B: dace.float64[20, 20]): + A[:, :] = 0 + A[1:21, 1:21] = B + A[0, 0] += 1 + + sdfg = tester.to_sdfg(simplify=True) + ranges = AccessRanges().apply_pass(sdfg, {}) + assert len(ranges) == 1 # Only one SDFG + ranges = ranges[0] + assert len(ranges) == 2 # Two arrays are accessed + + assert len(ranges['B']) == 1 + assert next(iter(ranges['B'])).src_subset == dace.subsets.Range([(0, 19, 1), (0, 19, 1)]) + + # Construct read/write memlets + memlet1 = dace.Memlet('A[0:N, 0:N]') + memlet1._is_data_src = False + memlet2 = dace.Memlet('A[1:21, 1:21] -> 0:20, 0:20') + memlet2._is_data_src = False + memlet3 = dace.Memlet('A[0, 0]') + memlet4 = dace.Memlet('A[0, 0]') + memlet4._is_data_src = False + + assert ranges['A'] == {memlet1, memlet2, memlet3, memlet4} + + +if __name__ == '__main__': + test_simple() + test_simple_ranges() diff --git a/tests/sdfg/reference_test.py b/tests/sdfg/reference_test.py index 3f2cfb685c..f1e605e315 100644 --- a/tests/sdfg/reference_test.py +++ b/tests/sdfg/reference_test.py @@ -1,10 +1,11 @@ -# Copyright 2019-2022 ETH Zurich and the DaCe authors. All rights reserved. +# Copyright 2019-2023 ETH Zurich and the DaCe authors. All rights reserved. """ Tests the use of Reference data descriptors. """ import dace +from dace.transformation.passes.analysis import FindReferenceSources import numpy as np -def test_reference_branch(): +def _create_branch_sdfg(): sdfg = dace.SDFG('refbranch') sdfg.add_array('A', [20], dace.float64) sdfg.add_array('B', [20], dace.float64) @@ -29,6 +30,11 @@ def test_reference_branch(): r = finish.add_read('ref') w = finish.add_write('out') finish.add_nedge(r, w, dace.Memlet('ref')) + return sdfg + + +def test_reference_branch(): + sdfg = _create_branch_sdfg() A = np.random.rand(20) B = np.random.rand(20) @@ -41,5 +47,16 @@ def test_reference_branch(): assert np.allclose(out, A) +def test_reference_sources_pass(): + sdfg = _create_branch_sdfg() + sources = FindReferenceSources().apply_pass(sdfg, {}) + assert len(sources) == 1 # There is only one SDFG + sources = sources[0] + assert len(sources) == 1 and 'ref' in sources # There is one reference + sources = sources['ref'] + assert sources == {dace.Memlet('A[0:20]', volume=1), dace.Memlet('B[0:20]', volume=1)} + + if __name__ == '__main__': test_reference_branch() + test_reference_sources_pass()