diff --git a/plugins/sql_functions.cc b/plugins/sql_functions.cc index 8552d02e6..7dedaa6dc 100644 --- a/plugins/sql_functions.cc +++ b/plugins/sql_functions.cc @@ -176,8 +176,11 @@ struct SqlQueryFunctionApplier: public FunctionApplier { switch (function->functionConfig.output) { case FIRST_ROW: { ExpressionValue result; - auto output = executor->take(); + ssize_t offset = function->functionConfig.query.stm->offset; + + for (size_t n = 0; output && n < offset; ++n) + output = executor->take(); if (output) { // MLDB-1329 band-aid fix. This appears to break a circlar diff --git a/sql/execution_pipeline.cc b/sql/execution_pipeline.cc index dfe3ec2aa..ec3db1443 100644 --- a/sql/execution_pipeline.cc +++ b/sql/execution_pipeline.cc @@ -368,6 +368,12 @@ takeAll(std::function &)> onResult) return true; } +std::shared_ptr +ElementExecutor:: +takeColumn() +{ + throw HttpReturnException(500, "Element Executor " + ML::type_name(*this) + " does not allow taking columns"); +} /*****************************************************************************/ /* PIPELINE ELEMENT */ /*****************************************************************************/ @@ -397,6 +403,8 @@ std::shared_ptr PipelineElement:: params(std::function (const Utf8String & name)> getParamInfo) { + ExcAssert(shared_from_this()); + ExcAssert(getParamInfo); return std::make_shared(shared_from_this(), std::move(getParamInfo)); } diff --git a/sql/execution_pipeline.h b/sql/execution_pipeline.h index eb8b76cca..bd85512e9 100644 --- a/sql/execution_pipeline.h +++ b/sql/execution_pipeline.h @@ -223,6 +223,9 @@ struct ElementExecutor { /** Take one element from the pipeline. */ virtual std::shared_ptr take() = 0; + /** Take one element from the pipeline. */ + virtual std::shared_ptr takeColumn(); + /** Take all elements from the pipeline. inParallel describes whether the function can be called from multiple threads at once. 
*/ diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index fbea2a456..9cd0172f2 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -82,74 +82,123 @@ doGetAllColumns(std::function keep, // << fieldOffset << endl; ExcAssertGreaterEqual(fieldOffset, 0); - std::vector columnsWithInfo; - std::map index; - for (auto & column: knownColumns) { + if (hasUnknownColumns) { + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue + { + auto & row = rowScope.as(); - ColumnName outputName = keep(column.columnName); + const ExpressionValue & rowContents = row.values.at(fieldOffset + ROW_CONTENTS); - if (outputName.empty() && !asName.empty()) { - // BAD SMELL - //try with the table alias - outputName = keep(PathElement(asName) + column.columnName); - } + RowValue result; - if (outputName.empty()) { - continue; - } + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) + { + ColumnName outputName = keep(columnName); + if (outputName.empty() && !asName.empty()) { + //try with the table alias + outputName = keep(PathElement(asName) + columnName); + } + + if (!outputName.empty()) { + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + value.forEachAtom(onAtom, outputName); + } - KnownColumn out = column; - out.columnName = ColumnName(outputName); - columnsWithInfo.emplace_back(std::move(out)); - index[column.columnName] = ColumnName(outputName); + return true; + }; + + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); + + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); + }; + + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_OPEN); + result.exec = exec; + return result; } - - auto exec = [=] (const SqlRowScope & rowScope, const 
VariableFilter & filter) -> ExpressionValue - { - auto & row = rowScope.as(); + else { - const ExpressionValue & rowContents - = row.values.at(fieldOffset + ROW_CONTENTS); + std::map index; - RowValue result; + for (auto & column: knownColumns) { - auto onColumn = [&] (const PathElement & columnName, - const ExpressionValue & value) + ColumnName outputName = keep(column.columnName); + + if (outputName.empty() && !asName.empty()) { + // BAD SMELL + //try with the table alias + outputName = keep(PathElement(asName) + column.columnName); + } + + if (outputName.empty()) { + continue; + } + + KnownColumn out = column; + out.columnName = ColumnName(outputName); + columnsWithInfo.emplace_back(std::move(out)); + index[column.columnName] = ColumnName(outputName); + } + + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue { - auto it = index.find(Path(columnName)); - if (it == index.end()) { - return true; - } + auto & row = rowScope.as(); - auto onAtom = [&] (const Path & columnName, - const Path & prefix, - CellValue atom, - Date ts) + const ExpressionValue & rowContents + = row.values.at(fieldOffset + ROW_CONTENTS); + + RowValue result; + + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) { - result.emplace_back(prefix + columnName, - std::move(atom), ts); + auto it = index.find(Path(columnName)); + if (it == index.end()) { + return true; + } + + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + value.forEachAtom(onAtom, it->second); + return true; }; - // TODO: lots of optimizations possible here... 
- value.forEachAtom(onAtom, it->second); + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); - return true; + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); }; - if (!rowContents.empty()) - rowContents.forEachColumn(onColumn); - - ExpressionValue val(std::move(result)); - return val.getFilteredDestructive(filter); - }; - - GetAllColumnsOutput result; - result.info = std::make_shared(columnsWithInfo, SCHEMA_CLOSED); - result.exec = exec; - return result; + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_CLOSED); + result.exec = exec; + return result; + } } BoundFunction @@ -289,6 +338,31 @@ take() return result; } +std::shared_ptr +GenerateRowsExecutor:: +takeColumn() +{ + auto result = source->take(); + + if (!result) + return result; + + auto column = columnGenerator(currentDone); + + if (column.columnName.empty()) + return nullptr; + + //cerr << "got column " << column.columnName << " " + // << jsonEncodeStr(column.rows) << endl; + + result->values.emplace_back(column.columnName, + Date::notADate()); + result->values.emplace_back(std::move(column.rows)); + + ++currentDone; + return result; +} + void GenerateRowsExecutor:: restart() @@ -356,6 +430,7 @@ start(const BoundParameters & getParam) const parent->orderBy, 0 /* offset */, -1 /* limit */); result->params = getParam; + result->columnGenerator = parent->from.getColumn; ExcAssert(result->params); return result; } @@ -450,7 +525,7 @@ outputAdded() const { SubSelectExecutor:: SubSelectExecutor(std::shared_ptr boundSelect, - const BoundParameters & getParam) + const BoundParameters & getParam) : columnIndex(0) { pipeline = boundSelect->start(getParam); } @@ -466,11 +541,85 @@ take() return subResult; } +std::shared_ptr +SubSelectExecutor:: +takeColumn() +{ + //We must do the terrible thing and query everything to build a transposed view. 
+ if (!rows) { + + rows = make_shared > >(); + + while (1){ + auto subResult = pipeline->take(); + + if (!subResult) + break; + + rows->push_back(subResult); + + auto pushColumn = [&] (const PathElement & columnName, const ExpressionValue & val) { + columnNames.push_back(columnName); + return true; + }; + + ssize_t valuesOffset = subResult->values.size() - 1; + + ExpressionValue& values = subResult->values[valuesOffset]; + + values.forEachColumn(pushColumn); + } + + sort( columnNames.begin(), columnNames.end() ); + columnNames.erase( unique( columnNames.begin(), columnNames.end() ), columnNames.end() ); + + columnIndex = 0; + } + + if (columnIndex >= columnNames.size()) + return nullptr; + + PathElement subColumnName = columnNames[columnIndex++]; + + std::shared_ptr result = make_shared(); + + result->values.emplace_back(subColumnName.toUtf8String(), Date::notADate()); //column name becomes the rowname + + RowValue resultValues; + + for (auto& r : *rows) { + + ssize_t valuesOffset = r->values.size() - 1; + ssize_t rowNameOffset = valuesOffset - 1; + + ExpressionValue& rowName = r->values[rowNameOffset]; //rowName becomes the colum name + Path rowNamePath = Path::parse(rowName.toUtf8String()); + + auto pushResult = [&] (const PathElement & columnName, const ExpressionValue & val) { + if (columnName == subColumnName) { + resultValues.emplace_back(rowNamePath, val.coerceToAtom(), val.getEffectiveTimestamp()); + } + + return true; + }; + + ExpressionValue& values = r->values[valuesOffset]; + values.forEachColumn(pushResult); + } + + result->values.emplace_back(resultValues); + + return result; +} + void SubSelectExecutor:: restart() { pipeline->restart(); + rows.reset(); + columnNames.clear(); + columnIndex = 0; } @@ -872,6 +1021,18 @@ JoinElement(std::shared_ptr root, ->from(right, boundRight, when, selectAll, rightCondition, condition.right.orderBy) ->select(rightEmbedding); + + //for takeColumn + leftRaw= root + ->where(constantWhere) + ->from(left, boundLeft, when, 
selectAll, leftCondition, + condition.left.orderBy); + + rightRaw = root + ->where(constantWhere) + ->from(right, boundRight, when, selectAll, rightCondition, + condition.right.orderBy); + } std::shared_ptr @@ -881,6 +1042,8 @@ bind() const return std::make_shared(root->bind(), leftImpl->bind(), rightImpl->bind(), + leftRaw->bind(), + rightRaw->bind(), condition, joinQualification); } @@ -894,11 +1057,15 @@ JoinElement::CrossJoinExecutor:: CrossJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right) + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) : parent(parent), root(std::move(root)), left(std::move(left)), - right(std::move(right)) + right(std::move(right)), + leftRaw(std::move(leftRaw)), + rightRaw(std::move(rightRaw)) { ExcAssert(parent && this->root && this->left && this->right); l = this->left->take(); @@ -1008,10 +1175,22 @@ take() } } +std::shared_ptr +JoinElement::CrossJoinExecutor:: +takeColumn() +{ + if (!transpose_) + transpose_ = make_shared(*this, leftRaw, rightRaw); + + return transpose_->take(); +} + void JoinElement::CrossJoinExecutor:: restart() { + transpose_.reset(); + left->restart(); right->restart(); l = left->take(); @@ -1027,11 +1206,15 @@ JoinElement::EquiJoinExecutor:: EquiJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right) + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) : parent(parent), root(std::move(root)), left(std::move(left)), - right(std::move(right)) + right(std::move(right)), + leftRaw(std::move(leftRaw)), + rightRaw(std::move(rightRaw)) { l = this->left->take(); r = this->right->take(); @@ -1253,18 +1436,133 @@ take() return nullptr; } +std::shared_ptr +JoinElement::EquiJoinExecutor:: +takeColumn() +{ + if (!transpose_) + transpose_ = make_shared(*this, leftRaw, rightRaw); + + return transpose_->take(); +} + void JoinElement::EquiJoinExecutor:: restart() { - //cerr 
<< "**** equijoin restart" << endl; left->restart(); right->restart(); + leftRaw->restart(); + rightRaw->restart(); + + transpose_.reset(); + l = left->take(); r = right->take(); takeMoreInput(); } +JoinElement::JoinTransposeExecutor:: +JoinTransposeExecutor(ElementExecutor& joinExecutor, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) + : joinExecutor(joinExecutor), leftRaw(std::move(leftRaw)), rightRaw(std::move(rightRaw)) +{ + side = 0; + + //Transpose of joins should be avoided. + //We need to build the full row list and column index before we can proceed + while (1){ + auto res = joinExecutor.take(); + if (!res) + break; + + //TODO: This will not work in so many cases. Use the scope + Utf8String leftNameUtf8 = ""; + if (!res->values.at(0).empty()) + leftNameUtf8 = res->values.at(0).toUtf8String(); + size_t i = 2; + for (; i + 2 < res->values.size(); i+=2) { + if (i == 2) + leftNameUtf8 = "[" + leftNameUtf8 + "]"; + + leftNameUtf8 += res->values.at(0).empty() ? "-[]" : + "-[" + res->values.at(i).toUtf8String() + "]"; + } + + RowName leftName = RowName::parse(leftNameUtf8); + + Utf8String rightNameUtf8 = ""; + if (!res->values.at(i).empty()) + rightNameUtf8 = res->values.at(i).toUtf8String(); + RowName rightName = RowName::parse(rightNameUtf8); + + RowName rowName = RowName::parse("[" + leftNameUtf8 + "]-[" + rightNameUtf8 + "]"); + + leftRowIndex[leftName].push_back(rowName); + rightRowIndex[rightName].push_back(rowName); + } + +} + +std::shared_ptr +JoinElement::JoinTransposeExecutor:: +take() +{ + std::shared_ptr columnResult; + + SideRowIndex* index = &leftRowIndex; + if (side == 0){ + columnResult = this->leftRaw->takeColumn(); + if (!columnResult) + ++side; + } + if (side == 1){ + index = &rightRowIndex; + columnResult = this->rightRaw->takeColumn(); + if (!columnResult) + ++side; + } + + if (!columnResult) + return columnResult; + + //For each row in the return column, filter out those that weren't joined + ssize_t columnsOffset = 
columnResult->values.size() -1; + const ExpressionValue& rows = columnResult->values[columnsOffset]; + StructValue values; + + auto onRow = [&] (const PathElement & columnName, const ExpressionValue & val) { + RowHash rowHash = RowName(columnName); + + // Does this row appear in the output? If not, nothing to + // do with it + auto it = index->find(rowHash); + if (it == index->end()) { + return true; + } + + //copy the value + //but we need the actual rowname + if (it->second.size() == 1) { + values.emplace_back(it->second[0][0], std::move(val)); + } + else { + // Can't move the value to avoid it becoming null + for (const RowName & outputRowName: it->second) { + values.emplace_back(outputRowName[0], val); + } + } + + return true; + }; + + rows.forEachColumn(onRow); + columnResult->values[columnsOffset] = std::move(values); + + return columnResult; +} + /*****************************************************************************/ /* BOUND JOIN EXECUTOR */ @@ -1274,11 +1572,15 @@ JoinElement::Bound:: Bound(std::shared_ptr root, std::shared_ptr left, std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw, AnnotatedJoinCondition condition, JoinQualification joinQualification) : root_(std::move(root)), left_(std::move(left)), right_(std::move(right)), + leftRaw_(std::move(leftRaw)), + rightRaw_(std::move(rightRaw)), outputScope_(createOutputScope()), crossWhere_(condition.crossWhere->bind(*outputScope_)), condition_(std::move(condition)), @@ -1329,14 +1631,18 @@ start(const BoundParameters & getParam) const (this, root_->start(getParam), left_->start(getParam), - right_->start(getParam)); + right_->start(getParam), + leftRaw_->start(getParam), + rightRaw_->start(getParam)); case AnnotatedJoinCondition::EQUIJOIN: return std::make_shared (this, root_->start(getParam), left_->start(getParam), - right_->start(getParam)); + right_->start(getParam), + leftRaw_->start(getParam), + rightRaw_->start(getParam)); default: throw HttpReturnException(400, 
"Can't execute that kind of join", @@ -1451,7 +1757,7 @@ FromElement(std::shared_ptr root_, ExcAssert(this->root); UnboundEntities unbound = from->getUnbound(); - //cerr << "unbound for from = " << jsonEncode(unbound) << endl; + //cerr << "unbound for from = " << jsonEncode(unbound) << endl; if (from->getType() == "join") { std::shared_ptr join @@ -1483,6 +1789,23 @@ FromElement(std::shared_ptr root_, impl.reset(new SubSelectElement(root, subSelect->statement, orderBy, getParamInfo, from->getAs())); } + else if (from->getType() == "datasetFunction") { + std::shared_ptr function + = std::dynamic_pointer_cast(from); + ExcAssert(this->root); + ExcAssert(function); + + GetParamInfo getParamInfo = [&] (const Utf8String & paramName) + -> std::shared_ptr + { + throw HttpReturnException(500, "No query parameter " + paramName); + }; + + if (params_) + getParamInfo = params_; + + impl.reset(new DatasetFunctionElement(this->root, function, getParamInfo)); + } else { #if 0 if (!unbound.params.empty()) @@ -1566,6 +1889,7 @@ FromElement(std::shared_ptr root_, orderBy)); } else { + // Need to bound here to get the dataset auto rootBound = root->bind(); auto scope = rootBound->outputScope(); @@ -2264,6 +2588,339 @@ outputScope() const return outputScope_; } +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +DatasetFunctionElement:: +DatasetFunctionElement(std::shared_ptr root, + std::shared_ptr function, + GetParamInfo getParamInfo) + : source_(std::move(root)), function_(function) +{ + ExcAssert(function->args.size() <= 2); + ExcAssert(function->args.size() > 0); + ExcAssert(source_); + ExcAssert(getParamInfo); + + if (function_->functionName == "sample") { + throw HttpReturnException(500, "Dataset sampling not implemented for sql.query or cross joins"); + } + else if (function_->functionName == "merge") { + 
ExcAssert(function->args.size() == 2); + + pipeline = source_ + ->params(getParamInfo) + ->from(function->args[0], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo) + ->select("rowName()") + ->sort(OrderByExpression::parse("rowName()")) + ->select("rowName()") + ->select("*"); + + pipelineRight = source_ + ->params(getParamInfo) + ->from(function->args[1], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo) + ->select("rowName()") + ->sort(OrderByExpression::parse("rowName()")) + ->select("rowName()") + ->select("*"); + } + else + { + pipeline = source_ + ->params(getParamInfo) + ->from(function->args[0], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo); + } +} + +std::shared_ptr +DatasetFunctionElement:: +bind() const +{ + if (pipelineRight) + return std::make_shared(source_->bind(), + pipeline->bind(), + pipelineRight->bind(), + function_->getAs(), + function_->functionName); + else + return std::make_shared(source_->bind(), + pipeline->bind(), + function_->getAs(), + function_->functionName); +} + +/*****************************************************************************/ +/* TRANSPOSE LEXICAL SCOPE */ +/*****************************************************************************/ + +TransposeLexicalScope:: +TransposeLexicalScope(std::shared_ptr inner, + std::shared_ptr rowValueInfo, + Utf8String asName) + : TableLexicalScope(rowValueInfo, asName), + inner(inner) { + +} + +GetAllColumnsOutput +TransposeLexicalScope:: +doGetAllColumns(std::function keep, + int fieldOffset) +{ + //cerr << "transpose lexical scope get columns: fieldOffset = " + // << fieldOffset << endl; + + ExcAssertGreaterEqual(fieldOffset, 0); + + std::vector columnsWithInfo; + + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue + { + auto & row = rowScope.as(); + + 
const ExpressionValue & rowContents + = row.values.at(fieldOffset + ROW_CONTENTS); + + RowValue result; + + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) + { + //We have to check it in here because we dont know all the rownames in + ColumnName outputName = keep(columnName); + + if (outputName.empty() && !asName.empty()) { + + //try with the table alias + outputName = keep(PathElement(asName) + columnName); + } + + if (outputName.empty()) + return true; + + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + // TODO: lots of optimizations possible here... + value.forEachAtom(onAtom, outputName); + + return true; + }; + + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); + + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); + }; + + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_OPEN); + result.exec = exec; + return result; +} + +ColumnGetter +TransposeLexicalScope:: +doGetColumn(const ColumnName & columnName, int fieldOffset) +{ + return TableLexicalScope::doGetColumn(columnName, fieldOffset); +} + +std::set +TransposeLexicalScope:: +tableNames() const { + return {asName}; +} + +std::vector > +TransposeLexicalScope:: +outputAdded() const { + + return TableLexicalScope::outputAdded(); +} + +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT TRANSPOSE EXECUTOR */ +/*****************************************************************************/ + +DatasetFunctionElement::TransposeExecutor:: +TransposeExecutor(std::shared_ptr subpipeline) : + subpipeline_(subpipeline) +{ + +} + +std::shared_ptr +DatasetFunctionElement::TransposeExecutor:: +take() +{ + return subpipeline_->takeColumn(); +} + +std::shared_ptr +DatasetFunctionElement::TransposeExecutor:: 
+takeColumn() +{ + return subpipeline_->take(); +} + +void +DatasetFunctionElement::TransposeExecutor:: +restart() +{ + subpipeline_->restart(); +} + +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT MERGE EXECUTOR */ +/*****************************************************************************/ + +DatasetFunctionElement::MergeExecutor:: +MergeExecutor(std::shared_ptr subpipelineLeft, std::shared_ptr subpipelineRight) : + subpipelineLeft_(subpipelineLeft), subpipelineRight_(subpipelineRight) +{ + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); +} + +std::shared_ptr +DatasetFunctionElement::MergeExecutor:: +take() +{ + std::shared_ptr result; + while (left && right) { + + auto& leftRowName = left->values[left->values.size() - 2]; + auto& rightRowName = right->values[right->values.size() - 2]; + + if (leftRowName < rightRowName) { + left = subpipelineLeft_->take(); + } + else if (leftRowName > rightRowName) { + right = subpipelineRight_->take(); + } + else{ + + RowValue row; + left->values[left->values.size() - 1].appendToRow(ColumnName(), row); + right->values[right->values.size() - 1].appendToRow(ColumnName(), row); + + result = left; + result->values.push_back(leftRowName); + result->values.push_back(ExpressionValue(row)); + + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); + + break; + } + } + + return result; +} + +void +DatasetFunctionElement::MergeExecutor:: +restart() +{ + subpipelineLeft_->restart(); + subpipelineRight_->restart(); + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); +} + +/*****************************************************************************/ +/* BOUND DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +DatasetFunctionElement::Bound:: +Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + const Utf8String& asName, + const Utf8String& 
functionName) + : source_(std::move(source)), + subpipeline_(std::move(subpipeline)), + asName_(asName), + functionName_(functionName), + outputScope_(createOuputScope()) +{ +} + +DatasetFunctionElement::Bound:: +Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + std::shared_ptr subpipelineRight, + const Utf8String& asName, + const Utf8String& functionName) + : source_(std::move(source)), + subpipeline_(std::move(subpipeline)), + subpipelineRight_(std::move(subpipelineRight)), + asName_(asName), + functionName_(functionName), + outputScope_(createOuputScope()) +{ +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +createOuputScope() +{ + //For Transpose to get a complete schema we would need all the row names from the sub pipeline. + //TODO: for Merge we should get the complete schema + std::vector columns; + std::shared_ptr rowValueInfo = std::make_shared(columns, SCHEMA_OPEN); + + if (functionName_ == "transpose") + return source_->outputScope()->tableScope(std::make_shared(subpipeline_->outputScope(), + rowValueInfo, asName_)); + else if (functionName_ == "merge") + return subpipeline_->outputScope()->tableScope(std::make_shared(rowValueInfo, asName_)); + else + throw HttpReturnException(400, "Unknown dataset function in pipeline executor", + "function", functionName_); +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +start(const BoundParameters & getParam) const +{ + if (functionName_ == "transpose") + return std::make_shared(subpipeline_->start(getParam)); + else if (functionName_ == "merge") + return std::make_shared(subpipeline_->start(getParam), subpipelineRight_->start(getParam)); + else + throw HttpReturnException(400, "Unknown dataset function in pipeline executor", + "function", functionName_); +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +boundSource() const +{ + return source_; +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +outputScope() const +{ + return outputScope_; +} } // namespace MLDB } // namespace 
Datacratic diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index e6b148350..8ea56ef9e 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -9,6 +9,7 @@ #include "execution_pipeline.h" #include "join_utils.h" +#include "mldb/utils/compact_vector.h" namespace Datacratic { namespace MLDB { @@ -66,6 +67,7 @@ struct GenerateRowsExecutor: public ElementExecutor { std::shared_ptr dataset; BasicRowGenerator generator; + std::function columnGenerator; BoundParameters params; std::vector current; @@ -76,6 +78,8 @@ struct GenerateRowsExecutor: public ElementExecutor { virtual std::shared_ptr take(); + std::shared_ptr takeColumn(); + virtual void restart(); }; @@ -177,7 +181,13 @@ struct SubSelectExecutor: public ElementExecutor { virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + virtual void restart(); + + std::shared_ptr > > rows; + std::vector columnNames; + int columnIndex; }; @@ -322,8 +332,11 @@ struct JoinElement: public PipelineElement { std::shared_ptr leftImpl; std::shared_ptr rightImpl; + std::shared_ptr leftRaw; + std::shared_ptr rightRaw; struct Bound; + struct JoinTransposeExecutor; /** Execution runs over all left rows for each right row. The complexity is therefore O(left rows) * O(right rows). 
The canonical example of this @@ -333,15 +346,21 @@ struct JoinElement: public PipelineElement { CrossJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right); + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); const Bound * parent; - std::shared_ptr root, left, right; + std::shared_ptr root, left, right, leftRaw, rightRaw; std::shared_ptr l,r; + + std::shared_ptr transpose_; virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + void restart(); }; @@ -358,17 +377,22 @@ struct JoinElement: public PipelineElement { EquiJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right); + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); const Bound * parent; - std::shared_ptr root, left, right; - + std::shared_ptr root, left, right, leftRaw, rightRaw; std::shared_ptr l,r; + std::shared_ptr transpose_; + void takeMoreInput(); virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + virtual void restart(); }; @@ -381,12 +405,16 @@ struct JoinElement: public PipelineElement { Bound(std::shared_ptr root, std::shared_ptr left, std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw, AnnotatedJoinCondition condition, JoinQualification joinQualification); std::shared_ptr root_; std::shared_ptr left_; std::shared_ptr right_; + std::shared_ptr leftRaw_; + std::shared_ptr rightRaw_; std::shared_ptr outputScope_; BoundSqlExpression crossWhere_; AnnotatedJoinCondition condition_; @@ -409,6 +437,27 @@ struct JoinElement: public PipelineElement { output context is the same as its input context. 
*/ virtual std::shared_ptr outputScope() const; + + }; + + struct JoinTransposeExecutor { + + JoinTransposeExecutor(ElementExecutor& joinExecutor, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); + + std::shared_ptr take(); + + ElementExecutor& joinExecutor; + std::shared_ptr leftRaw; + std::shared_ptr rightRaw; + + int side; + + typedef std::map > SideRowIndex; + + SideRowIndex leftRowIndex; + SideRowIndex rightRowIndex; }; std::shared_ptr @@ -801,5 +850,100 @@ struct ParamsElement: public PipelineElement { std::shared_ptr bind() const; }; +/*****************************************************************************/ +/* TRANSPOSE LEXICAL SCOPE */ +/*****************************************************************************/ + +struct TransposeLexicalScope: public TableLexicalScope { + + TransposeLexicalScope(std::shared_ptr inner, + std::shared_ptr rowInfo, + Utf8String asName_); + + std::shared_ptr inner; + + virtual ColumnGetter + doGetColumn(const ColumnName & columnName, int fieldOffset); + + virtual GetAllColumnsOutput + doGetAllColumns(std::function keep, int fieldOffset); + + virtual std::set tableNames() const; + + virtual std::vector > + outputAdded() const; +}; + +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +struct DatasetFunctionElement : public PipelineElement { + + DatasetFunctionElement(std::shared_ptr source, + std::shared_ptr function, + GetParamInfo getParamInfo); + + std::shared_ptr source_; + std::shared_ptr function_; + + std::shared_ptr pipeline; + std::shared_ptr pipelineRight; + + struct TransposeExecutor: public ElementExecutor { + TransposeExecutor(std::shared_ptr subpipeline); + virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + virtual void restart(); + + std::shared_ptr subpipeline_; + }; + + struct MergeExecutor: public ElementExecutor { + 
MergeExecutor(std::shared_ptr subpipelineLeft, + std::shared_ptr subpipelineRight); + virtual std::shared_ptr take(); + virtual void restart(); + + std::shared_ptr subpipelineLeft_; + std::shared_ptr subpipelineRight_; + + std::shared_ptr left; + std::shared_ptr right; + }; + + struct Bound: public BoundPipelineElement { + Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + const Utf8String& asName, + const Utf8String& functionName); + Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + std::shared_ptr subpipelineRight, + const Utf8String& asName, + const Utf8String& functionName); + + std::shared_ptr source_; + std::shared_ptr subpipeline_; + std::shared_ptr subpipelineRight_; + Utf8String asName_; + Utf8String functionName_; + std::shared_ptr outputScope_; + + std::shared_ptr + start(const BoundParameters & getParam) const; + + virtual std::shared_ptr + boundSource() const; + + virtual std::shared_ptr outputScope() const; + + std::shared_ptr createOuputScope(); + }; + + std::shared_ptr bind() const; + +}; + } // namespace MLDB } // namespace Datacratic diff --git a/sql/sql_expression.h b/sql/sql_expression.h index 288210b5c..1a26000fa 100644 --- a/sql/sql_expression.h +++ b/sql/sql_expression.h @@ -233,6 +233,9 @@ struct TableOperations { ssize_t limit)> runQuery; + std::function + getColumn; + /// What aliases (sub-dataset names) does this dataset contain? /// Normally used in a join std::function () > getChildAliases; diff --git a/sql/table_expression_operations.cc b/sql/table_expression_operations.cc index 88c08fdbc..cf6513863 100644 --- a/sql/table_expression_operations.cc +++ b/sql/table_expression_operations.cc @@ -54,6 +54,16 @@ bindDataset(std::shared_ptr dataset, Utf8String asName) offset, limit); }; + //Todo: Column Generator? 
To avoid doing the getColumnNames all over again
+    result.table.getColumn = [=] (ssize_t offset) -> MatrixColumn
+        {
+            // Positional lookup; offsets outside [0, number of columns) -- negatives included -- give an empty column.
+            const auto columnNames = cols->getColumnNames();
+            if (offset < 0 || static_cast<size_t>(offset) >= columnNames.size())
+                return MatrixColumn();
+            return cols->getColumn(columnNames[offset]);
+        };
+
     result.table.getChildAliases = [=] ()
         {
             std::vector aliases;
@@ -537,6 +547,7 @@ BoundTableExpression
 DatasetFunctionExpression::
 bind(SqlBindingScope & context) const
 {
+    //binding args
     std::vector boundArgs;
     for (auto arg : args)
         boundArgs.push_back(arg->bind(context));
@@ -546,6 +557,7 @@ bind(SqlBindingScope & context) const
         expValOptions = options->constantValue();
     }
 
+    //getting function
     auto fn = context.doGetDatasetFunction(functionName, boundArgs,
                                            expValOptions, asName);
     if (!fn)
diff --git a/testing/MLDB-1500-transpose-query.js b/testing/MLDB-1500-transpose-query.js
index c03f5e11b..dc0e75224 100644
--- a/testing/MLDB-1500-transpose-query.js
+++ b/testing/MLDB-1500-transpose-query.js
@@ -184,5 +184,26 @@ mldb.log(res.json.error)
 assertEqual(res.json.error, "Non-aggregator 'horizontal_count({*})' with GROUP BY clause is not allowed",
             "Did not get the expected error");
 
+//MLDB-1575
+
+res = mldb.put('/v1/functions/bop3', {
+    'type': 'sql.query',
+    'params': {
+        'query': "select * from transpose((select $x))"
+    }
+})
+
+expected = [
+    [ "_rowName", "bop3({x : 'a'}).result" ],
+    [ "result", "a" ]
+];
+
+res = mldb.get('/v1/query', { q: "select bop3({x : 'a'})", format: 'table' });
+
+mldb.log(res.json)
+
+assertEqual(mldb.diff(expected, res.json, false /* strict */), {},
+            "output was not the same as expected output in pipeline executor");
+
 "success"
 
diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py
new file mode 100644
index 000000000..8de377fe1
--- /dev/null
+++ b/testing/MLDB-1675-pipeline-dataset-function.py
@@ -0,0 +1,216 @@
+# MLDB-1675-pipeline-dataset-function.py
+# Mathieu Marquis Bolduc, 2016-16-05
+# This file is part
of MLDB. Copyright 2016 Datacratic. All rights reserved.
+#
+
+import unittest
+import json
+
+mldb = mldb_wrapper.wrap(mldb) # noqa
+
+class DatasetFunctionTest(MldbUnitTest):
+    """Checks dataset functions (transpose, merge, row_dataset, sample) used inside sql.query functions."""
+
+    @classmethod
+    def setUpClass(cls):
+        """Create the four small sparse.mutable datasets shared by the tests."""
+        # create a dummy dataset
+        ds = mldb.create_dataset({ "id": "dataset1", "type": "sparse.mutable" })
+        ds.record_row("a",[["x", "toy story", 0]])
+        ds.commit()
+
+        ds = mldb.create_dataset({ "id": "dataset2", "type": "sparse.mutable" })
+        ds.record_row("row_a",[["x", "toy story", 0],["y", "123456", 0]])
+        ds.record_row("row_b",[["x", "terminator", 0]])
+        ds.commit()
+
+        ds = mldb.create_dataset({ "id": "dataset3", "type": "sparse.mutable" })
+        ds.record_row("row_a",[["z", 0.1, 0]])
+        ds.record_row("row_b",[["z", 0.2, 0]])
+        ds.commit()
+
+        ds = mldb.create_dataset({ "id": "dataset4", "type": "sparse.mutable" })
+        ds.record_row("result",[["z", 0.1, 0]])
+        ds.commit()
+
+    def test_transpose_dataset(self):
+        """transpose() applied directly to a dataset."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose(dataset2)"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [["_rowName","bop().row_a","bop().row_b"],["result","toy story","terminator"]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_transpose_subselect(self):
+        """transpose() over a sub-select that reads the function parameter $k."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose((select $k as blah))"
+            }
+        })
+
+        res = mldb.query("select bop({k : 'x'})")
+        mldb.log(res)
+
+        expected = [["_rowName", "bop({k : 'x'}).result"],["result","x"]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_transpose_join(self):
+        """transpose() over an equi-join, with and without an OFFSET clause."""
+
+        # Direct query first; its result is only logged, not asserted.
+        res = mldb.query("select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x)")
+
+        mldb.log(res)
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x)"
+            }
+        })
+
+        res = mldb.put('/v1/functions/bop2', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x) offset 2"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [["_rowName","bop().[a]-[row_a]"],["result","toy story"]]
+
+        # Check bop() now: res and expected are reused for bop2 just below.
+        self.assertTableResultEquals(res, expected)
+
+        res = mldb.query("select bop2()")
+        mldb.log(res)
+
+        expected = [["_rowName","bop2().[a]-[row_a]"], ["result", "123456"]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_transpose_join_cross(self):
+        """transpose() over a join that has no ON clause."""
+
+        res = mldb.query("select * from transpose(dataset1 as t1 JOIN dataset2 as t2)")
+
+        mldb.log(res)
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2)"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [["_rowName", "bop().[a]-[row_a]", "bop().[a]-[row_b]"],
+                    ["result", "toy story", "toy story"]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_transpose_transpose(self):
+        """transpose() nested inside another transpose()."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from transpose(transpose(dataset2))"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [["_rowName","bop().x"],["result","terminator"]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_merge_dataset(self):
+        """merge() of two datasets."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from merge(dataset2, dataset3)"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [["_rowName","bop().x","bop().y","bop().z"],["result","toy story","123456",0.1]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_merge_subselect(self):
+        """merge() of a dataset with a sub-select that reads $k."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from merge(dataset4, (select $k as blah))"
+            }
+        })
+
+        res = mldb.query("select bop({k : 'x'})")
+        mldb.log(res)
+
+        expected = [["_rowName","bop({k : 'x'}).blah","bop({k : 'x'}).z"],["result","x",0.1]]
+
+        self.assertTableResultEquals(res, expected)
+
+    def test_row_dataset(self):
+        """row_dataset() built from a row literal."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "select * from row_dataset({x: 1, y:2, z: 'three'})"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        # Only the "x" entry is listed although the row literal has three columns.
+        expected = [["_rowName","bop().column","bop().value"],["result","x",1]]
+
+        self.assertTableResultEquals(res, expected)
+
+    @unittest.expectedFailure #not yet implemented
+    def test_sampled_dataset(self):
+        """sample() as a dataset function; marked as an expected failure."""
+
+        res = mldb.put('/v1/functions/bop', {
+            'type': 'sql.query',
+            'params': {
+                'query': "SELECT x.* FROM sample(dataset2, {rows: 1, withReplacement: FALSE}) AS x"
+            }
+        })
+
+        res = mldb.query("select bop()")
+        mldb.log(res)
+
+        expected = [] # to be filled
+
+        self.assertTableResultEquals(res, expected)
+
+mldb.run_tests()
\ No newline at end of file
diff --git a/testing/testing.mk b/testing/testing.mk
index 5ada03f3a..83e9af803 100644
--- a/testing/testing.mk
+++ b/testing/testing.mk
@@ -270,6 +270,7 @@ $(eval $(call mldb_unit_test,MLDB-1452-like-operator.py))
 $(eval $(call mldb_unit_test,MLDB-1440_sqlexpr_ignore_unknown_param.py))
 $(eval $(call mldb_unit_test,MLDB-1595-count-distinct.py))
 $(eval $(call mldb_unit_test,MLDB-1639-join-where.py))
+$(eval $(call mldb_unit_test,MLDB-1675-pipeline-dataset-function.py))
 $(eval $(call mldb_unit_test,MLDB-1661-function-name-conflict.py))
 $(eval $(call mldb_unit_test,MLDB-1710-left-right-rowname.py))