Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions cpp/code/compute_fn.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// ------------------------------
// Dependencies

// standard dependencies
#include <stdint.h>
#include <string>
#include <iostream>

// arrow dependencies
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/util/hashing.h>

#include "common.h"


// >> aliases for types in standard library
using std::shared_ptr;
using std::vector;

// >> commonly used arrow types
// |> general programming support
using arrow::Result;
using arrow::Status;
using arrow::Datum;

// |> arrow data types and helpers
using arrow::Int64Builder;
using arrow::Array;
using arrow::ArraySpan;


// >> aliases for types used to define a custom function (e.g. `NamedScalarFn`)
// |> kernel parameters
using arrow::compute::KernelContext;
using arrow::compute::ExecSpan;
using arrow::compute::ExecResult;

// |> for defining compute functions and their compute kernels
using arrow::compute::FunctionDoc;
using arrow::compute::InputType;
using arrow::compute::OutputType;
using arrow::compute::Arity;
using arrow::compute::ScalarFunction;

// |> for adding to a function registry or using `CallFunction`
using arrow::compute::FunctionRegistry;
using arrow::compute::ExecContext;


// ------------------------------
// Structs and Classes

// >> Documentation for a compute function
/**
* Create a const instance of `FunctionDoc` that contains 3 attributes:
* 1. Short description
* 2. Long description (can be multiple lines, each limited to 78 characters in width)
* 3. Name of input arguments
*/
const FunctionDoc named_scalar_fn_doc {
"Unary function that calculates a hash for each element of the input"
,("This function uses the xxHash algorithm.\n"
"The result contains a 64-bit hash value for each input element.")
,{ "input_array" }
};


// >> Kernel implementations for a compute function
/**
* Create implementations that will be associated with our compute function. When a
* compute function is invoked, the compute API framework will delegate execution to an
* associated kernel that matches: (1) input argument types/shapes and (2) output argument
* types/shapes.
*
* Kernel implementations may be functions or may be methods (functions within a class or
* struct).
*/
struct NamedScalarFn {

/**
* A kernel implementation that expects a single array as input, and outputs an array of
* int64 values. We write this implementation knowing what function we want to
* associate it with ("NamedScalarFn"), but that association is made later (see
* `RegisterScalarFnKernels()` below).
*/
static Status
Exec(KernelContext *ctx, const ExecSpan &input_arg, ExecResult *out) {
StartRecipe("DefineAComputeKernel");

// Validate inputs
if (input_arg.num_values() != 1 or !input_arg[0].is_array()) {
return Status::Invalid("Unsupported argument types or shape");
}

// The input ArraySpan manages data as 3 buffers; the data buffer has index `1`
constexpr int bufndx_data = 1;
const int64_t *hash_inputs = input_arg[0].array.GetValues<int64_t>(bufndx_data);
const auto input_len = input_arg[0].array.length;

// Allocate an Arrow buffer for output
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> hash_buffer,
AllocateBuffer(input_len * sizeof(int64_t)));

// Call hashing function, using both prime multipliers from xxHash
int64_t *hash_results = reinterpret_cast<int64_t*>(hash_buffer->mutable_data());
for (int val_ndx = 0; val_ndx < input_len; ++val_ndx) {
hash_results[val_ndx] = (
ScalarHelper<int64_t, 0>::ComputeHash(hash_inputs[val_ndx])
+ ScalarHelper<int64_t, 1>::ComputeHash(hash_inputs[val_ndx])
);
}

// Use ArrayData (not ArraySpan) for ownership of result buffer
out->value = ArrayData{int64(), input_len, {nullptr, std::move(hash_buffer)}};

EndRecipe("DefineAComputeKernel");
return Status::OK();
}
};


// ------------------------------
// Functions


