From 1beaaadd972a4b85be22e93092fbfd2a714dcdb7 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:23:11 -0500 Subject: [PATCH 01/14] abstract functionality of pyop2 types to be overloaded by backends --- pyop2/compilation.py | 2 +- pyop2/global_kernel.py | 60 +++++++++-------------------------------- pyop2/parloop.py | 61 ++++++++++++++---------------------------- pyop2/types/dat.py | 59 +++++++++++++++++++++++++--------------- pyop2/types/dataset.py | 6 +++++ pyop2/types/glob.py | 27 +++++++++++-------- pyop2/types/map.py | 3 ++- pyop2/types/set.py | 20 ++++++++------ 8 files changed, 106 insertions(+), 132 deletions(-) diff --git a/pyop2/compilation.py b/pyop2/compilation.py index f1967166f..1d6813ab5 100644 --- a/pyop2/compilation.py +++ b/pyop2/compilation.py @@ -610,7 +610,7 @@ def load(jitmodule, extension, fn_name, cppargs=(), ldargs=(), :kwarg comm: Optional communicator to compile the code on (only rank 0 compiles code) (defaults to COMM_WORLD). """ - from pyop2.global_kernel import GlobalKernel + from pyop2.global_kernel import AbstractGlobalKernel as GlobalKernel if isinstance(jitmodule, str): class StrCode(object): diff --git a/pyop2/global_kernel.py b/pyop2/global_kernel.py index 2e7339d21..1ec05c0ea 100644 --- a/pyop2/global_kernel.py +++ b/pyop2/global_kernel.py @@ -1,20 +1,18 @@ import collections.abc import ctypes +import abc from dataclasses import dataclass import itertools -import os from typing import Optional, Tuple -import loopy as lp -from petsc4py import PETSc import numpy as np -from pyop2 import compilation, mpi +from pyop2 import mpi from pyop2.caching import Cached from pyop2.configuration import configuration from pyop2.datatypes import IntType, as_ctypes from pyop2.types import IterationRegion -from pyop2.utils import cached_property, get_petsc_dir +from pyop2.utils import cached_property # We set eq=False to force identity-based hashing. This is required for when @@ -203,7 +201,7 @@ def pack(self): return MatPack -class GlobalKernel(Cached): +class AbstractGlobalKernel(Cached, abc.ABC): """Class representing the generated code for the global computation. :param local_kernel: :class:`pyop2.LocalKernel` instance representing the @@ -321,48 +319,6 @@ def builder(self): builder.add_argument(arg) return builder - @cached_property - def code_to_compile(self): - """Return the C/C++ source code as a string.""" - from pyop2.codegen.rep2loopy import generate - - wrapper = generate(self.builder) - code = lp.generate_code_v2(wrapper) - - if self.local_kernel.cpp: - from loopy.codegen.result import process_preambles - preamble = "".join(process_preambles(getattr(code, "device_preambles", []))) - device_code = "\n\n".join(str(dp.ast) for dp in code.device_programs) - return preamble + "\nextern \"C\" {\n" + device_code + "\n}\n" - return code.device_code() - - @PETSc.Log.EventDecorator() - @mpi.collective - def compile(self, comm): - """Compile the kernel. - - :arg comm: The communicator the compilation is collective over. - :returns: A ctypes function pointer for the compiled function. - """ - extension = "cpp" if self.local_kernel.cpp else "c" - cppargs = ( - tuple("-I%s/include" % d for d in get_petsc_dir()) - + tuple("-I%s" % d for d in self.local_kernel.include_dirs) - + ("-I%s" % os.path.abspath(os.path.dirname(__file__)),) - ) - ldargs = ( - tuple("-L%s/lib" % d for d in get_petsc_dir()) - + tuple("-Wl,-rpath,%s/lib" % d for d in get_petsc_dir()) - + ("-lpetsc", "-lm") - + tuple(self.local_kernel.ldargs) - ) - - return compilation.load(self, extension, self.name, - cppargs=cppargs, - ldargs=ldargs, - restype=ctypes.c_int, - comm=comm) - @cached_property def argtypes(self): """Return the ctypes datatypes of the compiled function.""" @@ -383,3 +339,11 @@ def num_flops(self, iterset): elif region not in {IterationRegion.TOP, IterationRegion.BOTTOM}: size = layers - 1 return size * self.local_kernel.num_flops + + @abc.abstractproperty + def code_to_compile(self): + """Return the C/C++ source code as a string.""" + + @abc.abstractmethod + def compile(self): + pass diff --git a/pyop2/parloop.py b/pyop2/parloop.py index 8384268cf..658dd5958 100644 --- a/pyop2/parloop.py +++ b/pyop2/parloop.py @@ -13,7 +13,7 @@ from pyop2.datatypes import as_numpy_dtype from pyop2.exceptions import KernelTypeError, MapValueError, SetTypeError from pyop2.global_kernel import (GlobalKernelArg, DatKernelArg, MixedDatKernelArg, - MatKernelArg, MixedMatKernelArg, GlobalKernel) + MatKernelArg, MixedMatKernelArg) from pyop2.local_kernel import LocalKernel, CStringLocalKernel, CoffeeLocalKernel, LoopyLocalKernel from pyop2.types import (Access, Global, Dat, DatView, MixedDat, Mat, Set, MixedSet, ExtrudedSet, Subset, Map, MixedMap) @@ -122,7 +122,7 @@ def map_kernel_args(self): return tuple(itertools.chain(*itertools.product(rmap._kernel_args_, cmap._kernel_args_))) -class Parloop: +class AbstractParloop: """A parallel loop invocation. :arg global_knl: The :class:`GlobalKernel` to be executed. @@ -221,7 +221,7 @@ def __call__(self): def zero_global_increments(self): """Zero any global increments every time the loop is executed.""" for g in self.reduced_globals.keys(): - g._data[...] = 0 + g.data[...] = 0 def replace_lgmaps(self): """Swap out any lgmaps for any :class:`MatParloopArg` instances @@ -345,38 +345,13 @@ def _l2g_idxs(self): seen.add(pl_arg.data) return tuple(indices) - @PETSc.Log.EventDecorator("ParLoopRednBegin") - @mpi.collective + @abc.abstractmethod def reduction_begin(self): """Begin reductions.""" - requests = [] - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - mpi_op = {Access.INC: mpi.MPI.SUM, - Access.MIN: mpi.MPI.MIN, - Access.MAX: mpi.MPI.MAX}.get(self.accesses[idx]) - - if mpi.MPI.VERSION >= 3: - requests.append(self.comm.Iallreduce(glob._data, glob._buf, op=mpi_op)) - else: - self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) - return tuple(requests) - - @PETSc.Log.EventDecorator("ParLoopRednEnd") - @mpi.collective + + @abc.abstractmethod def reduction_end(self, requests): """Finish reductions.""" - if mpi.MPI.VERSION >= 3: - for idx, req in zip(self._reduction_idxs, requests): - req.Wait() - glob = self.arguments[idx].data - glob._data[:] = glob._buf - else: - assert len(requests) == 0 - - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - glob._data[:] = glob._buf @cached_property def _reduction_idxs(self): @@ -388,7 +363,7 @@ def _reduction_idxs(self): def finalize_global_increments(self): """Finalise global increments.""" for tmp, glob in self.reduced_globals.items(): - glob.data._data += tmp._data + glob.data.data[:] += tmp.data_ro @mpi.collective def update_arg_data_state(self): @@ -446,11 +421,12 @@ def prepare_reduced_globals(cls, arguments, global_knl): :class:`Global` in parallel produces the right result. The same is not needed for MAX and MIN because they commute with the reduction. """ + from pyop2.op2 import compute_backend arguments = list(arguments) reduced_globals = {} for i, (lk_arg, gk_arg, pl_arg) in enumerate(cls.zip_arguments(global_knl, arguments)): if isinstance(gk_arg, GlobalKernelArg) and lk_arg.access == Access.INC: - tmp = Global(gk_arg.dim, data=np.zeros_like(pl_arg.data.data_ro), dtype=lk_arg.dtype) + tmp = compute_backend.Global(gk_arg.dim, data=np.zeros_like(pl_arg.data._data), dtype=lk_arg.dtype) reduced_globals[tmp] = pl_arg arguments[i] = GlobalParloopArg(tmp) @@ -606,6 +582,7 @@ def LegacyParloop(local_knl, iterset, *args, **kwargs): raise SetTypeError("Iteration set is of the wrong type") # finish building the local kernel + from pyop2.op2 import compute_backend local_knl.accesses = tuple(a.access for a in args) if isinstance(local_knl, (CStringLocalKernel, CoffeeLocalKernel)): local_knl.dtypes = tuple(a.data.dtype for a in args) @@ -614,14 +591,14 @@ def LegacyParloop(local_knl, iterset, *args, **kwargs): extruded = iterset._extruded constant_layers = extruded and iterset.constant_layers subset = isinstance(iterset, Subset) - global_knl = GlobalKernel(local_knl, global_knl_args, - extruded=extruded, - constant_layers=constant_layers, - subset=subset, - **kwargs) + global_knl = compute_backend.GlobalKernel(local_knl, global_knl_args, + extruded=extruded, + constant_layers=constant_layers, + subset=subset, + **kwargs) parloop_args = tuple(a.parloop_arg for a in args) - return Parloop(global_knl, iterset, parloop_args) + return compute_backend.Parloop(global_knl, iterset, parloop_args) def par_loop(*args, **kwargs): @@ -635,8 +612,10 @@ def parloop(knl, *args, **kwargs): For a description of the possible arguments to this function see :class:`Parloop` and :func:`LegacyParloop`. """ - if isinstance(knl, GlobalKernel): - Parloop(knl, *args, **kwargs)() + from pyop2.global_kernel import AbstractGlobalKernel + if isinstance(knl, AbstractGlobalKernel): + from pyop2.op2 import compute_backend + compute_backend.Parloop(knl, *args, **kwargs)() elif isinstance(knl, LocalKernel): LegacyParloop(knl, *args, **kwargs)() else: diff --git a/pyop2/types/dat.py b/pyop2/types/dat.py index bb65db77e..f5f35ce89 100644 --- a/pyop2/types/dat.py +++ b/pyop2/types/dat.py @@ -1,4 +1,3 @@ -import abc import contextlib import ctypes import itertools @@ -15,13 +14,14 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin from pyop2.types.access import Access -from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.dataset import DataSet, GlobalDataSet from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin from pyop2.types.set import ExtrudedSet, GlobalSet, Set -class AbstractDat(DataCarrier, EmptyDataMixin, abc.ABC): +class AbstractDat(DataCarrier, EmptyDataMixin): """OP2 vector data. A :class:`Dat` holds values on every element of a :class:`DataSet`.o @@ -342,12 +342,11 @@ def _op_kernel(self, op, globalp, dtype): return self._op_kernel_cache.setdefault(key, Kernel(knl, name)) def _op(self, other, op): - from pyop2.types.glob import Global + from pyop2.op2 import compute_backend from pyop2.parloop import parloop - - ret = Dat(self.dataset, None, self.dtype) + ret = compute_backend.Dat(self.dataset, None, self.dtype) if np.isscalar(other): - other = Global(1, data=other) + other = compute_backend.Global(1, data=other) globalp = True else: self._check_shape(other) @@ -391,12 +390,12 @@ def _iop_kernel(self, op, globalp, other_is_self, dtype): return self._iop_kernel_cache.setdefault(key, Kernel(knl, name)) def _iop(self, other, op): + from pyop2.op2 import compute_backend from pyop2.parloop import parloop - from pyop2.types.glob import Global globalp = False if np.isscalar(other): - other = Global(1, data=other) + other = compute_backend.Global(1, data=other) globalp = True elif other is not self: self._check_shape(other) @@ -440,10 +439,10 @@ def inner(self, other): """ from pyop2.parloop import parloop - from pyop2.types.glob import Global + from pyop2.op2 import compute_backend self._check_shape(other) - ret = Global(1, data=0, dtype=self.dtype) + ret = compute_backend.Global(1, data=0, dtype=self.dtype) parloop(self._inner_kernel(other.dtype), self.dataset.set, self(Access.READ), other(Access.READ), ret(Access.INC)) return ret.data_ro[0] @@ -459,7 +458,8 @@ def norm(self): return sqrt(self.inner(self).real) def __pos__(self): - pos = Dat(self) + from pyop2.op2 import compute_backend + pos = compute_backend.Dat(self) return pos def __add__(self, other): @@ -492,8 +492,9 @@ def _neg_kernel(self): def __neg__(self): from pyop2.parloop import parloop + from pyop2.op2 import compute_backend - neg = Dat(self.dataset, dtype=self.dtype) + neg = compute_backend.Dat(self.dataset, dtype=self.dtype) parloop(self._neg_kernel, self.dataset.set, neg(Access.WRITE), self(Access.READ)) return neg @@ -670,7 +671,11 @@ def data_ro_with_halos(self): return full[idx] -class Dat(AbstractDat, VecAccessMixin): +class Dat(AbstractDat, VecAccessMixin, OffloadMixin): + + @utils.cached_property + def can_be_represented_as_petscvec(self): + return ((self.dtype == PETSc.ScalarType) and self.cdim > 0) def __init__(self, *args, **kwargs): AbstractDat.__init__(self, *args, **kwargs) @@ -681,14 +686,17 @@ def __init__(self, *args, **kwargs): @utils.cached_property def _vec(self): assert self.dtype == PETSc.ScalarType, \ - "Can't create Vec with type %s, must be %s" % (self.dtype, PETSc.ScalarType) + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) # Can't duplicate layout_vec of dataset, because we then # carry around extra unnecessary data. # But use getSizes to save an Allreduce in computing the # global size. size = self.dataset.layout_vec.getSizes() data = self._data[:size[0]] - return PETSc.Vec().createWithArray(data, size=size, bsize=self.cdim, comm=self.comm) + vec = PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, comm=self.comm) + return vec @contextlib.contextmanager def vec_context(self, access): @@ -717,12 +725,13 @@ class MixedDat(AbstractDat, VecAccessMixin): def __init__(self, mdset_or_dats): from pyop2.types.glob import Global + from pyop2.op2 import compute_backend def what(x): if isinstance(x, (Global, GlobalDataSet, GlobalSet)): - return Global + return compute_backend.Global elif isinstance(x, (Dat, DataSet, Set)): - return Dat + return compute_backend.Dat else: raise ex.DataSetTypeError("Huh?!") if isinstance(mdset_or_dats, MixedDat): @@ -771,7 +780,8 @@ def split(self): @utils.cached_property def dataset(self): r""":class:`MixedDataSet`\s this :class:`MixedDat` is defined on.""" - return MixedDataSet(tuple(s.dataset for s in self._dats)) + from pyop2.op2 import compute_backend + return compute_backend.MixedDataSet(tuple(s.dataset for s in self._dats)) @utils.cached_property def _data(self): @@ -907,6 +917,7 @@ def inner(self, other): return ret def _op(self, other, op): + from pyop2.op2 import compute_backend ret = [] if np.isscalar(other): for s in self: @@ -915,7 +926,7 @@ def _op(self, other, op): self._check_shape(other) for s, o in zip(self, other): ret.append(op(s, o)) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def _iop(self, other, op): if np.isscalar(other): @@ -928,16 +939,20 @@ def _iop(self, other, op): return self def __pos__(self): + from pyop2.op2 import compute_backend + ret = [] for s in self: ret.append(s.__pos__()) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def __neg__(self): + from pyop2.op2 import compute_backend + ret = [] for s in self: ret.append(s.__neg__()) - return MixedDat(ret) + return compute_backend.MixedDat(ret) def __add__(self, other): """Pointwise addition of fields.""" diff --git a/pyop2/types/dataset.py b/pyop2/types/dataset.py index 635b130e3..202080d0f 100644 --- a/pyop2/types/dataset.py +++ b/pyop2/types/dataset.py @@ -182,9 +182,11 @@ def local_ises(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this DataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) size = (self.size * self.cdim, None) vec.setSizes(size, bsize=self.cdim) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec @@ -311,9 +313,11 @@ def local_ises(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this DataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) size = (self.size * self.cdim, None) vec.setSizes(size, bsize=self.cdim) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec @@ -452,10 +456,12 @@ def __repr__(self): @utils.cached_property def layout_vec(self): """A PETSc Vec compatible with the dof layout of this MixedDataSet.""" + from pyop2.op2 import compute_backend vec = PETSc.Vec().create(comm=self.comm) # Compute local and global size from sizes of layout vecs lsize, gsize = map(sum, zip(*(d.layout_vec.sizes for d in self))) vec.setSizes((lsize, gsize), bsize=1) + vec.setType(compute_backend.PETScVecType) vec.setUp() return vec diff --git a/pyop2/types/glob.py b/pyop2/types/glob.py index 05fa0b4f5..7472a435d 100644 --- a/pyop2/types/glob.py +++ b/pyop2/types/glob.py @@ -10,12 +10,12 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin from pyop2.types.access import Access -from pyop2.types.dataset import GlobalDataSet from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin -class Global(DataCarrier, EmptyDataMixin, VecAccessMixin): +class Global(DataCarrier, EmptyDataMixin, VecAccessMixin, OffloadMixin): """OP2 global value. @@ -90,15 +90,16 @@ def __getitem__(self, idx): def __str__(self): return "OP2 Global Argument: %s with dim %s and value %s" \ - % (self._name, self._dim, self._data) + % (self._name, self._dim, self.data_ro) def __repr__(self): - return "Global(%r, %r, %r, %r)" % (self._dim, self._data, - self._data.dtype, self._name) + return "Global(%r, %r, %r, %r)" % (self._dim, self.data_ro, + self.data.dtype, self._name) @utils.cached_property def dataset(self): - return GlobalDataSet(self) + from pyop2.op2 import compute_backend + return compute_backend.GlobalDataSet(self) @property def shape(self): @@ -266,7 +267,8 @@ def inner(self, other): @utils.cached_property def _vec(self): assert self.dtype == PETSc.ScalarType, \ - "Can't create Vec with type %s, must be %s" % (self.dtype, PETSc.ScalarType) + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) # Can't duplicate layout_vec of dataset, because we then # carry around extra unnecessary data. # But use getSizes to save an Allreduce in computing the @@ -288,7 +290,10 @@ def vec_context(self, access): """A context manager for a :class:`PETSc.Vec` from a :class:`Global`. :param access: Access descriptor: READ, WRITE, or RW.""" - yield self._vec - if access is not Access.READ: - data = self._data - self.comm.Bcast(data, 0) + raise NotImplementedError() + + def ensure_availability_on_host(self): + raise NotImplementedError() + + def ensure_availability_on_device(self): + raise NotImplementedError() diff --git a/pyop2/types/map.py b/pyop2/types/map.py index 5bb955380..aa8ec45d0 100644 --- a/pyop2/types/map.py +++ b/pyop2/types/map.py @@ -10,10 +10,11 @@ exceptions as ex, utils ) +from pyop2.offload_utils import OffloadMixin from pyop2.types.set import GlobalSet, MixedSet, Set -class Map: +class Map(OffloadMixin): """OP2 map, a relation between two :class:`Set` objects. diff --git a/pyop2/types/set.py b/pyop2/types/set.py index 42ce266f9..e73d0e9a7 100644 --- a/pyop2/types/set.py +++ b/pyop2/types/set.py @@ -11,6 +11,7 @@ mpi, utils ) +from pyop2.offload_utils import OffloadMixin class Set: @@ -156,7 +157,9 @@ def __call__(self, *indices): indices = indices[0] if np.isscalar(indices): indices = [indices] - return Subset(self, indices) + + from pyop2.op2 import compute_backend + return compute_backend.Subset(self, indices) def __contains__(self, dset): """Indicate whether a given DataSet is compatible with this Set.""" @@ -168,8 +171,8 @@ def __contains__(self, dset): def __pow__(self, e): """Derive a :class:`DataSet` with dimension ``e``""" - from pyop2.types import DataSet - return DataSet(self, dim=e) + from pyop2.op2 import compute_backend + return compute_backend.DataSet(self, dim=e) @utils.cached_property def layers(self): @@ -281,7 +284,7 @@ def __hash__(self): return hash(type(self)) -class ExtrudedSet(Set): +class ExtrudedSet(Set, OffloadMixin): """OP2 ExtrudedSet. @@ -369,7 +372,7 @@ def layers_array(self): return self._layers -class Subset(ExtrudedSet): +class Subset(ExtrudedSet, OffloadMixin): """OP2 subset. @@ -441,7 +444,8 @@ def __call__(self, *indices): indices = indices[0] if np.isscalar(indices): indices = [indices] - return Subset(self, indices) + from pyop2.op2 import compute_backend + return compute_backend.Subset(self, indices) @utils.cached_property def superset(self): @@ -613,8 +617,8 @@ def __len__(self): def __pow__(self, e): """Derive a :class:`MixedDataSet` with dimensions ``e``""" - from pyop2.types import MixedDataSet - return MixedDataSet(self._sets, e) + from pyop2.op2 import compute_backend + return compute_backend.MixedDataSet(self._sets, e) def __str__(self): return "OP2 MixedSet composed of Sets: %s" % (self._sets,) From 0031b5dcbfcf7817acc95fb895ad7023ab55fabc Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:25:16 -0500 Subject: [PATCH 02/14] generalize loopy codegen to allow OpenCL/CUDA targets --- pyop2/codegen/rep2loopy.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/pyop2/codegen/rep2loopy.py b/pyop2/codegen/rep2loopy.py index d01c6ee76..75182e716 100644 --- a/pyop2/codegen/rep2loopy.py +++ b/pyop2/codegen/rep2loopy.py @@ -399,7 +399,7 @@ def bounds(exprs): return dict((op, (names[op], dep)) for op, dep in deps.items()) -def generate(builder, wrapper_name=None): +def generate(builder, wrapper_name=None, include_math=True, include_petsc=True, include_complex=True): if builder.layer_index is not None: outer_inames = frozenset([builder._loop_index.name, builder.layer_index.name]) @@ -546,7 +546,16 @@ def renamer(expr): # register kernel kernel = builder.kernel headers = set(kernel.headers) - headers = headers | set(["#include ", "#include ", "#include "]) + + if include_math: + headers.add("#include ") + + if include_petsc: + headers.add("#include ") + + if include_complex: + headers.add("#include ") + if PETSc.Log.isActive(): headers = headers | set(["#include "]) preamble = "\n".join(sorted(headers)) @@ -621,15 +630,22 @@ def statement_assign(expr, context): if isinstance(lvalue, Indexed): context.index_ordering.append(tuple(i.name for i in lvalue.index_ordering())) lvalue, rvalue = tuple(expression(c, context.parameters) for c in expr.children) - within_inames = context.within_inames[expr] + if isinstance(expr.label, (PreUnpackInst, UnpackInst)): + tag = "scatter" + elif isinstance(expr.label, PackInst): + tag = "gather" + else: + raise NotImplementedError() + within_inames = context.within_inames[expr] id, depends_on = context.instruction_dependencies[expr] predicates = frozenset(context.conditions) return loopy.Assignment(lvalue, rvalue, within_inames=within_inames, within_inames_is_final=True, predicates=predicates, id=id, - depends_on=depends_on, depends_on_is_final=True) + depends_on=depends_on, depends_on_is_final=True, + tags=frozenset([tag])) @statement.register(FunctionCall) From a11eccff21902e08051eb336b8c37a46fff3ace8 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:26:17 -0500 Subject: [PATCH 03/14] defines an AbstractComputeBackend type --- pyop2/backends/__init__.py | 43 ++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 2 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 pyop2/backends/__init__.py diff --git a/pyop2/backends/__init__.py b/pyop2/backends/__init__.py new file mode 100644 index 000000000..680ad9d3a --- /dev/null +++ b/pyop2/backends/__init__.py @@ -0,0 +1,43 @@ +class _not_implemented: # noqa + """Not Implemented""" + + +class AbstractComputeBackend: + """ + Abstract class to record all the backend specific implementation of + :mod:`pyop2`'s data structures. + """ + GlobalKernel = _not_implemented + Parloop = _not_implemented + Set = _not_implemented + ExtrudedSet = _not_implemented + MixedSet = _not_implemented + Subset = _not_implemented + DataSet = _not_implemented + MixedDataSet = _not_implemented + Map = _not_implemented + MixedMap = _not_implemented + Dat = _not_implemented + MixedDat = _not_implemented + DatView = _not_implemented + Mat = _not_implemented + Global = _not_implemented + GlobalDataSet = _not_implemented + PETScVecType = _not_implemented + + def __getattribute__(self, key): + val = super().__getattribute__(key) + if val is _not_implemented: + raise NotImplementedError(f"'{val}' is not implemented for backend" + f" '{type(self).__name__}'.") + return val + + def turn_on_offloading(self): + raise NotImplementedError() + + def turn_off_offloading(self): + raise NotImplementedError() + + @property + def cache_key(self): + raise NotImplementedError() diff --git a/setup.py b/setup.py index 32a20fa16..8b03748e6 100644 --- a/setup.py +++ b/setup.py @@ -147,7 +147,7 @@ def run(self): install_requires=install_requires, dependency_links=dep_links, test_requires=test_requires, - packages=['pyop2', 'pyop2.codegen', 'pyop2.types'], + packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types'], package_data={ 'pyop2': ['assets/*', '*.h', '*.pxd', '*.pyx', 'codegen/c/*.c']}, scripts=glob('scripts/*'), From 7fcc40ab85377f1cffccd5c7ec24caa03327829c Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:28:10 -0500 Subject: [PATCH 04/14] implements CPU backend --- pyop2/backends/cpu.py | 219 ++++++++++++++++++++++++++++++++++++++++++ pyop2/op2.py | 14 ++- 2 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 pyop2/backends/cpu.py diff --git a/pyop2/backends/cpu.py b/pyop2/backends/cpu.py new file mode 100644 index 000000000..ce79c5262 --- /dev/null +++ b/pyop2/backends/cpu.py @@ -0,0 +1,219 @@ +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.set import Set, ExtrudedSet, Subset, MixedSet +from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.map import Map, MixedMap +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.types.access import READ, INC, MIN, MAX +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal +from pyop2.backends import AbstractComputeBackend +from petsc4py import PETSc +from pyop2 import ( + compilation, + mpi, + utils +) + +import ctypes +import os +import loopy as lp +from contextlib import contextmanager +import numpy as np + + +class Dat(BaseDat): + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + vec = PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, comm=self.comm) + return vec + + @contextmanager + def vec_context(self, access): + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. + self._vec.stateIncrease() + yield self._vec + if access is not READ: + self.halo_valid = False + + def ensure_availability_on_device(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + def ensure_availability_on_host(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + +class Global(BaseGlobal): + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + data = self._data + size = self.dataset.layout_vec.getSizes() + if self.comm.rank == 0: + return PETSc.Vec().createWithArray(data, size=size, + bsize=self.cdim, + comm=self.comm) + else: + return PETSc.Vec().createWithArray(np.empty(0, dtype=self.dtype), + size=size, + bsize=self.cdim, + comm=self.comm) + + @contextmanager + def vec_context(self, access): + """A context manager for a :class:`PETSc.Vec` from a :class:`Global`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + yield self._vec + if access is not READ: + data = self._data + self.comm.Bcast(data, 0) + + def ensure_availability_on_device(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + def ensure_availability_on_host(self): + from pyop2.op2 import compute_backend + assert compute_backend is cpu_backend + # data transfer is noop for CPU backend + + +class GlobalKernel(AbstractGlobalKernel): + + @utils.cached_property + def code_to_compile(self): + """Return the C/C++ source code as a string.""" + from pyop2.codegen.rep2loopy import generate + + wrapper = generate(self.builder) + code = lp.generate_code_v2(wrapper) + + if self.local_kernel.cpp: + from loopy.codegen.result import process_preambles + preamble = "".join( + process_preambles(getattr(code, "device_preambles", []))) + device_code = "\n\n".join(str(dp.ast) for dp in code.device_programs) + return preamble + '\nextern "C" {\n' + device_code + "\n}\n" + return code.device_code() + + @PETSc.Log.EventDecorator() + @mpi.collective + def compile(self, comm): + """Compile the kernel. + + :arg comm: The communicator the compilation is collective over. + :returns: A ctypes function pointer for the compiled function. + """ + extension = "cpp" if self.local_kernel.cpp else "c" + cppargs = ( + tuple("-I%s/include" % d for d in utils.get_petsc_dir()) + + tuple("-I%s" % d for d in self.local_kernel.include_dirs) + + ("-I%s" % os.path.abspath(os.path.dirname(__file__)),) + ) + ldargs = ( + tuple("-L%s/lib" % d for d in utils.get_petsc_dir()) + + tuple("-Wl,-rpath,%s/lib" % d for d in utils.get_petsc_dir()) + + ("-lpetsc", "-lm") + + tuple(self.local_kernel.ldargs) + ) + + return compilation.load(self, extension, self.name, + cppargs=cppargs, + ldargs=ldargs, + restype=ctypes.c_int, + comm=comm) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + + +class CPUBackend(AbstractComputeBackend): + GlobalKernel = GlobalKernel + Parloop = Parloop + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.STANDARD + + def turn_on_offloading(self): + pass + + def turn_off_offloading(self): + pass + + @property + def cache_key(self): + return (type(self),) + + +cpu_backend = CPUBackend() diff --git a/pyop2/op2.py b/pyop2/op2.py index f4e9be0f9..a496e3763 100644 --- a/pyop2/op2.py +++ b/pyop2/op2.py @@ -50,12 +50,15 @@ from pyop2.local_kernel import CStringLocalKernel, LoopyLocalKernel, CoffeeLocalKernel, Kernel # noqa: F401 from pyop2.global_kernel import (GlobalKernelArg, DatKernelArg, MixedDatKernelArg, # noqa: F401 - MatKernelArg, MixedMatKernelArg, MapKernelArg, GlobalKernel) + MatKernelArg, MixedMatKernelArg, MapKernelArg) from pyop2.parloop import (GlobalParloopArg, DatParloopArg, MixedDatParloopArg, # noqa: F401 - MatParloopArg, MixedMatParloopArg, Parloop, parloop, par_loop) + MatParloopArg, MixedMatParloopArg, AbstractParloop, + parloop, par_loop) from pyop2.parloop import (GlobalLegacyArg, DatLegacyArg, MixedDatLegacyArg, # noqa: F401 MatLegacyArg, MixedMatLegacyArg, LegacyParloop, ParLoop) +from pyop2.backends.cpu import cpu_backend + import loopy __all__ = ['configuration', 'READ', 'WRITE', 'RW', 'INC', 'MIN', 'MAX', @@ -64,7 +67,7 @@ 'set_log_level', 'MPI', 'init', 'exit', 'Kernel', 'Set', 'ExtrudedSet', 'MixedSet', 'Subset', 'DataSet', 'GlobalDataSet', 'MixedDataSet', 'Halo', 'Dat', 'MixedDat', 'Mat', 'Global', 'Map', 'MixedMap', - 'Sparsity', 'parloop', 'Parloop', 'ParLoop', 'par_loop', + 'Sparsity', 'parloop', 'AbstractParloop', 'ParLoop', 'par_loop', 'DatView', 'PermutedMap'] @@ -121,3 +124,8 @@ def exit(): configuration.reset() global _initialised _initialised = False + + +compute_backend = cpu_backend + +GlobalKernel = cpu_backend.GlobalKernel From 911a88e33a19f2dae1fc726091e3b4b422ce9f73 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:27:16 -0500 Subject: [PATCH 05/14] defines offloading helper types --- pyop2/offload_utils.py | 71 ++++++++++++++++++++++++++++++++++++++++++ pyop2/op2.py | 3 +- 2 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 pyop2/offload_utils.py diff --git a/pyop2/offload_utils.py b/pyop2/offload_utils.py new file mode 100644 index 000000000..a60ee5c47 --- /dev/null +++ b/pyop2/offload_utils.py @@ -0,0 +1,71 @@ +from enum import IntEnum +from contextlib import contextmanager + + +class DataAvailability(IntEnum): + """ + Indicates whether the device or host contains valid data. + """ + AVAILABLE_ON_HOST_ONLY = 1 + AVAILABLE_ON_DEVICE_ONLY = 2 + AVAILABLE_ON_BOTH = 3 + + +class OffloadMixin: + def get_availability(self): + raise NotImplementedError + + def ensure_availability_on_host(self): + raise NotImplementedError + + def ensure_availaibility_on_device(self): + raise NotImplementedError + + def is_available_on_host(self): + return bool(self.get_availability() & AVAILABLE_ON_HOST_ONLY) + + def is_available_on_device(self): + return bool(self.get_availability() & AVAILABLE_ON_DEVICE_ONLY) + + +AVAILABLE_ON_HOST_ONLY = DataAvailability.AVAILABLE_ON_HOST_ONLY +AVAILABLE_ON_DEVICE_ONLY = DataAvailability.AVAILABLE_ON_DEVICE_ONLY +AVAILABLE_ON_BOTH = DataAvailability.AVAILABLE_ON_BOTH + + +def set_offloading_backend(backend): + """ + Sets a backend for offloading operations to. + + By default the operations are executed on the host, to mark operations for + offloading wrap them within a :func:`~pyop2.offload_utils.offloading` + context. + + :arg backend: An instance of :class:`pyop2.backends.AbstractComputeBackend`. + + .. warning:: + + * Must be called before any instance of PyOP2 type is allocated. + * Calling :func:`set_offloading_bacckend` different values of *backend* + over the course of the program is an undefined behavior. (i.e. + preferably avoided) + """ + from pyop2 import op2 + from pyop2.backends import AbstractComputeBackend + assert isinstance(backend, AbstractComputeBackend) + op2.compute_backend = backend + + +@contextmanager +def offloading(): + """ + Operations (such as manipulating a :class:`~pyop2.types.Dat`, calling a + :class:`~pyop2.global_kernel.GlobalKernel`, etc) within the offloading + region will be executed on backend as selected via + :func:`set_offloading_backend`. + """ + from pyop2 import op2 + op2.compute_backend.turn_on_offloading() + yield + op2.compute_backend.turn_off_offloading() + return diff --git a/pyop2/op2.py b/pyop2/op2.py index a496e3763..f10631ac0 100644 --- a/pyop2/op2.py +++ b/pyop2/op2.py @@ -58,6 +58,7 @@ MatLegacyArg, MixedMatLegacyArg, LegacyParloop, ParLoop) from pyop2.backends.cpu import cpu_backend +from pyop2.offload_utils import set_offloading_backend, offloading import loopy @@ -68,7 +69,7 @@ 'MixedSet', 'Subset', 'DataSet', 'GlobalDataSet', 'MixedDataSet', 'Halo', 'Dat', 'MixedDat', 'Mat', 'Global', 'Map', 'MixedMap', 'Sparsity', 'parloop', 'AbstractParloop', 'ParLoop', 'par_loop', - 'DatView', 'PermutedMap'] + 'DatView', 'PermutedMap', 'set_offloading_backend', 'offloading'] _initialised = False From 1438aa569a4863459dae36319181d6bed27067eb Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:27:41 -0500 Subject: [PATCH 06/14] implement GPU codegen helpers --- pyop2/compilation.py | 116 ++++++++++++++++++++++++++++++---- pyop2/configuration.py | 10 ++- pyop2/transforms/__init__.py | 0 pyop2/transforms/gpu_utils.py | 94 +++++++++++++++++++++++++++ pyop2/transforms/snpt.py | 50 +++++++++++++++ setup.py | 3 +- 6 files changed, 258 insertions(+), 15 deletions(-) create mode 100644 pyop2/transforms/__init__.py create mode 100644 pyop2/transforms/gpu_utils.py create mode 100644 pyop2/transforms/snpt.py diff --git a/pyop2/compilation.py b/pyop2/compilation.py index 1d6813ab5..4716615fd 100644 --- a/pyop2/compilation.py +++ b/pyop2/compilation.py @@ -195,6 +195,29 @@ def compilation_comm(comm): return retcomm +def _check_src_hashes(comm, global_kernel): + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + dirpart, basename = basename[:2], basename[2:] + cachedir = configuration["cache_dir"] + cachedir = os.path.join(cachedir, dirpart) + + if configuration["check_src_hashes"] or configuration["debug"]: + matching = comm.allreduce(basename, op=_check_op) + if matching != basename: + # Dump all src code to disk for debugging + output = os.path.join(cachedir, "mismatching-kernels") + srcfile = os.path.join(output, "src-rank%d.c" % comm.rank) + if comm.rank == 0: + os.makedirs(output, exist_ok=True) + comm.barrier() + with open(srcfile, "w") as f: + f.write(global_kernel.code_to_compile) + comm.barrier() + raise CompilationError("Generated code differs across ranks" + f" (see output in {output})") + + class Compiler(ABC): """A compiler for shared libraries. @@ -352,19 +375,8 @@ def get_so(self, jitmodule, extension): # atomically (avoiding races). tmpname = os.path.join(cachedir, "%s_p%d.so.tmp" % (basename, pid)) - if configuration['check_src_hashes'] or configuration['debug']: - matching = self.comm.allreduce(basename, op=_check_op) - if matching != basename: - # Dump all src code to disk for debugging - output = os.path.join(configuration["cache_dir"], "mismatching-kernels") - srcfile = os.path.join(output, "src-rank%d.c" % self.comm.rank) - if self.comm.rank == 0: - os.makedirs(output, exist_ok=True) - self.comm.barrier() - with open(srcfile, "w") as f: - f.write(jitmodule.code_to_compile) - self.comm.barrier() - raise CompilationError("Generated code differs across ranks (see output in %s)" % output) + _check_src_hashes(self.comm, jitmodule) + try: # Are we in the cache? return ctypes.CDLL(soname) @@ -696,3 +708,81 @@ def clear_cache(prompt=False): shutil.rmtree(cachedir) else: print("Not removing cached libraries") + + +def _get_code_to_compile(comm, global_kernel): + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + cname = os.path.join(cachedir, f"{basename}_code.cu") + + _check_src_hashes(comm, global_kernel) + + if os.path.isfile(cname): + # Are we in the cache? + with open(cname, "r") as f: + code_to_compile = f.read() + else: + # No, let"s go ahead and build + if comm.rank == 0: + # No need to do this on all ranks + os.makedirs(cachedir, exist_ok=True) + with progress(INFO, "Compiling wrapper"): + # make sure that compiles successfully before writing to file + code_to_compile = global_kernel.code_to_compile + with open(cname, "w") as f: + f.write(code_to_compile) + comm.barrier() + + return code_to_compile + + +@collective +def get_prepared_cuda_function(comm, global_kernel): + from pycuda.compiler import SourceModule + + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + + nvcc_opts = ["-use_fast_math", "-w"] + + code_to_compile = _get_code_to_compile(comm, global_kernel) + source_module = SourceModule(code_to_compile, options=nvcc_opts, + cache_dir=cachedir) + + cu_func = source_module.get_function(global_kernel.name) + + type_map = {ctypes.c_void_p: "P", ctypes.c_int: "i"} + argtypes = "".join(type_map[t] for t in global_kernel.argtypes) + cu_func.prepare(argtypes) + + return cu_func + + +@collective +def get_opencl_kernel(comm, global_kernel): + import pyopencl as cl + from pyop2.backends.opencl import opencl_backend + cl_ctx = opencl_backend.context + + # Determine cache key + hsh = md5(str(global_kernel.cache_key[1:]).encode()) + basename = hsh.hexdigest() + cachedir = configuration["cache_dir"] + dirpart, basename = basename[:2], basename[2:] + cachedir = os.path.join(cachedir, dirpart) + + code_to_compile = _get_code_to_compile(comm, global_kernel) + + prg = cl.Program(cl_ctx, code_to_compile).build(options=[], + cache_dir=cachedir) + + cl_knl = cl.Kernel(prg, global_kernel.name) + return cl_knl diff --git a/pyop2/configuration.py b/pyop2/configuration.py index 29717718c..a98cad124 100644 --- a/pyop2/configuration.py +++ b/pyop2/configuration.py @@ -74,6 +74,12 @@ class Configuration(dict): cdim > 1 be built as block sparsities, or dof sparsities. The former saves memory but changes which preconditioners are available for the resulting matrices. (Default yes) + :param gpu_strategy: A :class:str` indicating the transformation strategy + that must be applied to a :class:`pyop2.global_kernel.GlobalKernel` + when offloading to a GPGPU. Can be one of: + - ``"snpt"``: Single-"N" Per Thread. In the transform strategy, the + work of each element of the iteration set over which a global kernel + operates is assigned to a work-item (i.e. a CUDA thread) """ # name, env variable, type, default, write once cache_dir = os.path.join(gettempdir(), "pyop2-cache-uid%s" % os.getuid()) @@ -113,7 +119,9 @@ class Configuration(dict): "matnest": ("PYOP2_MATNEST", bool, True), "block_sparsity": - ("PYOP2_BLOCK_SPARSITY", bool, True) + ("PYOP2_BLOCK_SPARSITY", bool, True), + "gpu_strategy": + ("PYOP2_GPU_STRATEGY", str, "snpt"), } """Default values for PyOP2 configuration parameters""" diff --git a/pyop2/transforms/__init__.py b/pyop2/transforms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pyop2/transforms/gpu_utils.py b/pyop2/transforms/gpu_utils.py new file mode 100644 index 000000000..28cd558e1 --- /dev/null +++ b/pyop2/transforms/gpu_utils.py @@ -0,0 +1,94 @@ +import loopy as lp +from pyop2.configuration import configuration + + +def get_loopy_target(target): + if target == "opencl": + return lp.PyOpenCLTarget() + elif target == "cuda": + return lp.CudaTarget() + else: + raise NotImplementedError() + + +def preprocess_t_unit_for_gpu(t_unit): + + # {{{ inline all kernels in t_unit + + kernels_to_inline = { + name for name, clbl in t_unit.callables_table.items() + if isinstance(clbl, lp.CallableKernel)} + + for knl_name in kernels_to_inline: + t_unit = lp.inline_callable_kernel(t_unit, knl_name) + + # }}} + + kernel = t_unit.default_entrypoint + + # changing the address space of temps + def _change_aspace_tvs(tv): + if tv.read_only: + assert tv.initializer is not None + return tv.copy(address_space=lp.AddressSpace.GLOBAL) + else: + return tv.copy(address_space=lp.AddressSpace.PRIVATE) + + new_tvs = {tv_name: _change_aspace_tvs(tv) for tv_name, tv in + kernel.temporary_variables.items()} + kernel = kernel.copy(temporary_variables=new_tvs) + + def insn_needs_atomic(insn): + # updates to global variables are atomic + import pymbolic + if isinstance(insn, lp.Assignment): + if isinstance(insn.assignee, pymbolic.primitives.Subscript): + assignee_name = insn.assignee.aggregate.name + else: + assert isinstance(insn.assignee, pymbolic.primitives.Variable) + assignee_name = insn.assignee.name + + if assignee_name in kernel.arg_dict: + return assignee_name in insn.read_dependency_names() + return False + + new_insns = [] + args_marked_for_atomic = set() + for insn in kernel.instructions: + if insn_needs_atomic(insn): + atomicity = (lp.AtomicUpdate(insn.assignee.aggregate.name), ) + insn = insn.copy(atomicity=atomicity) + args_marked_for_atomic |= set([insn.assignee.aggregate.name]) + + new_insns.append(insn) + + # label args as atomic + new_args = [] + for arg in kernel.args: + if arg.name in args_marked_for_atomic: + new_args.append(arg.copy(for_atomic=True)) + else: + new_args.append(arg) + + kernel = kernel.copy(instructions=new_insns, args=new_args) + + return t_unit.with_kernel(kernel) + + +def apply_gpu_transforms(t_unit, target): + t_unit = t_unit.copy(target=get_loopy_target(target)) + t_unit = preprocess_t_unit_for_gpu(t_unit) + kernel = t_unit.default_entrypoint + transform_strategy = configuration["gpu_strategy"] + + kernel = lp.assume(kernel, "end > start") + + if transform_strategy == "snpt": + from pyop2.transforms.snpt import split_n_across_workgroups + kernel, args_to_make_global = split_n_across_workgroups(kernel, 32) + else: + raise NotImplementedError(f"'{transform_strategy}' transform strategy.") + + t_unit = t_unit.with_kernel(kernel) + + return t_unit, args_to_make_global diff --git a/pyop2/transforms/snpt.py b/pyop2/transforms/snpt.py new file mode 100644 index 000000000..23c145f1c --- /dev/null +++ b/pyop2/transforms/snpt.py @@ -0,0 +1,50 @@ +import loopy as lp + + +def _make_tv_array_arg(tv): + assert tv.address_space != lp.AddressSpace.PRIVATE + arg = lp.ArrayArg(name=tv.name, + dtype=tv.dtype, + shape=tv.shape, + dim_tags=tv.dim_tags, + offset=tv.offset, + dim_names=tv.dim_names, + order=tv.order, + alignment=tv.alignment, + address_space=tv.address_space, + is_output=not tv.read_only, + is_input=tv.read_only) + return arg + + +def split_n_across_workgroups(kernel, workgroup_size): + """ + Returns a transformed version of *kernel* with the workload in the loop + with induction variable 'n' distributed across work-groups of size + *workgroup_size* and each work-item in the work-group performing the work + of a single iteration of 'n'. + """ + + kernel = lp.assume(kernel, "start < end") + kernel = lp.split_iname(kernel, "n", workgroup_size, + outer_tag="g.0", inner_tag="l.0") + + # {{{ making consts as globals: necessary to make the strategy emit valid + # kernels for all forms + + old_temps = kernel.temporary_variables.copy() + args_to_make_global = [tv.initializer.flatten() + for tv in old_temps.values() + if tv.initializer is not None] + + new_temps = {tv.name: tv + for tv in old_temps.values() + if tv.initializer is None} + kernel = kernel.copy(args=kernel.args+[_make_tv_array_arg(tv) + for tv in old_temps.values() + if tv.initializer is not None], + temporary_variables=new_temps) + + # }}} + + return kernel, args_to_make_global diff --git a/setup.py b/setup.py index 8b03748e6..fe274d272 100644 --- a/setup.py +++ b/setup.py @@ -147,7 +147,8 @@ def run(self): install_requires=install_requires, dependency_links=dep_links, test_requires=test_requires, - packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types'], + packages=['pyop2', 'pyop2.backends', 'pyop2.codegen', 'pyop2.types', + 'pyop2.transforms'], package_data={ 'pyop2': ['assets/*', '*.h', '*.pxd', '*.pyx', 'codegen/c/*.c']}, scripts=glob('scripts/*'), From 16fe65f87473dece83ecc5de7f7d7c8d502196ab Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:30:39 -0500 Subject: [PATCH 07/14] Implements CUDA backend --- pyop2/backends/cuda.py | 663 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 663 insertions(+) create mode 100644 pyop2/backends/cuda.py diff --git a/pyop2/backends/cuda.py b/pyop2/backends/cuda.py new file mode 100644 index 000000000..2d9d67122 --- /dev/null +++ b/pyop2/backends/cuda.py @@ -0,0 +1,663 @@ +"""OP2 CUDA backend.""" + +import os + +from hashlib import md5 +from contextlib import contextmanager + +from pyop2 import compilation, mpi, utils +from pyop2.offload_utils import (AVAILABLE_ON_HOST_ONLY, + AVAILABLE_ON_DEVICE_ONLY, + AVAILABLE_ON_BOTH, + DataAvailability) +from pyop2.configuration import configuration +from pyop2.types.set import (MixedSet, Subset as BaseSubset, + ExtrudedSet as BaseExtrudedSet, + Set) +from pyop2.types.map import Map as BaseMap, MixedMap +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal +from pyop2.types.access import RW, READ, MIN, MAX, INC +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend +from petsc4py import PETSc + +import numpy +import pycuda.driver as cuda +import pycuda.gpuarray as cuda_np +import loopy as lp +from pytools import memoize_method +from dataclasses import dataclass +from typing import Tuple +import ctypes + + +class Map(BaseMap): + + def __init__(self, *args, **kwargs): + super(Map, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_values(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._values) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self._cuda_values + + def ensure_availability_on_host(self): + # Map once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._cuda_values.gpudata,) + else: + return super(Map, self)._kernel_args_ + + +class ExtrudedSet(BaseExtrudedSet): + """ + ExtrudedSet for CUDA. + """ + + def __init__(self, *args, **kwargs): + super(ExtrudedSet, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def cuda_layers_array(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self.layers_array) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self.cuda_layers_array + + def ensure_availability_on_host(self): + # ExtrudedSet once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self.cuda_layers_array.gpudata,) + else: + return super(ExtrudedSet, self)._kernel_args_ + + +class Subset(BaseSubset): + """ + Subset for CUDA. + """ + def __init__(self, *args, **kwargs): + super(Subset, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + def get_availability(self): + return self._availability_flag + + @utils.cached_property + def _cuda_indices(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._indices) + + def ensure_availability_on_device(self): + self._cuda_indices + + def ensure_availability_on_host(self): + # Subset once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._cuda_indices.gpudata,) + else: + return super(Subset, self)._kernel_args_ + + +class Dat(BaseDat): + """ + Dat for CUDA. + """ + def __init__(self, *args, **kwargs): + super(Dat, self).__init__(*args, **kwargs) + # _availability_flag: only used when Dat cannot be represented as a + # petscvec; when Dat can be represented as a petscvec the availability + # flag is directly read from the petsc vec. + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_data(self): + """ + Only used when the Dat's data cannot be represented as a petsc Vec. + """ + self._availability_flag = AVAILABLE_ON_BOTH + if self.can_be_represented_as_petscvec: + with self.vec as petscvec: + return cuda_np.GPUArray(shape=self._data.shape, + dtype=self._data.dtype, + gpudata=petscvec.getCUDAHandle("r"), + strides=self._data.strides) + else: + return cuda_np.to_gpu(self._data) + + def zero(self, subset=None): + if subset is None: + self.data[:] = 0*self.data + self.halo_valid = True + else: + raise NotImplementedError + + def copy(self, other, subset=None): + raise NotImplementedError + + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + return PETSc.Vec().createCUDAWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + + def get_availability(self): + if self.can_be_represented_as_petscvec: + return DataAvailability(self._vec.getOffloadMask()) + else: + return self._availability_flag + + def ensure_availability_on_device(self): + if self.can_be_represented_as_petscvec: + if not cuda_backend.offloading: + raise NotImplementedError("PETSc limitation: can ensure availability" + " on GPU only within an offloading" + " context.") + # perform a host->device transfer if needed + self._vec.getCUDAHandle("r") + else: + if not self.is_available_on_device(): + self._cuda_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if self.can_be_represented_as_petscvec: + # perform a device->host transfer if needed + self._vec.getArray(readonly=True) + else: + if not self.is_available_on_host(): + self._cuda_data.get(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @contextmanager + def vec_context(self, access): + r"""A context manager for a :class:`PETSc.Vec` from a :class:`Dat`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. + self._vec.stateIncrease() + + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._vec.bindToCPU(False) + else: + self.ensure_availability_on_host() + self._vec.bindToCPU(True) + + yield self._vec + + if access is not READ: + self.halo_valid = False + + @property + def _kernel_args_(self): + if self.can_be_represented_as_petscvec: + if cuda_backend.offloading: + self.ensure_availability_on_device() + # tell petsc that we have updated the data in the CL buffer + with self.vec as v: + v.restoreCUDAHandle(v.getCUDAHandle()) + return (self._cuda_data.gpudata,) + else: + self.ensure_availability_on_host() + # tell petsc that we have updated the data on the host + with self.vec as v: + v.stateIncrease() + return (self._data.ctypes.data, ) + else: + if cuda_backend.offloading: + self.ensure_availability_on_device() + + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._cuda_data.gpudata, ) + else: + self.ensure_availability_on_host() + + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return (self._data.ctypes.data, ) + + @mpi.collective + @property + def data(self): + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + self.halo_valid = False + + if cuda_backend.offloading: + + self.ensure_availability_on_device() + + # {{{ marking data on the host as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are updating values on the device + with self.vec as petscvec: + petscvec.restoreCUDAHandle(petscvec.getCUDAHandle("w")) + else: + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + + # }}} + v = self._cuda_data[:self.dataset.size] + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=True) + + # {{{ marking data on the device as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are altering data on the CPU + with self.vec as petscvec: + petscvec.stateIncrease() + else: + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + # }}} + + return v + + @property + @mpi.collective + def data_with_halos(self): + self.global_to_local_begin(RW) + self.global_to_local_end(RW) + return self.data + + @property + @mpi.collective + def data_ro(self): + + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + v = self._cuda_data[:self.dataset.size].view() + # FIXME: PyCUDA doesn't support 'read-only' arrays yet + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=False) + + return v + + @property + @mpi.collective + def data_ro_with_halos(self): + self.global_to_local_begin(READ) + self.global_to_local_end(READ) + + if cuda_backend.offloading: + self.ensure_availability_on_device() + v = self._cuda_data.view() + # FIXME: PyCUDA doesn't support 'read-only' arrays yet + else: + self.ensure_availability_on_host() + v = self._data.view() + v.setflags(write=False) + + return v + + +class Global(BaseGlobal): + """ + Global for CUDA. + """ + + def __init__(self, *args, **kwargs): + super(Global, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _cuda_data(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cuda_np.to_gpu(self._data) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + if not self.is_available_on_device(): + self._cuda_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if not self.is_available_on_host(): + self._cuda_data.get(ary=self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @property + def _kernel_args_(self): + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._cuda_data.gpudata,) + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return super(Global, self)._kernel_args_ + + @mpi.collective + @property + def data(self): + + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return self._cuda_data + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if cuda_backend.offloading: + self.ensure_availability_on_device() + view = self._cuda_data.view() + # FIXME: PyCUDA doesn't support read-only arrays yet + return view + else: + self.ensure_availability_on_host() + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + raise NotImplementedError() + + @contextmanager + def vec_context(self, access): + raise NotImplementedError() + + +@dataclass(frozen=True) +class CUFunctionWithExtraArgs: + """ + A partial :class:`pycuda.driver.Function` with bound arguments *args_to_append* + that are appended to the arguments passed in :meth:`__call__`. + """ + cu_func: cuda.Function + args_to_append: Tuple[cuda_np.GPUArray, ...] + + def __call__(self, grid, block, *args): + if all(grid + block): + self.cu_func.prepared_call(grid, block, + *args, + *[arg_to_append.gpudata + for arg_to_append in self.args_to_append]) + + +class GlobalKernel(AbstractGlobalKernel): + @classmethod + def _cache_key(cls, *args, **kwargs): + key = super()._cache_key(*args, **kwargs) + key = key + (configuration["gpu_strategy"], "cuda") + return key + + @utils.cached_property + def encoded_cache_key(self): + return md5(str(self.cache_key[1:]).encode()).hexdigest() + + @utils.cached_property + def computational_grid_expr_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_grid_params.py") + + @utils.cached_property + def extra_args_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_extra_args.npz") + + @memoize_method + def get_grid_size(self, start, end): + """ + Returns a :class:`tuple` of the form ``(grid, block)``, where *grid* is + the 2-:class:`tuple` corresponding to the CUDA computational grid size + and ``block`` is the thread block size. Refer to + `CUDA docs `_ + for a detailed explanation of these terms. + """ # noqa: E501 + fpath = self.computational_grid_expr_cache_file_path + + with open(fpath, "r") as f: + globals_dict = {} + exec(f.read(), globals_dict) + get_grid_sizes = globals_dict["get_grid_sizes"] + + grid, block = get_grid_sizes(start=start, end=end) + + # TODO: PyCUDA doesn't allow numpy int's for grid dimensions. Remove + # these casts after PyCUDA has been patched. + return (tuple(int(dim) for dim in grid), + tuple(int(dim) for dim in block)) + + @memoize_method + def get_extra_args(self): + """ + Returns device buffers corresponding to array literals baked into + :attr:`local_kernel`. + """ + fpath = self.extra_args_cache_file_path + npzfile = numpy.load(fpath) + assert npzfile["ids"].ndim == 1 + assert len(npzfile.files) == len(npzfile["ids"]) + 1 + extra_args_np = [npzfile[arg_id] + for arg_id in npzfile["ids"]] + + return tuple([cuda_np.to_gpu(arg_np) for arg_np in extra_args_np]) + + @utils.cached_property + def argtypes(self): + result = super().argtypes + return result + (ctypes.c_voidp,) * len(self.get_extra_args()) + + @utils.cached_property + def code_to_compile(self): + from pyop2.codegen.rep2loopy import generate + from pyop2.transforms.gpu_utils import apply_gpu_transforms + from pymbolic.interop.ast import to_evaluatable_python_function + + t_unit = generate(self.builder, + include_math=False, + include_petsc=False, + include_complex=False) + + # Make temporary variables with initializers kernel's arguments. + t_unit, extra_args = apply_gpu_transforms(t_unit, "cuda") + + ary_ids = [f"_op2_arg_{i}" + for i in range(len(extra_args))] + + numpy.savez(self.extra_args_cache_file_path, + ids=numpy.array(ary_ids), + **{ary_id: extra_arg + for ary_id, extra_arg in zip(ary_ids, extra_args)}) + + # {{{ save python code to get grid sizes + + with open(self.computational_grid_expr_cache_file_path, "w") as f: + glens, llens = (t_unit + .default_entrypoint + .get_grid_size_upper_bounds_as_exprs(t_unit + .callables_table)) + glens = glens + (1,) * (2 - len(glens)) # cuda expects 2d grid shape + llens = llens + (1,) * (3 - len(llens)) # cuda expects 3d block shape + f.write(to_evaluatable_python_function((glens, llens), "get_grid_sizes")) + + # }}} + + code = lp.generate_code_v2(t_unit).device_code() + + return code + + @mpi.collective + def __call__(self, comm, *args): + key = id(comm) + try: + func = self._func_cache[key] + except KeyError: + func = self.compile(comm) + self._func_cache[key] = func + + grid, block = self.get_grid_size(args[0], args[1]) + func(grid, block, *args) + + @mpi.collective + def compile(self, comm): + cu_func = compilation.get_prepared_cuda_function(comm, + self) + return CUFunctionWithExtraArgs(cu_func, self.get_extra_args()) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + # FIXME: Get rid of this D2H for CUDA-Aware MPI. + glob.ensure_availability_on_host() + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + # FIXME: Get rid of this D2H for CUDA-Aware MPI. + glob.ensure_availability_on_host() + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + # FIXME: Get rid of this H2D for CUDA-Aware MPI. + glob.ensure_availability_on_device() + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + # FIXME: Get rid of this H2D for CUDA-Aware MPI. + glob.ensure_availability_on_device() + + +class CUDABackend(AbstractComputeBackend): + Parloop_offloading = Parloop + Parloop_no_offloading = cpu_backend.Parloop + GlobalKernel_offloading = GlobalKernel + GlobalKernel_no_offloading = cpu_backend.GlobalKernel + + Parloop = cpu_backend.Parloop + GlobalKernel = cpu_backend.GlobalKernel + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.CUDA + + def __init__(self): + self.offloading = False + + def turn_on_offloading(self): + self.offloading = True + self.Parloop = self.Parloop_offloading + self.GlobalKernel = self.GlobalKernel_offloading + + def turn_off_offloading(self): + self.offloading = False + self.Parloop = self.Parloop_no_offloading + self.GlobalKernel = self.GlobalKernel_no_offloading + + @property + def cache_key(self): + return (type(self), self.offloading) + + +cuda_backend = CUDABackend() From 4639a81067b92824e71598facb6f0be342505c15 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Mon, 11 Jul 2022 21:30:48 -0500 Subject: [PATCH 08/14] Implements OpenCL backend --- pyop2/backends/opencl.py | 681 +++++++++++++++++++++++++++++++++++++++ pyop2/offload_utils.py | 2 + 2 files changed, 683 insertions(+) create mode 100644 pyop2/backends/opencl.py diff --git a/pyop2/backends/opencl.py b/pyop2/backends/opencl.py new file mode 100644 index 000000000..a041860cb --- /dev/null +++ b/pyop2/backends/opencl.py @@ -0,0 +1,681 @@ +"""OP2 OpenCL backend.""" + +import os +from hashlib import md5 +from contextlib import contextmanager + +from pyop2 import compilation, mpi, utils +from pyop2.offload_utils import (AVAILABLE_ON_HOST_ONLY, + AVAILABLE_ON_DEVICE_ONLY, + AVAILABLE_ON_BOTH, + DataAvailability) +from pyop2.configuration import configuration +from pyop2.types.set import (MixedSet, Subset as BaseSubset, + ExtrudedSet as BaseExtrudedSet, + Set) +from pyop2.types.map import Map as BaseMap, MixedMap +from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView +from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet +from pyop2.types.mat import Mat +from pyop2.types.glob import Global as BaseGlobal +from pyop2.types.access import RW, READ, MIN, MAX, INC +from pyop2.parloop import AbstractParloop +from pyop2.global_kernel import AbstractGlobalKernel +from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend +from petsc4py import PETSc + +import numpy +import loopy as lp +from pytools import memoize_method +from dataclasses import dataclass +from typing import Tuple +import ctypes + +import pyopencl as cl +import pyopencl.array as cla + + +def read_only_clarray_setitem(self, *args, **kwargs): + # emulates np.ndarray.setitem for numpy arrays with read-only flags + raise ValueError("assignment destination is read-only") + + +class Map(BaseMap): + + def __init__(self, *args, **kwargs): + super(Map, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_values(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._values) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self._opencl_values + + def ensure_availability_on_host(self): + # Map once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._opencl_values.data,) + else: + return super(Map, self)._kernel_args_ + + +class ExtrudedSet(BaseExtrudedSet): + """ + ExtrudedSet for OpenCL. + """ + + def __init__(self, *args, **kwargs): + super(ExtrudedSet, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def opencl_layers_array(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self.layers_array) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + self.opencl_layers_array + + def ensure_availability_on_host(self): + # ExtrudedSet once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self.opencl_layers_array.data,) + else: + return super(ExtrudedSet, self)._kernel_args_ + + +class Subset(BaseSubset): + """ + Subset for OpenCL. + """ + def __init__(self, *args, **kwargs): + super(Subset, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + def get_availability(self): + return self._availability_flag + + @utils.cached_property + def _opencl_indices(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._indices) + + def ensure_availability_on_device(self): + self._opencl_indices + + def ensure_availability_on_host(self): + # Subset once initialized is not over-written so always available + # on host. + pass + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + if not self.is_available_on_device(): + self.ensure_availability_on_device() + + return (self._opencl_indices.data,) + else: + return super(Subset, self)._kernel_args_ + + +class Dat(BaseDat): + """ + Dat for OpenCL. + """ + def __init__(self, *args, **kwargs): + super(Dat, self).__init__(*args, **kwargs) + # _availability_flag: only used when Dat cannot be represented as a + # petscvec; when Dat can be represented as a petscvec the availability + # flag is directly read from the petsc vec. + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_data(self): + """ + Only used when the Dat's data cannot be represented as a petsc Vec. + """ + self._availability_flag = AVAILABLE_ON_BOTH + if self.can_be_represented_as_petscvec: + with self.vec as petscvec: + return cla.Array(cq=opencl_backend.queue, + shape=self._data.shape, + dtype=self._data.dtype, + data=cl.Buffer.from_int_ptr( + petscvec.getCLMemHandle("r"), + retain=False), + strides=self._data.strides) + else: + return cla.to_device(opencl_backend.queue, self._data) + + def zero(self, subset=None): + if subset is None: + self.data[:] = 0 + self.halo_valid = True + else: + raise NotImplementedError + + def copy(self, other, subset=None): + raise NotImplementedError + + @utils.cached_property + def _vec(self): + assert self.dtype == PETSc.ScalarType, \ + "Can't create Vec with type %s, must be %s" % (self.dtype, + PETSc.ScalarType) + # Can't duplicate layout_vec of dataset, because we then + # carry around extra unnecessary data. + # But use getSizes to save an Allreduce in computing the + # global size. + size = self.dataset.layout_vec.getSizes() + data = self._data[:size[0]] + return PETSc.Vec().createViennaCLWithArrays(data, size=size, + bsize=self.cdim, + comm=self.comm) + + def get_availability(self): + if self.can_be_represented_as_petscvec: + return DataAvailability(self._vec.getOffloadMask()) + else: + return self._availability_flag + + def ensure_availability_on_device(self): + if self.can_be_represented_as_petscvec: + if not opencl_backend.offloading: + raise NotImplementedError("PETSc limitation: can ensure availability" + " on GPU only within an offloading" + " context.") + # perform a host->device transfer if needed + self._vec.getCLMemHandle("r") + else: + if not self.is_available_on_device(): + self._opencl_data.set(self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if self.can_be_represented_as_petscvec: + # perform a device->host transfer if needed + self._vec.getArray(readonly=True) + else: + if not self.is_available_on_host(): + self._opencl_data.get(opencl_backend.queue, self._data) + self._availability_flag = AVAILABLE_ON_BOTH + + @contextmanager + def vec_context(self, access): + r"""A context manager for a :class:`PETSc.Vec` from a :class:`Dat`. + + :param access: Access descriptor: READ, WRITE, or RW.""" + # PETSc Vecs have a state counter and cache norm computations + # to return immediately if the state counter is unchanged. + # Since we've updated the data behind their back, we need to + # change that state counter. + self._vec.stateIncrease() + + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._vec.bindToCPU(False) + else: + self.ensure_availability_on_host() + self._vec.bindToCPU(True) + + yield self._vec + + if access is not READ: + self.halo_valid = False + + @property + def _kernel_args_(self): + if self.can_be_represented_as_petscvec: + if opencl_backend.offloading: + self.ensure_availability_on_device() + # tell petsc that we have updated the data in the CL buffer + with self.vec as v: + v.getCLMemHandle() + v.restoreCLMemHandle() + + return (self._opencl_data.data,) + else: + self.ensure_availability_on_host() + # tell petsc that we have updated the data on the host + with self.vec as v: + v.stateIncrease() + return (self._data.ctypes.data, ) + else: + if opencl_backend.offloading: + self.ensure_availability_on_device() + + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._opencl_data.data, ) + else: + self.ensure_availability_on_host() + + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return (self._data.ctypes.data, ) + + @mpi.collective + @property + def data(self): + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + self.halo_valid = False + + if opencl_backend.offloading: + + self.ensure_availability_on_device() + + # {{{ marking data on the host as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are updating values on the device + with self.vec as petscvec: + petscvec.getCLMemHandle("w") + petscvec.restoreCLMemHandle() + else: + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + + # }}} + v = self._opencl_data[:self.dataset.size] + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=True) + + # {{{ marking data on the device as invalid + + if self.can_be_represented_as_petscvec: + # report to petsc that we are altering data on the CPU + with self.vec as petscvec: + petscvec.stateIncrease() + else: + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + # }}} + + return v + + @property + @mpi.collective + def data_with_halos(self): + self.global_to_local_begin(RW) + self.global_to_local_end(RW) + return self.data + + @property + @mpi.collective + def data_ro(self): + + if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + raise RuntimeError("Illegal access: no data associated with this Dat!") + + self.halo_valid = False + + if opencl_backend.offloading: + self.ensure_availability_on_device() + v = self._opencl_data[:self.dataset.size].view() + v.setitem = read_only_clarray_setitem + else: + self.ensure_availability_on_host() + v = self._data[:self.dataset.size].view() + v.setflags(write=False) + + return v + + @property + @mpi.collective + def data_ro_with_halos(self): + self.global_to_local_begin(READ) + self.global_to_local_end(READ) + + if opencl_backend.offloading: + self.ensure_availability_on_device() + v = self._opencl_data.view() + v.setitem = read_only_clarray_setitem + else: + self.ensure_availability_on_host() + v = self._data.view() + v.setflags(write=False) + + return v + + +class Global(BaseGlobal): + """ + Global for OpenCL. + """ + + def __init__(self, *args, **kwargs): + super(Global, self).__init__(*args, **kwargs) + self._availability_flag = AVAILABLE_ON_HOST_ONLY + + @utils.cached_property + def _opencl_data(self): + self._availability_flag = AVAILABLE_ON_BOTH + return cla.to_device(opencl_backend.queue, self._data) + + def get_availability(self): + return self._availability_flag + + def ensure_availability_on_device(self): + if not self.is_available_on_device(): + self._opencl_data.set(ary=self._data, + queue=opencl_backend.queue) + self._availability_flag = AVAILABLE_ON_BOTH + + def ensure_availability_on_host(self): + if not self.is_available_on_host(): + self._opencl_data.get(ary=self._data, + queue=opencl_backend.queue) + self._availability_flag = AVAILABLE_ON_BOTH + + @property + def _kernel_args_(self): + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return (self._opencl_data.data,) + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return super(Global, self)._kernel_args_ + + @mpi.collective + @property + def data(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if opencl_backend.offloading: + self.ensure_availability_on_device() + self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + return self._opencl_data + else: + self.ensure_availability_on_host() + self._availability_flag = AVAILABLE_ON_HOST_ONLY + return self._data + + @property + def data_ro(self): + if len(self._data) == 0: + raise RuntimeError("Illegal access: No data associated with" + " this Global!") + + if opencl_backend.offloading: + self.ensure_availability_on_device() + view = self._opencl_data.view() + view.setitem = read_only_clarray_setitem + return view + else: + self.ensure_availability_on_host() + view = self._data.view() + view.setflags(write=False) + return view + + @utils.cached_property + def _vec(self): + raise NotImplementedError() + + @contextmanager + def vec_context(self, access): + raise NotImplementedError() + + +@dataclass(frozen=True) +class CLKernelWithExtraArgs: + cl_kernel: cl.Kernel + args_to_append: Tuple[cla.Array, ...] + + def __call__(self, grid, block, start, end, *args): + if all(grid + block): + self.cl_kernel(opencl_backend.queue, + grid, block, + numpy.int32(start), + numpy.int32(end), + *args, + *[arg_to_append.data + for arg_to_append in self.args_to_append], + g_times_l=True) + + +class GlobalKernel(AbstractGlobalKernel): + @classmethod + def _cache_key(cls, *args, **kwargs): + key = super()._cache_key(*args, **kwargs) + key = key + (configuration["gpu_strategy"], "opencl") + return key + + @utils.cached_property + def encoded_cache_key(self): + return md5(str(self.cache_key[1:]).encode()).hexdigest() + + @utils.cached_property + def computational_grid_expr_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_grid_params.py") + + @utils.cached_property + def extra_args_cache_file_path(self): + cachedir = configuration["cache_dir"] + return os.path.join(cachedir, f"{self.encoded_cache_key}_extra_args.npz") + + @memoize_method + def get_grid_size(self, start, end): + fpath = self.computational_grid_expr_cache_file_path + + with open(fpath, "r") as f: + globals_dict = {} + exec(f.read(), globals_dict) + get_grid_sizes = globals_dict["get_grid_sizes"] + + return get_grid_sizes(start=start, end=end) + + @memoize_method + def get_extra_args(self): + """ + Returns device buffers corresponding to array literals baked into + :attr:`local_kernel`. + """ + fpath = self.extra_args_cache_file_path + npzfile = numpy.load(fpath) + assert npzfile["ids"].ndim == 1 + assert len(npzfile.files) == len(npzfile["ids"]) + 1 + extra_args_np = [npzfile[arg_id] + for arg_id in npzfile["ids"]] + return tuple(cla.to_device(opencl_backend.queue, arg) + for arg in extra_args_np) + + @utils.cached_property + def argtypes(self): + result = super().argtypes + return result + (ctypes.c_voidp,) * len(self.get_extra_args()) + + @utils.cached_property + def code_to_compile(self): + from pyop2.codegen.rep2loopy import generate + from pyop2.transforms.gpu_utils import apply_gpu_transforms + from pymbolic.interop.ast import to_evaluatable_python_function + + t_unit = generate(self.builder, + include_math=False, + include_petsc=False, + include_complex=False) + + # Make temporary variables with initializers kernel's arguments. + t_unit, extra_args = apply_gpu_transforms(t_unit, "opencl") + + ary_ids = [f"_op2_arg_{i}" + for i in range(len(extra_args))] + + numpy.savez(self.extra_args_cache_file_path, + ids=numpy.array(ary_ids), + **{ary_id: extra_arg + for ary_id, extra_arg in zip(ary_ids, extra_args)}) + + # {{{ save python code to get grid sizes + + with open(self.computational_grid_expr_cache_file_path, "w") as f: + glens, llens = (t_unit + .default_entrypoint + .get_grid_size_upper_bounds_as_exprs(t_unit + .callables_table)) + f.write(to_evaluatable_python_function((glens, llens), "get_grid_sizes")) + + code = lp.generate_code_v2(t_unit).device_code() + + # }}} + + return code + + @mpi.collective + def __call__(self, comm, *args): + key = id(comm) + try: + func = self._func_cache[key] + except KeyError: + func = self.compile(comm) + self._func_cache[key] = func + + grid, block = self.get_grid_size(args[0], args[1]) + func(grid, block, *args) + + @mpi.collective + def compile(self, comm): + cl_knl = compilation.get_opencl_kernel(comm, + self) + return CLKernelWithExtraArgs(cl_knl, self.get_extra_args()) + + +class Parloop(AbstractParloop): + @PETSc.Log.EventDecorator("ParLoopRednBegin") + @mpi.collective + def reduction_begin(self): + """Begin reductions.""" + requests = [] + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + mpi_op = {INC: mpi.MPI.SUM, + MIN: mpi.MPI.MIN, + MAX: mpi.MPI.MAX}.get(self.accesses[idx]) + + if mpi.MPI.VERSION >= 3: + glob.ensure_availability_on_host() + requests.append(self.comm.Iallreduce(glob._data, + glob._buf, + op=mpi_op)) + else: + glob.ensure_availability_on_host() + self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + return tuple(requests) + + @PETSc.Log.EventDecorator("ParLoopRednEnd") + @mpi.collective + def reduction_end(self, requests): + """Finish reductions.""" + if mpi.MPI.VERSION >= 3: + mpi.MPI.Request.Waitall(requests) + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + glob.ensure_availability_on_device() + else: + assert len(requests) == 0 + + for idx in self._reduction_idxs: + glob = self.arguments[idx].data + glob._data[:] = glob._buf + glob._availability_flag = AVAILABLE_ON_HOST_ONLY + glob.ensure_availability_on_device() + + +class OpenCLBackend(AbstractComputeBackend): + Parloop_offloading = Parloop + Parloop_no_offloading = cpu_backend.Parloop + GlobalKernel_offloading = GlobalKernel + GlobalKernel_no_offloading = cpu_backend.GlobalKernel + + Parloop = cpu_backend.Parloop + GlobalKernel = cpu_backend.GlobalKernel + Set = Set + ExtrudedSet = ExtrudedSet + MixedSet = MixedSet + Subset = Subset + DataSet = DataSet + MixedDataSet = MixedDataSet + Map = Map + MixedMap = MixedMap + Dat = Dat + MixedDat = MixedDat + DatView = DatView + Mat = Mat + Global = Global + GlobalDataSet = GlobalDataSet + PETScVecType = PETSc.Vec.Type.VIENNACL + + def __init__(self): + self.offloading = False + + @utils.cached_property + def context(self): + # create a dummy vector and extract its underlying context + x = PETSc.Vec().create(PETSc.COMM_WORLD) + x.setType("viennacl") + x.setSizes(size=1) + ctx_ptr = x.getCLContextHandle() + return cl.Context.from_int_ptr(ctx_ptr, retain=False) + + @utils.cached_property + def queue(self): + # TODO: Instruct the user to pass + # -viennacl_backend opencl + # -viennacl_opencl_device_type gpu + # create a dummy vector and extract its associated command queue + x = PETSc.Vec().create(PETSc.COMM_WORLD) + x.setType("viennacl") + x.setSizes(size=1) + queue_ptr = x.getCLQueueHandle() + return cl.CommandQueue.from_int_ptr(queue_ptr, retain=False) + + def turn_on_offloading(self): + self.offloading = True + self.Parloop = self.Parloop_offloading + self.GlobalKernel = self.GlobalKernel_offloading + + def turn_off_offloading(self): + self.offloading = False + self.Parloop = self.Parloop_no_offloading + self.GlobalKernel = self.GlobalKernel_no_offloading + + @property + def cache_key(self): + return (type(self), self.offloading) + + +opencl_backend = OpenCLBackend() diff --git a/pyop2/offload_utils.py b/pyop2/offload_utils.py index a60ee5c47..7ad1f1c16 100644 --- a/pyop2/offload_utils.py +++ b/pyop2/offload_utils.py @@ -22,9 +22,11 @@ def ensure_availaibility_on_device(self): raise NotImplementedError def is_available_on_host(self): + # bitwise op to detect both AVAILABLE_ON_HOST and AVAILABLE_ON_BOTH return bool(self.get_availability() & AVAILABLE_ON_HOST_ONLY) def is_available_on_device(self): + # bitwise op to detect both AVAILABLE_ON_DEVICE and AVAILABLE_ON_BOTH return bool(self.get_availability() & AVAILABLE_ON_DEVICE_ONLY) From 5d3a8b7276619d5b28c47e0484f832207bd81f81 Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Wed, 13 Jul 2022 13:45:44 -0500 Subject: [PATCH 09/14] adds a CI job: test_opencl --- .github/workflows/ci.yml | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45df5ed57..2c9f08b71 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,3 +94,69 @@ jobs: jekyll: false env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + test_opencl: + runs-on: ubuntu-latest + env: + CC: mpicc + PETSC_DIR: ${{ github.workspace }}/petsc + PETSC_ARCH: default + PETSC_CONFIGURE_OPTIONS: --with-debugging=1 --with-shared-libraries=1 --with-c2html=0 --with-fortran-bindings=0 --with-opencl=1 --download-viennacl --with-viennacl=1 + + steps: + - name: Install system dependencies + shell: bash + run: | + sudo apt update + sudo apt install build-essential mpich libmpich-dev \ + libblas-dev liblapack-dev gfortran + sudo apt install ocl-icd-opencl-dev pocl-opencl-icd + + - name: Set correct Python version + uses: actions/setup-python@v2 + with: + python-version: '3.6' + + - name: Clone PETSc + uses: actions/checkout@v2 + with: + repository: firedrakeproject/petsc + path: ${{ env.PETSC_DIR }} + + - name: Build and install PETSc + shell: bash + working-directory: ${{ env.PETSC_DIR }} + run: | + ./configure ${PETSC_CONFIGURE_OPTIONS} + make + + - name: Build and install petsc4py + shell: bash + working-directory: ${{ env.PETSC_DIR }}/src/binding/petsc4py + run: | + python -m pip install --upgrade cython numpy + python -m pip install --no-deps . + + - name: Checkout PyOP2 + uses: actions/checkout@v2 + with: + path: PyOP2 + + - name: Install PyOP2 + shell: bash + working-directory: PyOP2 + run: | + python -m pip install pip==20.2 # pip 20.2 needed for loopy install to work. + + # xargs is used to force installation of requirements in the order we specified. + xargs -l1 python -m pip install < requirements-ext.txt + xargs -l1 python -m pip install < requirements-git.txt + python -m pip install pulp + python -m pip install -U flake8 + python -m pip install pyopencl + python -m pip install . + + - name: Run tests + shell: bash + working-directory: PyOP2 + run: pytest test -v --tb=native From 96f54cec1ca7407227686e29695eeedfa274640d Mon Sep 17 00:00:00 2001 From: Kaushik Kulkarni Date: Wed, 13 Jul 2022 14:08:36 -0500 Subject: [PATCH 10/14] adds test_opencl_offloading --- test/unit/test_opencl_offloading.py | 119 ++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 test/unit/test_opencl_offloading.py diff --git a/test/unit/test_opencl_offloading.py b/test/unit/test_opencl_offloading.py new file mode 100644 index 000000000..b39f11a1c --- /dev/null +++ b/test/unit/test_opencl_offloading.py @@ -0,0 +1,119 @@ +import pytest +pytest.importorskip("pyopencl") + +import sys +import petsc4py +petsc4py.init(sys.argv + + "-viennacl_backend opencl".split() + + "-viennacl_opencl_device_type cpu".split()) +from pyop2 import op2 +import pyopencl.array as cla +import numpy as np +import loopy as lp +lp.set_caching_enabled(False) + + +def allclose(a, b, rtol=1e-05, atol=1e-08): + """ + Prefer this routine over np.allclose(...) to allow pycuda/pyopencl arrays + """ + return bool(abs(a - b) < (atol + rtol * abs(b))) + + +def pytest_generate_tests(metafunc): + if "backend" in metafunc.fixturenames: + from pyop2.backends.opencl import opencl_backend + metafunc.parametrize("backend", [opencl_backend]) + + +def test_new_backend_raises_not_implemented_error(): + from pyop2.backends import AbstractComputeBackend + unimplemented_backend = AbstractComputeBackend() + + attrs = ["GlobalKernel", "Parloop", "Set", "ExtrudedSet", "MixedSet", + "Subset", "DataSet", "MixedDataSet", "Map", "MixedMap", "Dat", + "MixedDat", "DatView", "Mat", "Global", "GlobalDataSet", + "PETScVecType"] + + for attr in attrs: + with pytest.raises(NotImplementedError): + getattr(unimplemented_backend, attr) + + +def test_dat_with_petscvec_representation(backend): + op2.set_offloading_backend(backend) + + nelems = 9 + data = np.random.rand(nelems) + set_ = op2.compute_backend.Set(nelems) + dset = op2.compute_backend.DataSet(set_, 1) + dat = op2.compute_backend.Dat(dset, data.copy()) + + assert isinstance(dat.data_ro, np.ndarray) + dat.data[:] *= 3 + + with op2.offloading(): + assert isinstance(dat.data_ro, cla.Array) + dat.data[:] *= 2 + + assert isinstance(dat.data_ro, np.ndarray) + np.testing.assert_allclose(dat.data_ro, 6*data) + + +def test_dat_not_as_petscvec(backend): + op2.set_offloading_backend(backend) + + nelems = 9 + data = np.random.randint(low=-10, high=10, + size=nelems, + dtype=np.int64) + set_ = op2.compute_backend.Set(nelems) + dset = op2.compute_backend.DataSet(set_, 1) + dat = op2.compute_backend.Dat(dset, data.copy()) + + assert isinstance(dat.data_ro, np.ndarray) + dat.data[:] *= 3 + + with op2.offloading(): + assert isinstance(dat.data_ro, cla.Array) + dat.data[:] *= 2 + + assert isinstance(dat.data_ro, np.ndarray) + np.testing.assert_allclose(dat.data_ro, 6*data) + + +def test_global_reductions(backend): + op2.set_offloading_backend(backend) + + sum_knl = lp.make_function( + "{ : }", + """ + g[0] = g[0] + x[0] + """, + [lp.GlobalArg("g,x", + dtype="float64", + shape=lp.auto, + )], + name="sum_knl", + target=lp.CWithGNULibcTarget(), + lang_version=(2018, 2)) + + rng = np.random.default_rng() + nelems = 4_000 + data_to_sum = rng.random(nelems) + + with op2.offloading(): + + set_ = op2.compute_backend.Set(4_000) + dset = op2.compute_backend.DataSet(set_, 1) + + dat = op2.compute_backend.Dat(dset, data_to_sum.copy()) + glob = op2.compute_backend.Global(1, 0, np.float64, "g") + + op2.parloop(op2.Kernel(sum_knl, "sum_knl"), + set_, + glob(op2.INC), + dat(op2.READ)) + + assert isinstance(glob.data_ro, cla.Array) + assert allclose(glob.data_ro[0], data_to_sum.sum()) From 2924b35367c0db5b7f4d72b1da80b5358bff90fb Mon Sep 17 00:00:00 2001 From: JDBetteridge Date: Thu, 28 Jul 2022 21:27:18 +0100 Subject: [PATCH 11/14] WIP: New idea --- pyop2/array.py | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 pyop2/array.py diff --git a/pyop2/array.py b/pyop2/array.py new file mode 100644 index 000000000..d6ceb0201 --- /dev/null +++ b/pyop2/array.py @@ -0,0 +1,102 @@ +class Status(enum.Enum): + """where is the up-to-date version?""" + ON_HOST = enum.auto() + ON_DEVICE = enum.auto() + ON_BOTH = enum.auto() + + +""" +e.g. 1 + +d = Dat(...) + +!# this will create a numpy array of zeros + +e.g. 2: + +with offloading(): + d = Dat(...) + +!# this will create a *backend* array of zeros + +e.g. 3: + +parloop(..., target=DEVICE) +""" + + +class Array: + """An array that is available on both host and device, copying values where necessary.""" + def __init__(self, shape, dtype): + self.status = ON_HOST # could change with offloading ctxt mngr + #self._host_data = None + #self._dev_ptr = None + self.initdata(offload) + + def initdata(self, offload=False): + # offloat in {"cpu", "gpu"} + if offload == "cpu": + self._host_data = np.zeros(shape, dtype) + self._dev_ptr = None + elif offload == "gpu": + self._host_data = None + # this if statement is needed because backend is known at the + # start of the program but offload, i.e. where are we currently, is not. + if backend == "opencl": + self._dev_ptr = ... # some incantation or other + elif backend == "cuda": + ... + + def as_petsc_vec(self): + if offload == "cpu": + return PETSc.Vec().createMPI(...) + elif offload == "gpu": + if backend == "cuda": + return PETSc.Vec().createCUDA(..) + ... + + + @property + def data(self): + """numpy array""" + if self._host_data is None: + ... + + if self.status == ON_DEVICE: + self._device2hostcopy() + self.status = ON_HOST + return self._host_data # (a numpy array), initially unallocated then lazily filled + + + @property + def host_ptr(self): + return self.data.ctypes.data + + @property + def dev_ptr_ro(self): + # depend on cuda/openCL/notimplemented (defined at start) + + #if self.status not in {ON_DEVICE, ON_BOTH}: + if self.status == ON_HOST: + self._host2devicecopy() + self.status = ON_BOTH + return self._dev_ptr + + @property + def dev_ptr_w(self): + # depend on cuda/openCL/notimplemented (defined at start) + + #if self.status not in {ON_DEVICE, ON_BOTH}: + if self.status == ON_HOST: + self._host2devicecopy() + self.status = ON_DEVICE + return self._dev_ptr + + +class CudaVec(Vec): + ... + + +class OpenCLVec(Vec): + ... + ... From 6975ab0db5282c6e982e2388b693e44e4d64f568 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Fri, 29 Jul 2022 13:05:58 +0100 Subject: [PATCH 12/14] Add MirroredArray class --- pyop2/array.py | 203 +++++++++++++++++-------- pyop2/backends/cpu.py | 7 +- pyop2/backends/opencl.py | 294 +++++++++++------------------------- pyop2/codegen/rep2loopy.py | 1 + pyop2/configuration.py | 2 + pyop2/offload_utils.py | 11 +- pyop2/parloop.py | 2 +- pyop2/types/dat.py | 21 +-- pyop2/types/data_carrier.py | 21 +-- pyop2/types/glob.py | 18 +-- pyop2/types/map.py | 11 +- 11 files changed, 271 insertions(+), 320 deletions(-) diff --git a/pyop2/array.py b/pyop2/array.py index d6ceb0201..23cbf7745 100644 --- a/pyop2/array.py +++ b/pyop2/array.py @@ -1,3 +1,15 @@ +import enum + +import numpy as np +import pyopencl +from petsc4py import PETSc + +from pyop2.configuration import configuration +from pyop2.offload_utils import _offloading as offloading +from pyop2.types.access import READ, RW, WRITE, INC +from pyop2.utils import verify_reshape + + class Status(enum.Enum): """where is the up-to-date version?""" ON_HOST = enum.auto() @@ -5,98 +17,155 @@ class Status(enum.Enum): ON_BOTH = enum.auto() -""" -e.g. 1 +ON_HOST = Status.ON_HOST +ON_DEVICE = Status.ON_DEVICE +ON_BOTH = Status.ON_BOTH -d = Dat(...) -!# this will create a numpy array of zeros +class MirroredArray: + """An array that is available on both host and device, copying values where necessary.""" -e.g. 2: + def __init__(self, data, dtype, shape): + # TODO This could be allocated lazily + self.dtype = np.dtype(dtype) + if data is None: + self._host_data = np.zeros(shape, dtype=dtype) + else: + self._host_data = verify_reshape(data, dtype, shape) + self.availability = ON_BOTH + + @classmethod + def new(cls, *args, **kwargs): + if configuration["backend"] == "CPU_ONLY": + return CPUOnlyArray(*args, **kwargs) + elif configuration["backend"] == "OPENCL": + return OpenCLArray(*args, **kwargs) + else: + raise ValueError -with offloading(): - d = Dat(...) + @property + def kernel_arg(self): + # lazy but for now assume that the data is always modified if we access + # the pointer + return self.device_ptr if offloading else self.host_ptr + + # if offloading: + # # N.B. not considering MAX, MIN here + # if access in {RW, WRITE, INC}: + # return self.device_ptr + # else: + # return self.device_ptr_ro + # else: + # # N.B. not considering MAX, MIN here + # if access in {RW, WRITE, INC}: + # return self.host_ptr + # else: + # return self.host_ptr_ro -!# this will create a *backend* array of zeros + @property + def is_available_on_device(self): + return self.availability in {ON_DEVICE, ON_BOTH} -e.g. 3: + def ensure_availability_on_device(self): + if not self.is_available_on_host: + self.host_to_device_copy() -parloop(..., target=DEVICE) -""" + @property + def is_available_on_host(self): + return self.availability in {ON_HOST, ON_BOTH} + def ensure_availability_on_host(self): + if not self.is_available_on_host: + self.device_to_host_copy() -class Array: - """An array that is available on both host and device, copying values where necessary.""" - def __init__(self, shape, dtype): - self.status = ON_HOST # could change with offloading ctxt mngr - #self._host_data = None - #self._dev_ptr = None - self.initdata(offload) - - def initdata(self, offload=False): - # offloat in {"cpu", "gpu"} - if offload == "cpu": - self._host_data = np.zeros(shape, dtype) - self._dev_ptr = None - elif offload == "gpu": - self._host_data = None - # this if statement is needed because backend is known at the - # start of the program but offload, i.e. where are we currently, is not. - if backend == "opencl": - self._dev_ptr = ... # some incantation or other - elif backend == "cuda": - ... - - def as_petsc_vec(self): - if offload == "cpu": - return PETSc.Vec().createMPI(...) - elif offload == "gpu": - if backend == "cuda": - return PETSc.Vec().createCUDA(..) - ... + @property + def vec(self): + raise NotImplementedError + # if offload == "cpu": + # return PETSc.Vec().createMPI(...) + # elif offload == "gpu": + # if backend == "cuda": + # return PETSc.Vec().createCUDA(..) + # ... @property def data(self): - """numpy array""" - if self._host_data is None: - ... - - if self.status == ON_DEVICE: - self._device2hostcopy() - self.status = ON_HOST - return self._host_data # (a numpy array), initially unallocated then lazily filled + self.ensure_availability_on_host() + self.availability = ON_HOST + v = self._host_data.view() + v.setflags(write=True) + return v + @property + def data_ro(self): + self.ensure_availability_on_host() + v = self._host_data.view() + v.setflags(write=False) + return v @property def host_ptr(self): + self.ensure_availability_on_host() + self.availability = ON_HOST return self.data.ctypes.data @property - def dev_ptr_ro(self): - # depend on cuda/openCL/notimplemented (defined at start) + def host_ptr_ro(self): + self.ensure_availability_on_host() + return self.data.ctypes.data + + +class CPUOnlyArray(MirroredArray): - #if self.status not in {ON_DEVICE, ON_BOTH}: - if self.status == ON_HOST: - self._host2devicecopy() - self.status = ON_BOTH - return self._dev_ptr + @property + def device_ptr(self): + return self.host_ptr @property - def dev_ptr_w(self): - # depend on cuda/openCL/notimplemented (defined at start) + def device_ptr_ro(self): + return self.host_ptr_ro + + def host_to_device_copy(self): + pass + + def device_to_host_copy(self): + pass - #if self.status not in {ON_DEVICE, ON_BOTH}: - if self.status == ON_HOST: - self._host2devicecopy() - self.status = ON_DEVICE - return self._dev_ptr +if configuration["backend"] == "OPENCL": + # TODO: Instruct the user to pass + # -viennacl_backend opencl + # -viennacl_opencl_device_type gpu + # create a dummy vector and extract its associated command queue + x = PETSc.Vec().create(PETSc.COMM_WORLD) + x.setType("viennacl") + x.setSizes(size=1) + queue_ptr = x.getCLQueueHandle() + cl_queue = pyopencl.CommandQueue.from_int_ptr(queue_ptr, retain=False) -class CudaVec(Vec): - ... +class OpenCLArray(MirroredArray): + + def __init__(self, data, shape, dtype): + if data is None: + self._device_data = pyopencl.array.empty(cl_queue, shape, dtype) + else: + self._device_data = pyopencl.array.to_device(cl_queue, data) + + @property + def device_ptr(self): + self.ensure_availability_on_device() + self.availability = ON_DEVICE + return self._device_data.data + + @property + def device_ptr_ro(self): + self.ensure_availability_on_device() + return self._device_data.data + + def host_to_device_copy(self): + self._device_data.set(self._host_data) -class OpenCLVec(Vec): - ... - ... + def device_to_host_copy(self): + self._device_data.get(self._host_data) diff --git a/pyop2/backends/cpu.py b/pyop2/backends/cpu.py index ce79c5262..1326620f2 100644 --- a/pyop2/backends/cpu.py +++ b/pyop2/backends/cpu.py @@ -110,6 +110,7 @@ def code_to_compile(self): """Return the C/C++ source code as a string.""" from pyop2.codegen.rep2loopy import generate + # import pdb; pdb.set_trace() wrapper = generate(self.builder) code = lp.generate_code_v2(wrapper) @@ -162,11 +163,11 @@ def reduction_begin(self): MAX: mpi.MPI.MAX}.get(self.accesses[idx]) if mpi.MPI.VERSION >= 3: - requests.append(self.comm.Iallreduce(glob._data, + requests.append(self.comm.Iallreduce(glob.data, glob._buf, op=mpi_op)) else: - self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) + self.comm.Allreduce(glob.data, glob._buf, op=mpi_op) return tuple(requests) @PETSc.Log.EventDecorator("ParLoopRednEnd") @@ -177,7 +178,7 @@ def reduction_end(self, requests): mpi.MPI.Request.Waitall(requests) for idx in self._reduction_idxs: glob = self.arguments[idx].data - glob._data[:] = glob._buf + glob._array.data[:] = glob._buf else: assert len(requests) == 0 diff --git a/pyop2/backends/opencl.py b/pyop2/backends/opencl.py index a041860cb..5b61ca682 100644 --- a/pyop2/backends/opencl.py +++ b/pyop2/backends/opencl.py @@ -13,13 +13,13 @@ from pyop2.types.set import (MixedSet, Subset as BaseSubset, ExtrudedSet as BaseExtrudedSet, Set) -from pyop2.types.map import Map as BaseMap, MixedMap +from pyop2.types.map import Map, MixedMap from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView from pyop2.types.dataset import DataSet, GlobalDataSet, MixedDataSet from pyop2.types.mat import Mat from pyop2.types.glob import Global as BaseGlobal from pyop2.types.access import RW, READ, MIN, MAX, INC -from pyop2.parloop import AbstractParloop +from pyop2.parloop import Parloop from pyop2.global_kernel import AbstractGlobalKernel from pyop2.backends import AbstractComputeBackend, cpu as cpu_backend from petsc4py import PETSc @@ -40,39 +40,6 @@ def read_only_clarray_setitem(self, *args, **kwargs): raise ValueError("assignment destination is read-only") -class Map(BaseMap): - - def __init__(self, *args, **kwargs): - super(Map, self).__init__(*args, **kwargs) - self._availability_flag = AVAILABLE_ON_HOST_ONLY - - @utils.cached_property - def _opencl_values(self): - self._availability_flag = AVAILABLE_ON_BOTH - return cla.to_device(opencl_backend.queue, self._values) - - def get_availability(self): - return self._availability_flag - - def ensure_availability_on_device(self): - self._opencl_values - - def ensure_availability_on_host(self): - # Map once initialized is not over-written so always available - # on host. - pass - - @property - def _kernel_args_(self): - if opencl_backend.offloading: - if not self.is_available_on_device(): - self.ensure_availability_on_device() - - return (self._opencl_values.data,) - else: - return super(Map, self)._kernel_args_ - - class ExtrudedSet(BaseExtrudedSet): """ ExtrudedSet for OpenCL. @@ -173,16 +140,6 @@ def _opencl_data(self): else: return cla.to_device(opencl_backend.queue, self._data) - def zero(self, subset=None): - if subset is None: - self.data[:] = 0 - self.halo_valid = True - else: - raise NotImplementedError - - def copy(self, other, subset=None): - raise NotImplementedError - @utils.cached_property def _vec(self): assert self.dtype == PETSc.ScalarType, \ @@ -278,91 +235,92 @@ def _kernel_args_(self): self._availability_flag = AVAILABLE_ON_HOST_ONLY return (self._data.ctypes.data, ) - @mpi.collective - @property - def data(self): - if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: - raise RuntimeError("Illegal access: no data associated with this Dat!") - - self.halo_valid = False - - if opencl_backend.offloading: - - self.ensure_availability_on_device() - - # {{{ marking data on the host as invalid - - if self.can_be_represented_as_petscvec: - # report to petsc that we are updating values on the device - with self.vec as petscvec: - petscvec.getCLMemHandle("w") - petscvec.restoreCLMemHandle() - else: - self._availability_flag = AVAILABLE_ON_DEVICE_ONLY - - # }}} - v = self._opencl_data[:self.dataset.size] - else: - self.ensure_availability_on_host() - v = self._data[:self.dataset.size].view() - v.setflags(write=True) - - # {{{ marking data on the device as invalid - - if self.can_be_represented_as_petscvec: - # report to petsc that we are altering data on the CPU - with self.vec as petscvec: - petscvec.stateIncrease() - else: - self._availability_flag = AVAILABLE_ON_HOST_ONLY - - # }}} - - return v - - @property - @mpi.collective - def data_with_halos(self): - self.global_to_local_begin(RW) - self.global_to_local_end(RW) - return self.data - - @property - @mpi.collective - def data_ro(self): - - if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: - raise RuntimeError("Illegal access: no data associated with this Dat!") - - self.halo_valid = False - - if opencl_backend.offloading: - self.ensure_availability_on_device() - v = self._opencl_data[:self.dataset.size].view() - v.setitem = read_only_clarray_setitem - else: - self.ensure_availability_on_host() - v = self._data[:self.dataset.size].view() - v.setflags(write=False) - - return v - - @property - @mpi.collective - def data_ro_with_halos(self): - self.global_to_local_begin(READ) - self.global_to_local_end(READ) - - if opencl_backend.offloading: - self.ensure_availability_on_device() - v = self._opencl_data.view() - v.setitem = read_only_clarray_setitem - else: - self.ensure_availability_on_host() - v = self._data.view() - v.setflags(write=False) - - return v + # @mpi.collective + # @property + # def data(self): + # if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + # raise RuntimeError("Illegal access: no data associated with this Dat!") + # + # self.halo_valid = False + # return self._data.data + # + # if opencl_backend.offloading: + # + # self.ensure_availability_on_device() + # + # # {{{ marking data on the host as invalid + # + # if self.can_be_represented_as_petscvec: + # # report to petsc that we are updating values on the device + # with self.vec as petscvec: + # petscvec.getCLMemHandle("w") + # petscvec.restoreCLMemHandle() + # else: + # self._availability_flag = AVAILABLE_ON_DEVICE_ONLY + # + # # }}} + # v = self._opencl_data[:self.dataset.size] + # else: + # self.ensure_availability_on_host() + # v = self._data[:self.dataset.size].view() + # v.setflags(write=True) + # + # # {{{ marking data on the device as invalid + # + # if self.can_be_represented_as_petscvec: + # # report to petsc that we are altering data on the CPU + # with self.vec as petscvec: + # petscvec.stateIncrease() + # else: + # self._availability_flag = AVAILABLE_ON_HOST_ONLY + # + # # }}} + # + # return v + # + # @property + # @mpi.collective + # def data_with_halos(self): + # self.global_to_local_begin(RW) + # self.global_to_local_end(RW) + # return self.data + # + # @property + # @mpi.collective + # def data_ro(self): + # + # if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: + # raise RuntimeError("Illegal access: no data associated with this Dat!") + # + # self.halo_valid = False + # + # if opencl_backend.offloading: + # self.ensure_availability_on_device() + # v = self._opencl_data[:self.dataset.size].view() + # v.setitem = read_only_clarray_setitem + # else: + # self.ensure_availability_on_host() + # v = self._data[:self.dataset.size].view() + # v.setflags(write=False) + # + # return v + # + # @property + # @mpi.collective + # def data_ro_with_halos(self): + # self.global_to_local_begin(READ) + # self.global_to_local_end(READ) + # + # if opencl_backend.offloading: + # self.ensure_availability_on_device() + # v = self._opencl_data.view() + # v.setitem = read_only_clarray_setitem + # else: + # self.ensure_availability_on_host() + # v = self._data.view() + # v.setflags(write=False) + # + # return v class Global(BaseGlobal): @@ -405,39 +363,6 @@ def _kernel_args_(self): self._availability_flag = AVAILABLE_ON_HOST_ONLY return super(Global, self)._kernel_args_ - @mpi.collective - @property - def data(self): - if len(self._data) == 0: - raise RuntimeError("Illegal access: No data associated with" - " this Global!") - - if opencl_backend.offloading: - self.ensure_availability_on_device() - self._availability_flag = AVAILABLE_ON_DEVICE_ONLY - return self._opencl_data - else: - self.ensure_availability_on_host() - self._availability_flag = AVAILABLE_ON_HOST_ONLY - return self._data - - @property - def data_ro(self): - if len(self._data) == 0: - raise RuntimeError("Illegal access: No data associated with" - " this Global!") - - if opencl_backend.offloading: - self.ensure_availability_on_device() - view = self._opencl_data.view() - view.setitem = read_only_clarray_setitem - return view - else: - self.ensure_availability_on_host() - view = self._data.view() - view.setflags(write=False) - return view - @utils.cached_property def _vec(self): raise NotImplementedError() @@ -572,49 +497,6 @@ def compile(self, comm): return CLKernelWithExtraArgs(cl_knl, self.get_extra_args()) -class Parloop(AbstractParloop): - @PETSc.Log.EventDecorator("ParLoopRednBegin") - @mpi.collective - def reduction_begin(self): - """Begin reductions.""" - requests = [] - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - mpi_op = {INC: mpi.MPI.SUM, - MIN: mpi.MPI.MIN, - MAX: mpi.MPI.MAX}.get(self.accesses[idx]) - - if mpi.MPI.VERSION >= 3: - glob.ensure_availability_on_host() - requests.append(self.comm.Iallreduce(glob._data, - glob._buf, - op=mpi_op)) - else: - glob.ensure_availability_on_host() - self.comm.Allreduce(glob._data, glob._buf, op=mpi_op) - return tuple(requests) - - @PETSc.Log.EventDecorator("ParLoopRednEnd") - @mpi.collective - def reduction_end(self, requests): - """Finish reductions.""" - if mpi.MPI.VERSION >= 3: - mpi.MPI.Request.Waitall(requests) - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - glob._data[:] = glob._buf - glob._availability_flag = AVAILABLE_ON_HOST_ONLY - glob.ensure_availability_on_device() - else: - assert len(requests) == 0 - - for idx in self._reduction_idxs: - glob = self.arguments[idx].data - glob._data[:] = glob._buf - glob._availability_flag = AVAILABLE_ON_HOST_ONLY - glob.ensure_availability_on_device() - - class OpenCLBackend(AbstractComputeBackend): Parloop_offloading = Parloop Parloop_no_offloading = cpu_backend.Parloop diff --git a/pyop2/codegen/rep2loopy.py b/pyop2/codegen/rep2loopy.py index 75182e716..d40d7f584 100644 --- a/pyop2/codegen/rep2loopy.py +++ b/pyop2/codegen/rep2loopy.py @@ -528,6 +528,7 @@ def renamer(expr): assumptions = assumptions & pwaffd[parameters.layer_start].le_set(pwaffd[parameters.layer_end]) assumptions = reduce(operator.and_, assumptions.get_basic_sets()) + # import pdb; pdb.set_trace() wrapper = loopy.make_kernel(domains, statements, kernel_data=parameters.kernel_data, diff --git a/pyop2/configuration.py b/pyop2/configuration.py index a98cad124..07681cc1c 100644 --- a/pyop2/configuration.py +++ b/pyop2/configuration.py @@ -84,6 +84,8 @@ class Configuration(dict): # name, env variable, type, default, write once cache_dir = os.path.join(gettempdir(), "pyop2-cache-uid%s" % os.getuid()) DEFAULTS = { + "backend": + ("PYOP2_BACKEND", str, "CPU_ONLY"), "cc": ("PYOP2_CC", str, ""), "cxx": diff --git a/pyop2/offload_utils.py b/pyop2/offload_utils.py index 7ad1f1c16..a7323ae20 100644 --- a/pyop2/offload_utils.py +++ b/pyop2/offload_utils.py @@ -2,6 +2,10 @@ from contextlib import contextmanager +_offloading = False +"""Global state indicating whether or not we are running on the host or device""" + + class DataAvailability(IntEnum): """ Indicates whether the device or host contains valid data. @@ -66,8 +70,9 @@ def offloading(): region will be executed on backend as selected via :func:`set_offloading_backend`. """ - from pyop2 import op2 - op2.compute_backend.turn_on_offloading() + global _offloading + + _offloading = True yield - op2.compute_backend.turn_off_offloading() + _offloading = False return diff --git a/pyop2/parloop.py b/pyop2/parloop.py index 658dd5958..c49492781 100644 --- a/pyop2/parloop.py +++ b/pyop2/parloop.py @@ -426,7 +426,7 @@ def prepare_reduced_globals(cls, arguments, global_knl): reduced_globals = {} for i, (lk_arg, gk_arg, pl_arg) in enumerate(cls.zip_arguments(global_knl, arguments)): if isinstance(gk_arg, GlobalKernelArg) and lk_arg.access == Access.INC: - tmp = compute_backend.Global(gk_arg.dim, data=np.zeros_like(pl_arg.data._data), dtype=lk_arg.dtype) + tmp = compute_backend.Global(gk_arg.dim, data=np.zeros_like(pl_arg.data.data), dtype=lk_arg.dtype) reduced_globals[tmp] = pl_arg arguments[i] = GlobalParloopArg(tmp) diff --git a/pyop2/types/dat.py b/pyop2/types/dat.py index f5f35ce89..846e6cad7 100644 --- a/pyop2/types/dat.py +++ b/pyop2/types/dat.py @@ -19,6 +19,7 @@ from pyop2.types.dataset import DataSet, GlobalDataSet from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin from pyop2.types.set import ExtrudedSet, GlobalSet, Set +from pyop2.array import MirroredArray class AbstractDat(DataCarrier, EmptyDataMixin): @@ -78,7 +79,7 @@ def __init__(self, dataset, data=None, dtype=None, name=None): # a dataset dimension of 1. dataset = dataset ** 1 self._shape = (dataset.total_size,) + (() if dataset.cdim == 1 else dataset.dim) - EmptyDataMixin.__init__(self, data, dtype, self._shape) + self._array = MirroredArray.new(data, dtype, self._shape) self._dataset = dataset self.comm = dataset.comm @@ -150,9 +151,7 @@ def data(self): if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: raise RuntimeError("Illegal access: no data associated with this Dat!") self.halo_valid = False - v = self._data[:self.dataset.size].view() - v.setflags(write=True) - return v + return self._array.data[:self.dataset.size] @property @mpi.collective @@ -168,9 +167,7 @@ def data_with_halos(self): self.global_to_local_begin(Access.RW) self.global_to_local_end(Access.RW) self.halo_valid = False - v = self._data.view() - v.setflags(write=True) - return v + return self._array.data @property @mpi.collective @@ -186,9 +183,7 @@ def data_ro(self): """ if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: raise RuntimeError("Illegal access: no data associated with this Dat!") - v = self._data[:self.dataset.size].view() - v.setflags(write=False) - return v + return self._array.data_ro[:self.dataset.size] @property @mpi.collective @@ -206,9 +201,7 @@ def data_ro_with_halos(self): """ self.global_to_local_begin(Access.READ) self.global_to_local_end(Access.READ) - v = self._data.view() - v.setflags(write=False) - return v + return self._array.data_ro def save(self, filename): """Write the data array to file ``filename`` in NumPy format.""" @@ -237,7 +230,7 @@ def shape(self): @utils.cached_property def dtype(self): - return self._dtype + return self._array.dtype @utils.cached_property def nbytes(self): diff --git a/pyop2/types/data_carrier.py b/pyop2/types/data_carrier.py index 73d3974c2..d3f462865 100644 --- a/pyop2/types/data_carrier.py +++ b/pyop2/types/data_carrier.py @@ -8,6 +8,7 @@ utils ) from pyop2.types.access import Access +from pyop2.array import MirroredArray class DataCarrier(abc.ABC): @@ -57,24 +58,16 @@ class EmptyDataMixin(abc.ABC): if it does not already exist. """ def __init__(self, data, dtype, shape): - if data is None: - self._dtype = np.dtype(dtype if dtype is not None else dtypes.ScalarType) - else: - self._numpy_data = utils.verify_reshape(data, dtype, shape, allow_none=True) - self._dtype = self._data.dtype + self._array = MirroredArray.new(data, dtype, shape) + self._dtype = np.dtype(dtype) - @utils.cached_property + @property def _data(self): - """Return the user-provided data buffer, or a zeroed buffer of - the correct size if none was provided.""" - if not self._is_allocated: - self._numpy_data = np.zeros(self.shape, dtype=self._dtype) - return self._numpy_data + return self._array.data @property - def _is_allocated(self): - """Return True if the data buffer has been allocated.""" - return hasattr(self, '_numpy_data') + def _data_ro(self): + return self._array.data class VecAccessMixin(abc.ABC): diff --git a/pyop2/types/glob.py b/pyop2/types/glob.py index 7472a435d..74451594b 100644 --- a/pyop2/types/glob.py +++ b/pyop2/types/glob.py @@ -12,10 +12,11 @@ ) from pyop2.offload_utils import OffloadMixin from pyop2.types.access import Access -from pyop2.types.data_carrier import DataCarrier, EmptyDataMixin, VecAccessMixin +from pyop2.types.data_carrier import DataCarrier, VecAccessMixin +from pyop2.array import MirroredArray -class Global(DataCarrier, EmptyDataMixin, VecAccessMixin, OffloadMixin): +class Global(DataCarrier, VecAccessMixin, OffloadMixin): """OP2 global value. @@ -47,7 +48,8 @@ def __init__(self, dim, data=None, dtype=None, name=None, comm=None): return self._dim = utils.as_tuple(dim, int) self._cdim = np.prod(self._dim).item() - EmptyDataMixin.__init__(self, data, dtype, self._dim) + self._array = MirroredArray.new(data, dtype, self._dim) + self._dtype = np.dtype(dtype) self._buf = np.empty(self.shape, dtype=self.dtype) self._name = name or "global_#x%x" % id(self) self.comm = comm @@ -57,7 +59,7 @@ def __init__(self, dim, data=None, dtype=None, name=None, comm=None): @utils.cached_property def _kernel_args_(self): - return (self._data.ctypes.data, ) + return (self._array.kernel_arg,) @utils.cached_property def _argtypes_(self): @@ -109,9 +111,9 @@ def shape(self): def data(self): """Data array.""" self.increment_dat_version() - if len(self._data) == 0: + if len(self._array.data) == 0: raise RuntimeError("Illegal access: No data associated with this Global!") - return self._data + return self._array.data @property def dtype(self): @@ -120,9 +122,7 @@ def dtype(self): @property def data_ro(self): """Data array.""" - view = self._data.view() - view.setflags(write=False) - return view + return self._array.data_ro @data.setter def data(self, value): diff --git a/pyop2/types/map.py b/pyop2/types/map.py index aa8ec45d0..07fec4f10 100644 --- a/pyop2/types/map.py +++ b/pyop2/types/map.py @@ -12,6 +12,7 @@ ) from pyop2.offload_utils import OffloadMixin from pyop2.types.set import GlobalSet, MixedSet, Set +from pyop2.array import MirroredArray class Map(OffloadMixin): @@ -38,9 +39,9 @@ def __init__(self, iterset, toset, arity, values=None, name=None, offset=None): self._toset = toset self.comm = toset.comm self._arity = arity - self._values = utils.verify_reshape(values, dtypes.IntType, - (iterset.total_size, arity), allow_none=True) - self.shape = (iterset.total_size, arity) + shape = (iterset.total_size, arity) + self._values_array = MirroredArray.new(values, dtypes.IntType, shape) + self.shape = shape self._name = name or "map_#x%x" % id(self) if offset is None or len(offset) == 0: self._offset = None @@ -49,6 +50,10 @@ def __init__(self, iterset, toset, arity, values=None, name=None, offset=None): # A cache for objects built on top of this map self._cache = {} + @property + def _values(self): + return self._values_array.data_ro + @utils.cached_property def _kernel_args_(self): return (self._values.ctypes.data, ) From 9a8baa6451ce4fcba1f318e4759332134d762174 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Fri, 29 Jul 2022 13:09:59 +0100 Subject: [PATCH 13/14] some more tweaks --- pyop2/backends/opencl.py | 191 +-------------------------------------- pyop2/types/set.py | 7 +- 2 files changed, 8 insertions(+), 190 deletions(-) diff --git a/pyop2/backends/opencl.py b/pyop2/backends/opencl.py index 5b61ca682..2df3073e2 100644 --- a/pyop2/backends/opencl.py +++ b/pyop2/backends/opencl.py @@ -10,8 +10,8 @@ AVAILABLE_ON_BOTH, DataAvailability) from pyop2.configuration import configuration -from pyop2.types.set import (MixedSet, Subset as BaseSubset, - ExtrudedSet as BaseExtrudedSet, +from pyop2.types.set import (MixedSet, Subset, + ExtrudedSet, Set) from pyop2.types.map import Map, MixedMap from pyop2.types.dat import Dat as BaseDat, MixedDat, DatView @@ -40,77 +40,6 @@ def read_only_clarray_setitem(self, *args, **kwargs): raise ValueError("assignment destination is read-only") -class ExtrudedSet(BaseExtrudedSet): - """ - ExtrudedSet for OpenCL. - """ - - def __init__(self, *args, **kwargs): - super(ExtrudedSet, self).__init__(*args, **kwargs) - self._availability_flag = AVAILABLE_ON_HOST_ONLY - - @utils.cached_property - def opencl_layers_array(self): - self._availability_flag = AVAILABLE_ON_BOTH - return cla.to_device(opencl_backend.queue, self.layers_array) - - def get_availability(self): - return self._availability_flag - - def ensure_availability_on_device(self): - self.opencl_layers_array - - def ensure_availability_on_host(self): - # ExtrudedSet once initialized is not over-written so always available - # on host. - pass - - @property - def _kernel_args_(self): - if opencl_backend.offloading: - if not self.is_available_on_device(): - self.ensure_availability_on_device() - - return (self.opencl_layers_array.data,) - else: - return super(ExtrudedSet, self)._kernel_args_ - - -class Subset(BaseSubset): - """ - Subset for OpenCL. - """ - def __init__(self, *args, **kwargs): - super(Subset, self).__init__(*args, **kwargs) - self._availability_flag = AVAILABLE_ON_HOST_ONLY - - def get_availability(self): - return self._availability_flag - - @utils.cached_property - def _opencl_indices(self): - self._availability_flag = AVAILABLE_ON_BOTH - return cla.to_device(opencl_backend.queue, self._indices) - - def ensure_availability_on_device(self): - self._opencl_indices - - def ensure_availability_on_host(self): - # Subset once initialized is not over-written so always available - # on host. - pass - - @property - def _kernel_args_(self): - if opencl_backend.offloading: - if not self.is_available_on_device(): - self.ensure_availability_on_device() - - return (self._opencl_indices.data,) - else: - return super(Subset, self)._kernel_args_ - - class Dat(BaseDat): """ Dat for OpenCL. @@ -206,122 +135,6 @@ def vec_context(self, access): if access is not READ: self.halo_valid = False - @property - def _kernel_args_(self): - if self.can_be_represented_as_petscvec: - if opencl_backend.offloading: - self.ensure_availability_on_device() - # tell petsc that we have updated the data in the CL buffer - with self.vec as v: - v.getCLMemHandle() - v.restoreCLMemHandle() - - return (self._opencl_data.data,) - else: - self.ensure_availability_on_host() - # tell petsc that we have updated the data on the host - with self.vec as v: - v.stateIncrease() - return (self._data.ctypes.data, ) - else: - if opencl_backend.offloading: - self.ensure_availability_on_device() - - self._availability_flag = AVAILABLE_ON_DEVICE_ONLY - return (self._opencl_data.data, ) - else: - self.ensure_availability_on_host() - - self._availability_flag = AVAILABLE_ON_HOST_ONLY - return (self._data.ctypes.data, ) - - # @mpi.collective - # @property - # def data(self): - # if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: - # raise RuntimeError("Illegal access: no data associated with this Dat!") - # - # self.halo_valid = False - # return self._data.data - # - # if opencl_backend.offloading: - # - # self.ensure_availability_on_device() - # - # # {{{ marking data on the host as invalid - # - # if self.can_be_represented_as_petscvec: - # # report to petsc that we are updating values on the device - # with self.vec as petscvec: - # petscvec.getCLMemHandle("w") - # petscvec.restoreCLMemHandle() - # else: - # self._availability_flag = AVAILABLE_ON_DEVICE_ONLY - # - # # }}} - # v = self._opencl_data[:self.dataset.size] - # else: - # self.ensure_availability_on_host() - # v = self._data[:self.dataset.size].view() - # v.setflags(write=True) - # - # # {{{ marking data on the device as invalid - # - # if self.can_be_represented_as_petscvec: - # # report to petsc that we are altering data on the CPU - # with self.vec as petscvec: - # petscvec.stateIncrease() - # else: - # self._availability_flag = AVAILABLE_ON_HOST_ONLY - # - # # }}} - # - # return v - # - # @property - # @mpi.collective - # def data_with_halos(self): - # self.global_to_local_begin(RW) - # self.global_to_local_end(RW) - # return self.data - # - # @property - # @mpi.collective - # def data_ro(self): - # - # if self.dataset.total_size > 0 and self._data.size == 0 and self.cdim > 0: - # raise RuntimeError("Illegal access: no data associated with this Dat!") - # - # self.halo_valid = False - # - # if opencl_backend.offloading: - # self.ensure_availability_on_device() - # v = self._opencl_data[:self.dataset.size].view() - # v.setitem = read_only_clarray_setitem - # else: - # self.ensure_availability_on_host() - # v = self._data[:self.dataset.size].view() - # v.setflags(write=False) - # - # return v - # - # @property - # @mpi.collective - # def data_ro_with_halos(self): - # self.global_to_local_begin(READ) - # self.global_to_local_end(READ) - # - # if opencl_backend.offloading: - # self.ensure_availability_on_device() - # v = self._opencl_data.view() - # v.setitem = read_only_clarray_setitem - # else: - # self.ensure_availability_on_host() - # v = self._data.view() - # v.setflags(write=False) - # - # return v - class Global(BaseGlobal): """ diff --git a/pyop2/types/set.py b/pyop2/types/set.py index e73d0e9a7..8d8b69468 100644 --- a/pyop2/types/set.py +++ b/pyop2/types/set.py @@ -12,6 +12,7 @@ utils ) from pyop2.offload_utils import OffloadMixin +from pyop2.array import MirroredArray class Set: @@ -323,9 +324,13 @@ def __init__(self, parent, layers): layers = np.asarray([[0, layers]], dtype=dtypes.IntType) self.constant_layers = True - self._layers = layers + self._layers_array = MirroredArray.new(layers, dtypes.IntType, layers.shape) self._extruded = True + @property + def _layers(self): + return self._layers_array.data + @utils.cached_property def _kernel_args_(self): return (self.layers_array.ctypes.data, ) From ba02a5b6b03b05af6fdd55ab450b09f5334e3105 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Fri, 29 Jul 2022 14:45:33 +0100 Subject: [PATCH 14/14] fix --- pyop2/array.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyop2/array.py b/pyop2/array.py index 23cbf7745..86540fe3c 100644 --- a/pyop2/array.py +++ b/pyop2/array.py @@ -67,7 +67,7 @@ def is_available_on_device(self): return self.availability in {ON_DEVICE, ON_BOTH} def ensure_availability_on_device(self): - if not self.is_available_on_host: + if not self.is_available_on_device: self.host_to_device_copy() @property