Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion plugins/sql_functions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,11 @@ struct SqlQueryFunctionApplier: public FunctionApplier {
switch (function->functionConfig.output) {
case FIRST_ROW: {
ExpressionValue result;

auto output = executor->take();
ssize_t offset = function->functionConfig.query.stm->offset;

for (size_t n = 0; output && n < offset; ++n)
output = executor->take();

if (output) {
// MLDB-1329 band-aid fix. This appears to break a circlar
Expand Down
8 changes: 8 additions & 0 deletions sql/execution_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ takeAll(std::function<bool (std::shared_ptr<PipelineResults> &)> onResult)
return true;
}

std::shared_ptr<PipelineResults>
ElementExecutor::
takeColumn()
{
throw HttpReturnException(500, "Element Executor " + ML::type_name(*this) + " does not allow taking columns");
}
/*****************************************************************************/
/* PIPELINE ELEMENT */
/*****************************************************************************/
Expand Down Expand Up @@ -397,6 +403,8 @@ std::shared_ptr<PipelineElement>
PipelineElement::
params(std::function<std::shared_ptr<ExpressionValueInfo> (const Utf8String & name)> getParamInfo)
{
ExcAssert(shared_from_this());
ExcAssert(getParamInfo);
return std::make_shared<ParamsElement>(shared_from_this(), std::move(getParamInfo));
}

Expand Down
3 changes: 3 additions & 0 deletions sql/execution_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ struct ElementExecutor {
/** Take one element from the pipeline. */
virtual std::shared_ptr<PipelineResults> take() = 0;

/** Take one element from the pipeline. */
virtual std::shared_ptr<PipelineResults> takeColumn();

/** Take all elements from the pipeline. inParallel describes whether
the function can be called from multiple threads at once.
*/
Expand Down
Loading