// >> Function registration and kernel association
/**
* A convenience function that shows how we construct an instance of `ScalarFunction` that
* will be registered in a function registry. The instance is constructed with: (1) a
* unique name ("named_scalar_fn"), (2) an "arity" (`Arity::Unary()`), and (3) an instance
* of `FunctionDoc`.
*
* The function name is used to invoke it from a function registry after it has been
* registered. The "arity" is the cardinality of the function's parameters--1 parameter is
* a unary function, 2 parameters is a binary function, etc. Finally, it is helpful to
* associate the function with documentation, which uses the `FunctionDoc` struct.
*/
shared_ptr<ScalarFunction>
RegisterScalarFnKernels() {
StartRecipe("AddKernelToFunction");
// Instantiate a function to be registered
auto fn_named_scalar = std::make_shared<ScalarFunction>(
"named_scalar_fn"
,Arity::Unary()
,std::move(named_scalar_fn_doc)
);

// Associate a function and kernel using `ScalarFunction::AddKernel()`
DCHECK_OK(
fn_named_scalar->AddKernel(
{ InputType(arrow::int64()) }
,OutputType(arrow::int64())
,NamedScalarFn::Exec
)
);
EndRecipe("AddKernelToFunction");

return fn_named_scalar;
}


/**
* A convenience function that shows how we register a custom function with a
* `FunctionRegistry`. To keep this simple and general, this function takes a pointer to a
* FunctionRegistry as an input argument, then invokes `FunctionRegistry::AddFunction()`.
*/
void
RegisterNamedScalarFn(FunctionRegistry *registry) {
StartRecipe("AddFunctionToRegistry");
// scalar_fn has type: shared_ptr<ScalarFunction>
auto scalar_fn = RegisterScalarFnKernels();
DCHECK_OK(registry->AddFunction(std::move(scalar_fn)));
EndRecipe("AddFunctionToRegistry");
}


// >> Convenience functions
/**
* An optional, convenient invocation function to easily call our compute function. This
* executes our compute function by invoking `CallFunction` with the name that we used to
* register the function ("named_scalar_fn" in this case).
*/
ARROW_EXPORT
Result<Datum>
NamedScalarFn(const Datum &input_arg, ExecContext *ctx) {
StartRecipe("InvokeByCallFunction");
auto func_name = "named_scalar_fn";
auto result_datum = CallFunction(func_name, { input_arg }, ctx);
EndRecipe("InvokeByCallFunction");

return result_datum;
}


Result<shared_ptr<Array>>
BuildIntArray() {
vector<int64_t> col_vals { 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 };

Int64Builder builder;
ARROW_RETURN_NOT_OK(builder.Reserve(col_vals.size()));
ARROW_RETURN_NOT_OK(builder.AppendValues(col_vals));
return builder.Finish();
}


class ComputeFunctionTest : public ::testing::Test {};

TEST(ComputeFunctionTest, TestRegisterAndCallFunction) {
// >> Register the function first
auto fn_registry = arrow::compute::GetFunctionRegistry();
RegisterNamedScalarFn(fn_registry);

// >> Then we can call the function
StartRecipe("InvokeByConvenienceFunction");
auto build_result = BuildIntArray();
if (not build_result.ok()) {
std::cerr << build_result.status().message() << std::endl;
return 1;
}

Datum col_data { *build_result };
auto fn_result = NamedScalarFn(col_data);
if (not fn_result.ok()) {
std::cerr << fn_result.status().message() << std::endl;
return 2;
}

auto result_data = fn_result->make_array();
std::cout << "Success:" << std::endl;
std::cout << "\t" << result_data->ToString() << std::endl;
EndRecipe("InvokeByConvenienceFunction");

// If we want to peek at the input data
std::cout << col_data.make_array()->ToString() << std::endl;

return 0;
}
151 changes: 151 additions & 0 deletions cpp/source/compute.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

====================================
Defining and Using Compute Functions
====================================

This section contains (or will contain) a number of recipes illustrating how to
define new "compute functions" or how to use existing ones. Arrow contains a "Compute
API," which primarily consists of a "registry" of functions that can be invoked.
Currently, Arrow populates a default registry with a variety of useful functions. The
recipes provided in this section show some approaches to define a compute function as well
as how to invoke a compute function by name, given a registry.


.. contents::

Invoke a Compute Function
=========================

When invoking a compute function, the function must exist in a function registry. In this
recipe, we use `CallFunction()` to invoke the function with name "named_scalar_fn".

.. recipe:: ../code/compute_fn.cc InvokeByCallFunction
:caption: Use CallFunction() to invoke a compute function by name
:dedent: 2

.. note::
This method allows us to specify arguments as a vector and a custom ExecContext.

If an `ExecContext` is not passed to `CallFunction` (it is null), then the default
FunctionRegistry will be used to call the function from.

If we have defined a convenience function that wraps `CallFunction()`, then we can call
that function instead. Various compute functions provided by Arrow have these convenience
functions defined, such as `Add` or `Subtract`.

.. recipe:: ../code/compute_fn.cc InvokeByConvenienceFunction
:caption: Use a convenience invocation function to call a compute function
:dedent: 2


Adding a Custom Compute Function
================================

To make a custom compute function available, there are 3 primary steps:

1. Define kernels for the function (these implement the actual logic)

2. Associate the kernels with a function object

3. Add the function object to a function registry


Define Function Kernels
-----------------------

A kernel is a particular function that implements desired logic for a compute function.
There are at least a couple of types of function kernels, such as initialization kernels
and execution kernels. An initialization kernel prepares the initial state of a compute
function, while an execution kernel executes the main processing logic of the compute
function. The body of a function kernel may use other functions, but the kernel function
itself is a singular instance that will be associated with the desired compute function.
While compute functions can be associated with an initialization and execution kernel
pair, this recipe only shows the definition of an execution kernel.

The signature of an execution kernel is relatively standardized: it returns a `Status` and
takes a context, some arguments, and a pointer to an output result. The context wraps an
`ExecContext` and other metadata about the environment in which the kernel function should
be executed. The input arguments are contained within an `ExecSpan` (newly added in place
of `ExecBatch`), which holds non-owning references to argument data. Finally, the
`ExecResult` pointed to should be set to an appropriate `ArraySpan` or `ArrayData`
instance, depending on ownership semantics of the kernel's output.

.. recipe:: ../code/compute_fn.cc DefineAComputeKernel
:caption: Define an example compute kernel that uses ScalarHelper from hashing.h to hash
input values
:dedent: 2

This recipe shows basic validation of `input_arg` which contains a vector of input
arguments. Then, the input `Array` is accessed from `input_arg` and a `Buffer` is
allocated to hold output results. After the main loop is completed, the allocated `Buffer`
is wrapped in an `ArrayData` instance and referenced by `out`.


Associate Kernels with a Function
---------------------------------

The process of adding kernels to a compute function is easy: (1) create an appropriate
`Function` instance--`ScalarFunction` in this case--and (2) call the `AddKernel` function.
The more difficult part of this process is repeating for the desired data types and
knowing how the signatures work.

.. recipe:: ../code/compute_fn.cc AddKernelToFunction
:caption: Instantiate a ScalarFunction and add our execution kernel to it
:dedent: 2

A `ScalarFunction` represents a "scalar" or "element-wise" compute function (see
documentation on the Compute API). The signature used in this recipe passes:

1. A function name (to be used when calling it)

2. An "Arity" meaning how many input arguments it takes (like cardinality)

3. A `FunctionDoc` instance (to associate some documentation programmatically)

Then, `AddKernel` expects:

1. A vector of data types for each input argument

2. An output data type for the result

3. The function to be used as the execution kernel

4. The function to be used as the initialization kernel (optional)

Note that the constructor for `ScalarFunction` is more interested in how many arguments to
expect, and some information about the compute function itself; whereas, the function to
add a kernel specifies data types and the functions to call at runtime.


Add Function to Registry
------------------------

Finally, adding the function to a registry is wonderfully straightforward.

.. recipe:: ../code/compute_fn.cc AddFunctionToRegistry
:caption: Use convenience function to get a ScalarFunction with associated kernels, then
add it to the given FunctionRegistry
:dedent: 2

In this recipe, we simply wrap the logic in a convenience function that: (1) creates a
`ScalarFunction`, (2) adds our execution kernel to the compute function, and (3) returns
the compute function. Then, we add the compute function to some registry. This recipe
takes the `FunctionRegistry` as an argument so that it is easy to call from the same place
that the Arrow codebase registers other provided functions. Otherwise, we can add our
compute function to the default registry, or a custom registry.