From 2d1e3d420c42729b0dee45d72354401ce4fd6b22 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Wed, 25 May 2016 13:22:59 -0400 Subject: [PATCH 01/12] MLDB-1675 do dataset transpose without leaving pipeline executor --- sql/execution_pipeline.cc | 8 + sql/execution_pipeline.h | 3 + sql/execution_pipeline_impl.cc | 315 ++++++++++++++++++++++++++- sql/execution_pipeline_impl.h | 78 +++++++ sql/sql_expression.h | 3 + sql/table_expression_operations.cc | 12 + testing/MLDB-1500-transpose-query.js | 15 ++ 7 files changed, 432 insertions(+), 2 deletions(-) diff --git a/sql/execution_pipeline.cc b/sql/execution_pipeline.cc index 9b14c3a1e..80c36e1b9 100644 --- a/sql/execution_pipeline.cc +++ b/sql/execution_pipeline.cc @@ -362,6 +362,12 @@ takeAll(std::function &)> onResult) return true; } +std::shared_ptr +ElementExecutor:: +takeColumn() +{ + throw HttpReturnException(500, "Element Executor " + ML::type_name(*this) + " does not allow taking columns"); +} /*****************************************************************************/ /* PIPELINE ELEMENT */ /*****************************************************************************/ @@ -391,6 +397,8 @@ std::shared_ptr PipelineElement:: params(std::function (const Utf8String & name)> getParamInfo) { + ExcAssert(shared_from_this()); + ExcAssert(getParamInfo); return std::make_shared(shared_from_this(), std::move(getParamInfo)); } diff --git a/sql/execution_pipeline.h b/sql/execution_pipeline.h index eb8b76cca..bd85512e9 100644 --- a/sql/execution_pipeline.h +++ b/sql/execution_pipeline.h @@ -223,6 +223,9 @@ struct ElementExecutor { /** Take one element from the pipeline. */ virtual std::shared_ptr take() = 0; + /** Take one element from the pipeline. */ + virtual std::shared_ptr takeColumn(); + /** Take all elements from the pipeline. inParallel describes whether the function can be called from multiple threads at once. */ diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index b12d9fe11..cc1e2bb1e 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -286,6 +286,31 @@ take() return result; } +std::shared_ptr +GenerateRowsExecutor:: +takeColumn() +{ + auto result = source->take(); + + if (!result) + return result; + + auto column = columnGenerator(currentDone); + + if (column.columnName.empty()) + return nullptr; + + //cerr << "got column " << column.columnName << " " + // << jsonEncodeStr(column.rows) << endl; + + result->values.emplace_back(column.columnName, + Date::notADate()); + result->values.emplace_back(std::move(column.rows)); + + ++currentDone; + return result; +} + void GenerateRowsExecutor:: restart() @@ -353,6 +378,7 @@ start(const BoundParameters & getParam) const parent->orderBy, 0 /* offset */, -1 /* limit */); result->params = getParam; + result->columnGenerator = parent->from.getColumn; ExcAssert(result->params); return result; } @@ -435,7 +461,7 @@ outputAdded() const { SubSelectExecutor:: SubSelectExecutor(std::shared_ptr boundSelect, - const BoundParameters & getParam) + const BoundParameters & getParam) : columnIndex(0) { pipeline = boundSelect->start(getParam); } @@ -451,11 +477,80 @@ take() return subResult; } +std::shared_ptr +SubSelectExecutor:: +takeColumn() +{ + //We must do the terrible thing and query everything to build a transposed view. + if (!rows) { + + rows = make_shared > >(); + + while (1){ + auto subResult = pipeline->take(); + + if (!subResult) + break; + + rows->push_back(subResult); + + auto pushColumn = [&] (const PathElement & columnName, const ExpressionValue & val) { + columnNames.push_back(columnName); + return true; + }; + + ExpressionValue& values = subResult->values[1]; //TODO: will it always be 1? + values.forEachColumn(pushColumn); + } + + sort( columnNames.begin(), columnNames.end() ); + columnNames.erase( unique( columnNames.begin(), columnNames.end() ), columnNames.end() ); + + columnIndex = 0; + } + + if (columnIndex >= columnNames.size()) + return nullptr; + + PathElement subColumnName = columnNames[columnIndex++]; + + std::shared_ptr result = make_shared(); + + result->values.emplace_back(subColumnName.toUtf8String(), Date::notADate()); //column name becomes the rowname + + RowValue resultValues; + + for (auto& r : *rows) { + + ExpressionValue& rowName = r->values[0]; //rowName becomes the colum name + Path rowNamePath; + rowNamePath.parse(rowName.toUtf8String()); + + auto pushResult = [&] (const PathElement & columnName, const ExpressionValue & val) { + if (columnName == subColumnName) { + resultValues.emplace_back(rowNamePath, val.coerceToAtom(), val.getEffectiveTimestamp()); + } + + return true; + }; + + ExpressionValue& values = r->values[1]; //TODO: will it always be 1? + values.forEachColumn(pushResult); + } + + result->values.emplace_back(resultValues); + + return result; +} + void SubSelectExecutor:: restart() { pipeline->restart(); + rows.reset(); + columnNames.clear(); + columnIndex = 0; } @@ -1348,7 +1443,9 @@ FromElement(std::shared_ptr root_, ExcAssert(this->root); UnboundEntities unbound = from->getUnbound(); - //cerr << "unbound for from = " << jsonEncode(unbound) << endl; + //cerr << "unbound for from = " << jsonEncode(unbound) << endl; + + cerr << from->getType() << endl; if (from->getType() == "join") { std::shared_ptr join @@ -1380,6 +1477,22 @@ FromElement(std::shared_ptr root_, impl.reset(new SubSelectElement(root, subSelect->statement, getParamInfo, from->getAs())); } + else if (from->getType() == "datasetFunction") { + std::shared_ptr function = std::dynamic_pointer_cast(from); + ExcAssert(this->root); + ExcAssert(function); + + GetParamInfo getParamInfo = [&] (const Utf8String & paramName) + -> std::shared_ptr + { + throw HttpReturnException(500, "No query parameter " + paramName); + }; + + if (params_) + getParamInfo = params_; + + impl.reset(new DatasetFunctionElement(this->root, function, getParamInfo)); + } else { #if 0 if (!unbound.params.empty()) @@ -1463,6 +1576,7 @@ FromElement(std::shared_ptr root_, orderBy)); } else { + // Need to bound here to get the dataset auto rootBound = root->bind(); auto scope = rootBound->outputScope(); @@ -2098,6 +2212,203 @@ outputScope() const return outputScope_; } +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +DatasetFunctionElement:: +DatasetFunctionElement(std::shared_ptr root, std::shared_ptr function, GetParamInfo getParamInfo) + : source_(std::move(root)), function_(function) +{ + ExcAssert(function->args.size() == 1); + ExcAssert(source_); + ExcAssert(getParamInfo); + + pipeline = source_ + ->params(getParamInfo) + ->from(function->args[0], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo); +} + +std::shared_ptr +DatasetFunctionElement:: +bind() const +{ + return std::make_shared(source_->bind(), pipeline->bind(), function_->getAs()); +} + +/*****************************************************************************/ +/* TRANSPOSE LEXICAL SCOPE */ +/*****************************************************************************/ + +TransposeLexicalScope:: +TransposeLexicalScope(std::shared_ptr inner, std::shared_ptr rowValueInfo, Utf8String asName) + : TableLexicalScope(rowValueInfo, asName), + inner(inner) { + +} + +GetAllColumnsOutput +TransposeLexicalScope:: +doGetAllColumns(std::function keep, + int fieldOffset) +{ + //cerr << "transpose lexical scope get columns: fieldOffset = " + // << fieldOffset << endl; + + ExcAssertGreaterEqual(fieldOffset, 0); + + std::vector columnsWithInfo; + + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue + { + auto & row = rowScope.as(); + + const ExpressionValue & rowContents + = row.values.at(fieldOffset + ROW_CONTENTS); + + RowValue result; + + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) + { + //We have to check it in here because we dont know all the rownames in + ColumnName outputName = keep(columnName); + + if (outputName.empty() && !asName.empty()) { + + //try with the table alias + outputName = keep(PathElement(asName) + columnName); + } + + if (outputName.empty()) + return true; + + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + // TODO: lots of optimizations possible here... + value.forEachAtom(onAtom, outputName); + + return true; + }; + + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); + + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); + }; + + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_OPEN); + result.exec = exec; + return result; +} + +ColumnGetter +TransposeLexicalScope:: +doGetColumn(const ColumnName & columnName, int fieldOffset) +{ + return TableLexicalScope::doGetColumn(columnName, fieldOffset); +} + +std::set +TransposeLexicalScope:: +tableNames() const { + return {asName}; +} + +std::vector > +TransposeLexicalScope:: +outputAdded() const { + + return TableLexicalScope::outputAdded(); +} + +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT TRANSPOSE EXECUTOR */ +/*****************************************************************************/ + +DatasetFunctionElement::TransposeExecutor:: +TransposeExecutor(std::shared_ptr subpipeline) : + subpipeline_(subpipeline) +{ + +} + +std::shared_ptr +DatasetFunctionElement::TransposeExecutor:: +take() +{ + return subpipeline_->takeColumn(); +} + +void +DatasetFunctionElement::TransposeExecutor:: +restart() +{ + subpipeline_->restart(); +} + +/*****************************************************************************/ +/* BOUND DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +DatasetFunctionElement::Bound:: +Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + const Utf8String& asName) + : source_(std::move(source)), + subpipeline_(std::move(subpipeline)), + asName_(asName), + outputScope_(createOuputScope()) +{ +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +createOuputScope() +{ + //To get a complete scheme we would need all the row names from the sub pipeline. + std::vector columns; + + std::shared_ptr rowValueInfo = std::make_shared(columns, SCHEMA_OPEN); + + auto tableScope = source_->outputScope() + ->tableScope(std::make_shared(subpipeline_->outputScope(), rowValueInfo, asName_)); + + return tableScope; +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +start(const BoundParameters & getParam) const +{ + return std::make_shared(subpipeline_->start(getParam)); +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +boundSource() const +{ + return source_; +} + +std::shared_ptr +DatasetFunctionElement::Bound:: +outputScope() const +{ + return outputScope_; +} } // namespace MLDB } // namespace Datacratic diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index bb594b374..aac1fb20e 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -66,6 +66,7 @@ struct GenerateRowsExecutor: public ElementExecutor { std::shared_ptr dataset; BasicRowGenerator generator; + std::function columnGenerator; BoundParameters params; std::vector current; @@ -76,6 +77,8 @@ struct GenerateRowsExecutor: public ElementExecutor { virtual std::shared_ptr take(); + std::shared_ptr takeColumn(); + virtual void restart(); }; @@ -171,7 +174,13 @@ struct SubSelectExecutor: public ElementExecutor { virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + virtual void restart(); + + std::shared_ptr > > rows; + std::vector columnNames; + int columnIndex; }; @@ -789,5 +798,74 @@ struct ParamsElement: public PipelineElement { std::shared_ptr bind() const; }; +/*****************************************************************************/ +/* TRANSPOSE LEXICAL SCOPE */ +/*****************************************************************************/ + +struct TransposeLexicalScope: public TableLexicalScope { + + TransposeLexicalScope(std::shared_ptr inner, std::shared_ptr rowInfo, Utf8String asName_); + + std::shared_ptr inner; + + virtual ColumnGetter + doGetColumn(const ColumnName & columnName, int fieldOffset); + + virtual GetAllColumnsOutput + doGetAllColumns(std::function keep, int fieldOffset); + + virtual std::set tableNames() const; + + virtual std::vector > + outputAdded() const; +}; + +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT */ +/*****************************************************************************/ + +struct DatasetFunctionElement : public PipelineElement { + + DatasetFunctionElement(std::shared_ptr source, + std::shared_ptr function, + GetParamInfo getParamInfo); + + std::shared_ptr source_; + std::shared_ptr function_; + + std::shared_ptr pipeline; + std::shared_ptr subpipeline_; + + struct TransposeExecutor: public ElementExecutor { + TransposeExecutor(std::shared_ptr subpipeline); + virtual std::shared_ptr take(); + virtual void restart(); + + std::shared_ptr subpipeline_; + }; + + struct Bound: public BoundPipelineElement { + Bound(std::shared_ptr source, std::shared_ptr subpipeline, const Utf8String& asName); + + std::shared_ptr source_; + std::shared_ptr subpipeline_; + Utf8String asName_; + std::shared_ptr outputScope_; + + std::shared_ptr + start(const BoundParameters & getParam) const; + + virtual std::shared_ptr + boundSource() const; + + virtual std::shared_ptr outputScope() const; + + std::shared_ptr createOuputScope(); + }; + + std::shared_ptr bind() const; + +}; + } // namespace MLDB } // namespace Datacratic diff --git a/sql/sql_expression.h b/sql/sql_expression.h index 6eac8dc9d..bbcc44ac7 100644 --- a/sql/sql_expression.h +++ b/sql/sql_expression.h @@ -233,6 +233,9 @@ struct TableOperations { ssize_t limit)> runQuery; + std::function + getColumn; + /// What aliases (sub-dataset names) does this dataset contain? /// Normally used in a join std::function () > getChildAliases; diff --git a/sql/table_expression_operations.cc b/sql/table_expression_operations.cc index 88c08fdbc..cf6513863 100644 --- a/sql/table_expression_operations.cc +++ b/sql/table_expression_operations.cc @@ -54,6 +54,16 @@ bindDataset(std::shared_ptr dataset, Utf8String asName) offset, limit); }; + //Todo: Column Generator? To avoid doing the getColumnNames all over again + result.table.getColumn = [=] (ssize_t offset) -> MatrixColumn + { + std::vector columnNames = cols->getColumnNames(); + if (offset >= columnNames.size()) + return MatrixColumn(); + else + return cols->getColumn(columnNames[offset]); + }; + result.table.getChildAliases = [=] () { std::vector aliases; @@ -537,6 +547,7 @@ BoundTableExpression DatasetFunctionExpression:: bind(SqlBindingScope & context) const { + //binding args std::vector boundArgs; for (auto arg : args) boundArgs.push_back(arg->bind(context)); @@ -546,6 +557,7 @@ bind(SqlBindingScope & context) const expValOptions = options->constantValue(); } + //getting function auto fn = context.doGetDatasetFunction(functionName, boundArgs, expValOptions, asName); if (!fn) diff --git a/testing/MLDB-1500-transpose-query.js b/testing/MLDB-1500-transpose-query.js index 33a925b76..6475e6f56 100644 --- a/testing/MLDB-1500-transpose-query.js +++ b/testing/MLDB-1500-transpose-query.js @@ -183,5 +183,20 @@ mldb.log(res.json.error) assertEqual(res.json.error, "Non-aggregator 'horizontal_count({*})' with GROUP BY clause is not allowed", "Did not get the expected error"); +//MLDB-1575 + +res = mldb.put('/v1/functions/bop3', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose((select $x))" + } +}) + +mldb.log(res) + +res = mldb.get('/v1/query', { q: "select bop3({x : 'a'})" }); + +mldb.log(res.json) + "success" From 6dacc31d750fa0a3eb4bed3499d2432cc7dda6b6 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Thu, 26 May 2016 08:27:33 -0400 Subject: [PATCH 02/12] MLDB-1675 add test file for dataset functions in pipeline --- sql/execution_pipeline_impl.cc | 15 ++-- .../MLDB-1675-pipeline-dataset-function.py | 90 +++++++++++++++++++ testing/testing.mk | 1 + 3 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 testing/MLDB-1675-pipeline-dataset-function.py diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index cc1e2bb1e..f876c15c1 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -499,7 +499,10 @@ takeColumn() return true; }; - ExpressionValue& values = subResult->values[1]; //TODO: will it always be 1? + ssize_t valuesOffset = subResult->values.size() - 1; + + ExpressionValue& values = subResult->values[valuesOffset]; + values.forEachColumn(pushColumn); } @@ -522,9 +525,11 @@ takeColumn() for (auto& r : *rows) { - ExpressionValue& rowName = r->values[0]; //rowName becomes the colum name - Path rowNamePath; - rowNamePath.parse(rowName.toUtf8String()); + ssize_t valuesOffset = r->values.size() - 1; + ssize_t rowNameOffset = valuesOffset - 1; + + ExpressionValue& rowName = r->values[rowNameOffset]; //rowName becomes the colum name + Path rowNamePath = Path::parse(rowName.toUtf8String()); auto pushResult = [&] (const PathElement & columnName, const ExpressionValue & val) { if (columnName == subColumnName) { @@ -534,7 +539,7 @@ takeColumn() return true; }; - ExpressionValue& values = r->values[1]; //TODO: will it always be 1? + ExpressionValue& values = r->values[valuesOffset]; values.forEachColumn(pushResult); } diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py new file mode 100644 index 000000000..ce01f21a2 --- /dev/null +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -0,0 +1,90 @@ +# MLDB-1639-join-where.py +# Mathieu Marquis Bolduc, 2016-16-05 +# This file is part of MLDB. Copyright 2016 Datacratic. All rights reserved. +# + +import unittest +import json + +mldb = mldb_wrapper.wrap(mldb) # noqa + +class DatasetFunctionTest(MldbUnitTest): + + @classmethod + def setUpClass(self): + # create a dummy dataset + ds = mldb.create_dataset({ "id": "dataset1", "type": "sparse.mutable" }) + ds.record_row("a",[["x", "toy story", 0]]) + ds.commit() + + ds = mldb.create_dataset({ "id": "dataset2", "type": "sparse.mutable" }) + ds.record_row("row_a",[["x", "toy story", 0]]) + ds.record_row("row_b",[["x", "terminator", 0]]) + ds.commit() + + def test_transpose_dataset(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(dataset2)" + } + }) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [["_rowName","bop().row_a","bop().row_b"],["result","toy story","terminator"]] + + self.assertEqual(res, expected) + + @unittest.expectedFailure #not the expected rowname + def test_transpose_subselect(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose((select $k as blah))" + } + }) + + mldb.log(res) + + res = mldb.query("select bop({k : 'x'})") + mldb.log(res) + + expected = [["_rowName", "bop().result"],["blah","x"]] + + self.assertEqual(res, expected) + + #joins executors dont support taking columns yet. Is there a way not to keep the whole transposed table in memory? + #one way or another we have to do the full join first to find all the rows + #then we either keep the rows or get them again + #except in the pipeline executor we dont have direct access to row, only iterative + + #in a join dataset, a column belongs to one or the other... + #can we take all columns from one dataset, then the other? + + #should we have a transpose-join-executor? + + #if no subselect is inside the transpose, its probably better to use the old pipeline... + @unittest.expectedFailure + def test_transpose_join(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x)" + } + }) + + mldb.log(res) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [["_rowName", "bop().result"],["blah","x"]] + + self.assertEqual(res, expected) + +mldb.run_tests() \ No newline at end of file diff --git a/testing/testing.mk b/testing/testing.mk index b3c168d0b..eea947c42 100644 --- a/testing/testing.mk +++ b/testing/testing.mk @@ -271,6 +271,7 @@ $(eval $(call mldb_unit_test,MLDB-1452-like-operator.py)) $(eval $(call mldb_unit_test,MLDB-1440_sqlexpr_ignore_unknown_param.py)) $(eval $(call mldb_unit_test,MLDB-1595-count-distinct.py)) $(eval $(call mldb_unit_test,MLDB-1639-join-where.py)) +$(eval $(call mldb_unit_test,MLDB-1675-pipeline-dataset-function.py)) #$(eval $(call mldb_unit_test,pytanic_plugin_test.py)) $(eval $(call python_test,mldb_merged_dataset_test,mldb_py_runner)) From f025e042c8786985ac39d15e2027b021be10e521 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Fri, 27 May 2016 13:10:55 -0400 Subject: [PATCH 03/12] MLDB-1675 start on takeColumn for joins --- sql/execution_pipeline_impl.cc | 115 +++++++++++++++++++++++++++++++++ sql/execution_pipeline_impl.h | 2 + 2 files changed, 117 insertions(+) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index f876c15c1..2b9dc8b50 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -1250,6 +1250,121 @@ take() return nullptr; } +std::shared_ptr +JoinElement::EquiJoinExecutor:: +takeColumn() +{ + //Transpose of joins should be avoided. + //We need to build the full row list and column index before we can proceed + + typedef std::map > SideRowIndex; + + SideRowIndex leftRowIndex; + SideRowIndex rightRowIndex; + + int side = 0; + + bool doneYet = false; + + if (!doneYet) { + while (1){ + auto res = take(); + if (!res) + break; + + ssize_t columnsOffset = res->values.size() -1; + ssize_t rowNameOffset = columnsOffset -1; + + Utf8String rowNameUtf8 = res->values.at(rowNameOffset).toUtf8String(); + RowName rowName = RowName::parse(rowNameUtf8); + + Utf8String leftNameUtf8 = ""; + if (!res->values.at(0).empty()) //TODO: Always 0? + leftNameUtf8 = res->values.at(0).toUtf8String(); + size_t i = 2; + for (; i + 2 < res->values.size(); i+=2) { + if (i == 2) + leftNameUtf8 = "[" + leftNameUtf8 + "]"; + + leftNameUtf8 += res->values.at(0).empty() ? "-[]" : + "-[" + res->values.at(i).toUtf8String() + "]"; + } + + RowName leftName = RowName::parse(leftNameUtf8); + + Utf8String rightNameUtf8 = ""; + if (!res->values.at(i).empty()) + rightNameUtf8 = res->values.at(i).toUtf8String(); + RowName rightName = RowName::parse(rightNameUtf8); + + //recordJoinRow(leftName, leftName, rightName, rightName); + + leftRowIndex[leftName].push_back(rowName); + rightRowIndex[rightName].push_back(rowName); + } + } + + std::shared_ptr columnResult; + + SideRowIndex* index = &leftRowIndex; + if (side == 0){ + columnResult = this->left->takeColumn(); + if (!columnResult) + ++side; + } + if (side == 1){ + index = &rightRowIndex; + columnResult = this->right->takeColumn(); + if (!columnResult) + ++side; + } + + if (!columnResult) + return columnResult; + + //For each row in the return column, filter out those that weren't joined + ssize_t columnsOffset = columnResult->values.size() -1; + // ssize_t rowNameOffset = columnsOffset -1; + + const ExpressionValue& rows = columnResult->values[columnsOffset]; + + //typedef std::vector > RowValue; + //RowValue values; + //typedef std::vector > StructValue; + StructValue values; + + //Date ts = rows.getEffectiveTimestamp(); + + auto onRow = [&] (const PathElement & columnName, const ExpressionValue & val) { + RowHash rowHash = RowName(columnName); + + // Does this row appear in the output? If not, nothing to + // do with it + auto it = index->find(rowHash); + if (it == index->end()) + return true; + + //copy the value + if (it->second.size() == 1) { + values.emplace_back(it->second[0][0], std::move(val)); + } + else { + // Can't move the value to avoid it becoming null + for (const RowName & outputRowName: it->second) { + values.emplace_back(outputRowName[0], val); + } + } + + return true; + }; + + rows.forEachColumn(onRow); + + columnResult->values[columnsOffset] = std::move(values); + + return columnResult; +} + void JoinElement::EquiJoinExecutor:: restart() diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index aac1fb20e..b56ee8842 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -367,6 +367,8 @@ struct JoinElement: public PipelineElement { virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + virtual void restart(); }; From 919ebc09fa5e7d2865cf4653e4b722d48057f187 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 21 Jun 2016 11:25:23 -0400 Subject: [PATCH 04/12] MLDB-1675 fix sql.query to consider offset with FIRST_ROW option --- plugins/sql_functions.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/sql_functions.cc b/plugins/sql_functions.cc index 7077da5d2..1274e80b2 100644 --- a/plugins/sql_functions.cc +++ b/plugins/sql_functions.cc @@ -176,8 +176,11 @@ struct SqlQueryFunctionApplier: public FunctionApplier { switch (function->functionConfig.output) { case FIRST_ROW: { ExpressionValue result; - auto output = executor->take(); + ssize_t offset = function->functionConfig.query.stm->offset; + + for (size_t n = 0; output && n < offset; ++n) + output = executor->take(); if (output) { // MLDB-1329 band-aid fix. This appears to break a circlar From 4ca0e6665a8e2dd3d21761bfb475d158be0f6689 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 21 Jun 2016 11:26:14 -0400 Subject: [PATCH 05/12] MLDB-1675 another pass at transpose of joins in the pipeline executor --- sql/execution_pipeline_impl.cc | 84 ++++++++++++------- sql/execution_pipeline_impl.h | 24 +++++- .../MLDB-1675-pipeline-dataset-function.py | 51 +++++++---- 3 files changed, 107 insertions(+), 52 deletions(-) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index e468078f1..012dd4bd2 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -15,7 +15,6 @@ #include "table_expression_operations.h" #include #include "mldb/sql/sql_expression_operations.h" -#include "mldb/jml/utils/compact_vector.h" using namespace std; @@ -954,6 +953,17 @@ JoinElement(std::shared_ptr root, ->from(right, boundRight, when, selectAll, rightCondition, condition.right.orderBy) ->select(rightEmbedding); + + //for takeColumn + leftRaw= root + ->where(constantWhere) + ->from(left, boundLeft, when, selectAll, leftCondition, + condition.left.orderBy); + + rightRaw = root + ->where(constantWhere) + ->from(right, boundRight, when, selectAll, rightCondition, + condition.right.orderBy); } std::shared_ptr @@ -963,6 +973,8 @@ bind() const return std::make_shared(root->bind(), leftImpl->bind(), rightImpl->bind(), + leftRaw->bind(), + rightRaw->bind(), condition, joinQualification); } @@ -1109,12 +1121,18 @@ JoinElement::EquiJoinExecutor:: EquiJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right) + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) : parent(parent), root(std::move(root)), left(std::move(left)), - right(std::move(right)) + right(std::move(right)), + leftRaw(std::move(leftRaw)), + rightRaw(std::move(rightRaw)) { + side = 0; + doneYet = false; l = this->left->take(); r = this->right->take(); takeMoreInput(); @@ -1342,26 +1360,14 @@ takeColumn() //Transpose of joins should be avoided. //We need to build the full row list and column index before we can proceed - typedef std::map > SideRowIndex; - - SideRowIndex leftRowIndex; - SideRowIndex rightRowIndex; - - int side = 0; - - bool doneYet = false; - if (!doneYet) { while (1){ auto res = take(); if (!res) break; - ssize_t columnsOffset = res->values.size() -1; - ssize_t rowNameOffset = columnsOffset -1; + //TODO: This will not work in so many cases. Use the scope - Utf8String rowNameUtf8 = res->values.at(rowNameOffset).toUtf8String(); - RowName rowName = RowName::parse(rowNameUtf8); Utf8String leftNameUtf8 = ""; if (!res->values.at(0).empty()) //TODO: Always 0? @@ -1382,24 +1388,26 @@ takeColumn() rightNameUtf8 = res->values.at(i).toUtf8String(); RowName rightName = RowName::parse(rightNameUtf8); - //recordJoinRow(leftName, leftName, rightName, rightName); + RowName rowName = RowName::parse("[" + leftNameUtf8 + "]-[" + rightNameUtf8 + "]"); leftRowIndex[leftName].push_back(rowName); rightRowIndex[rightName].push_back(rowName); } + + doneYet = true; } std::shared_ptr columnResult; SideRowIndex* index = &leftRowIndex; if (side == 0){ - columnResult = this->left->takeColumn(); + columnResult = this->leftRaw->takeColumn(); if (!columnResult) ++side; } if (side == 1){ index = &rightRowIndex; - columnResult = this->right->takeColumn(); + columnResult = this->rightRaw->takeColumn(); if (!columnResult) ++side; } @@ -1409,27 +1417,21 @@ takeColumn() //For each row in the return column, filter out those that weren't joined ssize_t columnsOffset = columnResult->values.size() -1; - // ssize_t rowNameOffset = columnsOffset -1; - const ExpressionValue& rows = columnResult->values[columnsOffset]; - - //typedef std::vector > RowValue; - //RowValue values; - //typedef std::vector > StructValue; StructValue values; - //Date ts = rows.getEffectiveTimestamp(); - auto onRow = [&] (const PathElement & columnName, const ExpressionValue & val) { RowHash rowHash = RowName(columnName); // Does this row appear in the output? If not, nothing to // do with it auto it = index->find(rowHash); - if (it == index->end()) + if (it == index->end()) { return true; + } //copy the value + //but we need the actual rowname if (it->second.size() == 1) { values.emplace_back(it->second[0][0], std::move(val)); } @@ -1444,7 +1446,6 @@ takeColumn() }; rows.forEachColumn(onRow); - columnResult->values[columnsOffset] = std::move(values); return columnResult; @@ -1455,8 +1456,16 @@ JoinElement::EquiJoinExecutor:: restart() { //cerr << "**** equijoin restart" << endl; + side = 0; left->restart(); right->restart(); + leftRaw->restart(); + rightRaw->restart(); + + doneYet = false; + leftRowIndex.clear(); + rightRowIndex.clear(); + l = left->take(); r = right->take(); takeMoreInput(); @@ -1471,11 +1480,15 @@ JoinElement::Bound:: Bound(std::shared_ptr root, std::shared_ptr left, std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw, AnnotatedJoinCondition condition, JoinQualification joinQualification) : root_(std::move(root)), left_(std::move(left)), right_(std::move(right)), + leftRaw_(std::move(leftRaw)), + rightRaw_(std::move(rightRaw)), outputScope_(createOutputScope()), crossWhere_(condition.crossWhere->bind(*outputScope_)), condition_(std::move(condition)), @@ -1533,7 +1546,9 @@ start(const BoundParameters & getParam) const (this, root_->start(getParam), left_->start(getParam), - right_->start(getParam)); + right_->start(getParam), + leftRaw_->start(getParam), + rightRaw_->start(getParam)); default: throw HttpReturnException(400, "Can't execute that kind of join", @@ -1650,8 +1665,6 @@ FromElement(std::shared_ptr root_, UnboundEntities unbound = from->getUnbound(); //cerr << "unbound for from = " << jsonEncode(unbound) << endl; - cerr << from->getType() << endl; - if (from->getType() == "join") { std::shared_ptr join = std::dynamic_pointer_cast(from); @@ -2557,6 +2570,13 @@ take() return subpipeline_->takeColumn(); } +std::shared_ptr +DatasetFunctionElement::TransposeExecutor:: +takeColumn() +{ + return subpipeline_->take(); +} + void DatasetFunctionElement::TransposeExecutor:: restart() diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index 73c1df172..e690f9c18 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -9,6 +9,7 @@ #include "execution_pipeline.h" #include "join_utils.h" +#include "mldb/jml/utils/compact_vector.h" namespace Datacratic { namespace MLDB { @@ -325,6 +326,8 @@ struct JoinElement: public PipelineElement { std::shared_ptr leftImpl; std::shared_ptr rightImpl; + std::shared_ptr leftRaw; + std::shared_ptr rightRaw; struct Bound; @@ -361,13 +364,24 @@ struct JoinElement: public PipelineElement { EquiJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right); + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); const Bound * parent; - std::shared_ptr root, left, right; + std::shared_ptr root, left, right, leftRaw, rightRaw; std::shared_ptr l,r; + int side; + + typedef std::map > SideRowIndex; + + SideRowIndex leftRowIndex; + SideRowIndex rightRowIndex; + + bool doneYet/* = false*/; + void takeMoreInput(); virtual std::shared_ptr take(); @@ -386,12 +400,16 @@ struct JoinElement: public PipelineElement { Bound(std::shared_ptr root, std::shared_ptr left, std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw, AnnotatedJoinCondition condition, JoinQualification joinQualification); std::shared_ptr root_; std::shared_ptr left_; std::shared_ptr right_; + std::shared_ptr leftRaw_; + std::shared_ptr rightRaw_; std::shared_ptr outputScope_; BoundSqlExpression crossWhere_; AnnotatedJoinCondition condition_; @@ -414,6 +432,7 @@ struct JoinElement: public PipelineElement { output context is the same as its input context. */ virtual std::shared_ptr outputScope() const; + }; std::shared_ptr @@ -846,6 +865,7 @@ struct DatasetFunctionElement : public PipelineElement { struct TransposeExecutor: public ElementExecutor { TransposeExecutor(std::shared_ptr subpipeline); virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); virtual void restart(); std::shared_ptr subpipeline_; diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index ce01f21a2..19693dbe7 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -18,7 +18,7 @@ def setUpClass(self): ds.commit() ds = mldb.create_dataset({ "id": "dataset2", "type": "sparse.mutable" }) - ds.record_row("row_a",[["x", "toy story", 0]]) + ds.record_row("row_a",[["x", "toy story", 0],["y", "123456", 0]]) ds.record_row("row_b",[["x", "terminator", 0]]) ds.commit() @@ -38,7 +38,6 @@ def test_transpose_dataset(self): self.assertEqual(res, expected) - @unittest.expectedFailure #not the expected rowname def test_transpose_subselect(self): res = mldb.put('/v1/functions/bop', { @@ -48,28 +47,18 @@ def test_transpose_subselect(self): } }) - mldb.log(res) - res = mldb.query("select bop({k : 'x'})") mldb.log(res) - expected = [["_rowName", "bop().result"],["blah","x"]] + expected = [["_rowName", "bop({k : 'x'}).result"],["result","x"]] self.assertEqual(res, expected) + + def test_transpose_join(self): - #joins executors dont support taking columns yet. Is there a way not to keep the whole transposed table in memory? - #one way or another we have to do the full join first to find all the rows - #then we either keep the rows or get them again - #except in the pipeline executor we dont have direct access to row, only iterative - - #in a join dataset, a column belongs to one or the other... - #can we take all columns from one dataset, then the other? - - #should we have a transpose-join-executor? + res = mldb.query("select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x)"); - #if no subselect is inside the transpose, its probably better to use the old pipeline... - @unittest.expectedFailure - def test_transpose_join(self): + mldb.log(res) res = mldb.put('/v1/functions/bop', { 'type': 'sql.query', @@ -78,12 +67,38 @@ def test_transpose_join(self): } }) + res = mldb.put('/v1/functions/bop2', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2 on t1.x = t2.x) offset 2" + } + }) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [["_rowName","bop().[a]-[row_a]"],["result","toy story"]] + + res = mldb.query("select bop2()") mldb.log(res) + expected = [["_rowName","bop2().[a]-[row_a]"], ["result", "123456"]] + + self.assertEqual(res, expected) + + def test_transpose_transpose(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(transpose(dataset2))" + } + }) + res = mldb.query("select bop()") mldb.log(res) - expected = [["_rowName", "bop().result"],["blah","x"]] + expected = [["_rowName","bop().x"],["result","terminator"]] self.assertEqual(res, expected) From 6ad731aca2a1cb04604e6e4080b1fc4c78e3b759 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 21 Jun 2016 15:15:35 -0400 Subject: [PATCH 06/12] MLDB-1675 share transpose logic between equi and cross pipeline join executor --- sql/execution_pipeline_impl.cc | 133 ++++++++++-------- sql/execution_pipeline_impl.h | 41 ++++-- .../MLDB-1675-pipeline-dataset-function.py | 21 +++ 3 files changed, 128 insertions(+), 67 deletions(-) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index 012dd4bd2..a31fa59fe 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -988,11 +988,15 @@ JoinElement::CrossJoinExecutor:: CrossJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right) + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) : parent(parent), root(std::move(root)), left(std::move(left)), - right(std::move(right)) + right(std::move(right)), + leftRaw(std::move(leftRaw)), + rightRaw(std::move(rightRaw)) { ExcAssert(parent && this->root && this->left && this->right); l = this->left->take(); @@ -1102,10 +1106,22 @@ take() } } +std::shared_ptr +JoinElement::CrossJoinExecutor:: +takeColumn() +{ + if (!transpose_) + transpose_ = make_shared(*this, leftRaw, rightRaw); + + return transpose_->take(); +} + void JoinElement::CrossJoinExecutor:: restart() { + transpose_.reset(); + left->restart(); right->restart(); l = left->take(); @@ -1131,8 +1147,6 @@ EquiJoinExecutor(const Bound * parent, leftRaw(std::move(leftRaw)), rightRaw(std::move(rightRaw)) { - side = 0; - doneYet = false; l = this->left->take(); r = this->right->take(); takeMoreInput(); @@ -1357,46 +1371,73 @@ std::shared_ptr JoinElement::EquiJoinExecutor:: takeColumn() { - //Transpose of joins should be avoided. - //We need to build the full row list and column index before we can proceed + if (!transpose_) + transpose_ = make_shared(*this, leftRaw, rightRaw); - if (!doneYet) { - while (1){ - auto res = take(); - if (!res) - break; + return transpose_->take(); +} - //TODO: This will not work in so many cases. Use the scope +void +JoinElement::EquiJoinExecutor:: +restart() +{ + left->restart(); + right->restart(); + leftRaw->restart(); + rightRaw->restart(); + transpose_.reset(); - Utf8String leftNameUtf8 = ""; - if (!res->values.at(0).empty()) //TODO: Always 0? - leftNameUtf8 = res->values.at(0).toUtf8String(); - size_t i = 2; - for (; i + 2 < res->values.size(); i+=2) { - if (i == 2) - leftNameUtf8 = "[" + leftNameUtf8 + "]"; - - leftNameUtf8 += res->values.at(0).empty() ? "-[]" : - "-[" + res->values.at(i).toUtf8String() + "]"; - } - - RowName leftName = RowName::parse(leftNameUtf8); - - Utf8String rightNameUtf8 = ""; - if (!res->values.at(i).empty()) - rightNameUtf8 = res->values.at(i).toUtf8String(); - RowName rightName = RowName::parse(rightNameUtf8); + l = left->take(); + r = right->take(); + takeMoreInput(); +} - RowName rowName = RowName::parse("[" + leftNameUtf8 + "]-[" + rightNameUtf8 + "]"); +JoinElement::JoinTransposeExecutor:: +JoinTransposeExecutor(ElementExecutor& joinExecutor, std::shared_ptr leftRaw, std::shared_ptr rightRaw) : +joinExecutor(joinExecutor), leftRaw(std::move(leftRaw)), rightRaw(std::move(rightRaw)) +{ + side = 0; - leftRowIndex[leftName].push_back(rowName); - rightRowIndex[rightName].push_back(rowName); + //Transpose of joins should be avoided. + //We need to build the full row list and column index before we can proceed + while (1){ + auto res = joinExecutor.take(); + if (!res) + break; + + //TODO: This will not work in so many cases. Use the scope + Utf8String leftNameUtf8 = ""; + if (!res->values.at(0).empty()) //TODO: Always 0? + leftNameUtf8 = res->values.at(0).toUtf8String(); + size_t i = 2; + for (; i + 2 < res->values.size(); i+=2) { + if (i == 2) + leftNameUtf8 = "[" + leftNameUtf8 + "]"; + + leftNameUtf8 += res->values.at(0).empty() ? "-[]" : + "-[" + res->values.at(i).toUtf8String() + "]"; } - doneYet = true; + RowName leftName = RowName::parse(leftNameUtf8); + + Utf8String rightNameUtf8 = ""; + if (!res->values.at(i).empty()) + rightNameUtf8 = res->values.at(i).toUtf8String(); + RowName rightName = RowName::parse(rightNameUtf8); + + RowName rowName = RowName::parse("[" + leftNameUtf8 + "]-[" + rightNameUtf8 + "]"); + + leftRowIndex[leftName].push_back(rowName); + rightRowIndex[rightName].push_back(rowName); } +} + +std::shared_ptr +JoinElement::JoinTransposeExecutor:: +take() +{ std::shared_ptr columnResult; SideRowIndex* index = &leftRowIndex; @@ -1451,26 +1492,6 @@ takeColumn() return columnResult; } -void -JoinElement::EquiJoinExecutor:: -restart() -{ - //cerr << "**** equijoin restart" << endl; - side = 0; - left->restart(); - right->restart(); - leftRaw->restart(); - rightRaw->restart(); - - doneYet = false; - leftRowIndex.clear(); - rightRowIndex.clear(); - - l = left->take(); - r = right->take(); - takeMoreInput(); -} - /*****************************************************************************/ /* BOUND JOIN EXECUTOR */ @@ -1539,7 +1560,9 @@ start(const BoundParameters & getParam) const (this, root_->start(getParam), left_->start(getParam), - right_->start(getParam)); + right_->start(getParam), + leftRaw_->start(getParam), + rightRaw_->start(getParam)); case AnnotatedJoinCondition::EQUIJOIN: return std::make_shared diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index e690f9c18..2b0cdcc76 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -330,6 +330,7 @@ struct JoinElement: public PipelineElement { std::shared_ptr rightRaw; struct Bound; + struct JoinTransposeExecutor; /** Execution runs over all left rows for each right row. The complexity is therefore O(left rows) * O(right rows). The canonical example of this @@ -339,15 +340,21 @@ struct JoinElement: public PipelineElement { CrossJoinExecutor(const Bound * parent, std::shared_ptr root, std::shared_ptr left, - std::shared_ptr right); + std::shared_ptr right, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); const Bound * parent; - std::shared_ptr root, left, right; + std::shared_ptr root, left, right, leftRaw, rightRaw; std::shared_ptr l,r; + + std::shared_ptr transpose_; virtual std::shared_ptr take(); + virtual std::shared_ptr takeColumn(); + void restart(); }; @@ -369,18 +376,10 @@ struct JoinElement: public PipelineElement { std::shared_ptr rightRaw); const Bound * parent; - std::shared_ptr root, left, right, leftRaw, rightRaw; - + std::shared_ptr root, left, right, leftRaw, rightRaw; std::shared_ptr l,r; - int side; - - typedef std::map > SideRowIndex; - - SideRowIndex leftRowIndex; - SideRowIndex rightRowIndex; - - bool doneYet/* = false*/; + std::shared_ptr transpose_; void takeMoreInput(); @@ -435,6 +434,24 @@ struct JoinElement: public PipelineElement { }; + struct JoinTransposeExecutor { + + JoinTransposeExecutor(ElementExecutor& joinExecutor, std::shared_ptr leftRaw, std::shared_ptr rightRaw); + + std::shared_ptr take(); + + ElementExecutor& joinExecutor; + std::shared_ptr leftRaw; + std::shared_ptr rightRaw; + + int side; + + typedef std::map > SideRowIndex; + + SideRowIndex leftRowIndex; + SideRowIndex rightRowIndex; + }; + std::shared_ptr bind() const; }; diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index 19693dbe7..49bdb2058 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -86,6 +86,27 @@ def test_transpose_join(self): self.assertEqual(res, expected) + def test_transpose_join_cross(self): + + res = mldb.query("select * from transpose(dataset1 as t1 JOIN dataset2 as t2)"); + + mldb.log(res) + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(dataset1 as t1 JOIN dataset2 as t2)" + } + }) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [["_rowName", "bop().[a]-[row_a]", "bop().[a]-[row_b]"], + ["result", "toy story", "toy story"]] + + self.assertEqual(res, expected) + def test_transpose_transpose(self): res = mldb.put('/v1/functions/bop', { From 3603aed9513eec552666fef191a6d39b89d4e0ed Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 12 Jul 2016 10:11:41 -0400 Subject: [PATCH 07/12] MLDB-1675 implement dataset merge in pipeline executor --- sql/execution_pipeline_impl.cc | 286 ++++++++++++++---- sql/execution_pipeline_impl.h | 23 +- .../MLDB-1675-pipeline-dataset-function.py | 38 +++ 3 files changed, 285 insertions(+), 62 deletions(-) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index a31fa59fe..19fefb160 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -82,74 +82,126 @@ doGetAllColumns(std::function keep, // << fieldOffset << endl; ExcAssertGreaterEqual(fieldOffset, 0); - std::vector columnsWithInfo; - std::map index; - for (auto & column: knownColumns) { + if (hasUnknownColumns) { + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue + { + auto & row = rowScope.as(); - ColumnName outputName = keep(column.columnName); + const ExpressionValue & rowContents + = row.values.at(fieldOffset + ROW_CONTENTS); - if (outputName.empty() && !asName.empty()) { - // BAD SMELL - //try with the table alias - outputName = keep(PathElement(asName) + column.columnName); - } + RowValue result; - if (outputName.empty()) { - continue; - } + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) + { + ColumnName outputName = keep(columnName); + if (outputName.empty() && !asName.empty()) { + //try with the table alias + outputName = keep(PathElement(asName) + columnName); + } + + if (!outputName.empty()) { + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + // TODO: lots of optimizations possible here... + value.forEachAtom(onAtom, outputName); + } + + return true; + }; + + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); - KnownColumn out = column; - out.columnName = ColumnName(outputName); - columnsWithInfo.emplace_back(std::move(out)); - index[column.columnName] = ColumnName(outputName); + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); + }; + + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_OPEN); + result.exec = exec; + return result; } - - auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue - { - auto & row = rowScope.as(); + else { - const ExpressionValue & rowContents - = row.values.at(fieldOffset + ROW_CONTENTS); + std::map index; - RowValue result; + for (auto & column: knownColumns) { - auto onColumn = [&] (const PathElement & columnName, - const ExpressionValue & value) + ColumnName outputName = keep(column.columnName); + + if (outputName.empty() && !asName.empty()) { + // BAD SMELL + //try with the table alias + outputName = keep(PathElement(asName) + column.columnName); + } + + if (outputName.empty()) { + continue; + } + + KnownColumn out = column; + out.columnName = ColumnName(outputName); + columnsWithInfo.emplace_back(std::move(out)); + index[column.columnName] = ColumnName(outputName); + } + + auto exec = [=] (const SqlRowScope & rowScope, const VariableFilter & filter) -> ExpressionValue { - auto it = index.find(Path(columnName)); - if (it == index.end()) { - return true; - } + auto & row = rowScope.as(); - auto onAtom = [&] (const Path & columnName, - const Path & prefix, - CellValue atom, - Date ts) + const ExpressionValue & rowContents + = row.values.at(fieldOffset + ROW_CONTENTS); + + RowValue result; + + auto onColumn = [&] (const PathElement & columnName, + const ExpressionValue & value) { - result.emplace_back(prefix + columnName, - std::move(atom), ts); + auto it = index.find(Path(columnName)); + if (it == index.end()) { + return true; + } + + auto onAtom = [&] (const Path & columnName, + const Path & prefix, + CellValue atom, + Date ts) + { + result.emplace_back(prefix + columnName, + std::move(atom), ts); + return true; + }; + + // TODO: lots of optimizations possible here... + value.forEachAtom(onAtom, it->second); + return true; }; - // TODO: lots of optimizations possible here... - value.forEachAtom(onAtom, it->second); + if (!rowContents.empty()) + rowContents.forEachColumn(onColumn); - return true; + ExpressionValue val(std::move(result)); + return val.getFilteredDestructive(filter); }; - if (!rowContents.empty()) - rowContents.forEachColumn(onColumn); - - ExpressionValue val(std::move(result)); - return val.getFilteredDestructive(filter); - }; - - GetAllColumnsOutput result; - result.info = std::make_shared(columnsWithInfo, SCHEMA_CLOSED); - result.exec = exec; - return result; + GetAllColumnsOutput result; + result.info = std::make_shared(columnsWithInfo, SCHEMA_CLOSED); + result.exec = exec; + return result; + } } BoundFunction @@ -964,6 +1016,7 @@ JoinElement(std::shared_ptr root, ->where(constantWhere) ->from(right, boundRight, when, selectAll, rightCondition, condition.right.orderBy); + } std::shared_ptr @@ -2461,22 +2514,52 @@ DatasetFunctionElement:: DatasetFunctionElement(std::shared_ptr root, std::shared_ptr function, GetParamInfo getParamInfo) : source_(std::move(root)), function_(function) { - ExcAssert(function->args.size() == 1); + ExcAssert(function->args.size() <= 2); + ExcAssert(function->args.size() > 0); ExcAssert(source_); ExcAssert(getParamInfo); - pipeline = source_ + if (function_->functionName == "merge") { + ExcAssert(function->args.size() == 2); + + pipeline = source_ + ->params(getParamInfo) + ->from(function->args[0], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo) + ->select("rowName()") + ->sort(OrderByExpression::parse("rowName()")) + ->select("rowName()") + ->select("*"); + + pipelineRight = source_ + ->params(getParamInfo) + ->from(function->args[1], WhenExpression::TRUE, + SelectExpression::STAR, SelectExpression::TRUE, + OrderByExpression(), getParamInfo) + ->select("rowName()") + ->sort(OrderByExpression::parse("rowName()")) + ->select("rowName()") + ->select("*"); + } + else + { + pipeline = source_ ->params(getParamInfo) ->from(function->args[0], WhenExpression::TRUE, SelectExpression::STAR, SelectExpression::TRUE, OrderByExpression(), getParamInfo); + } } std::shared_ptr DatasetFunctionElement:: bind() const { - return std::make_shared(source_->bind(), pipeline->bind(), function_->getAs()); + if (pipelineRight) + return std::make_shared(source_->bind(), pipeline->bind(), pipelineRight->bind(), function_->getAs(), function_->functionName); + else + return std::make_shared(source_->bind(), pipeline->bind(), function_->getAs(), function_->functionName); } /*****************************************************************************/ @@ -2607,6 +2690,63 @@ restart() subpipeline_->restart(); } +/*****************************************************************************/ +/* DATASET FUNCTION ELEMENT MERGE EXECUTOR */ +/*****************************************************************************/ + +DatasetFunctionElement::MergeExecutor:: +MergeExecutor(std::shared_ptr subpipelineLeft, std::shared_ptr subpipelineRight) : + subpipelineLeft_(subpipelineLeft), subpipelineRight_(subpipelineRight) +{ + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); +} + +std::shared_ptr +DatasetFunctionElement::MergeExecutor:: +take() +{ + std::shared_ptr result; + while (left && right) { + auto& leftRowName = left->values[left->values.size() - 2]; + auto& rightRowName = right->values[right->values.size() - 2]; + + if (leftRowName < rightRowName) { + left = subpipelineLeft_->take(); + } + else if (leftRowName > rightRowName) { + right = subpipelineRight_->take(); + } + else{ + + RowValue row; + left->values[left->values.size() - 1].appendToRow(ColumnName(), row); + right->values[right->values.size() - 1].appendToRow(ColumnName(), row); + + result = left; + result->values.push_back(leftRowName); + result->values.push_back(ExpressionValue(row)); + + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); + + break; + } + } + + return result; +} + +void +DatasetFunctionElement::MergeExecutor:: +restart() +{ + subpipelineLeft_->restart(); + subpipelineRight_->restart(); + left = subpipelineLeft_->take(); + right = subpipelineRight_->take(); +} + /*****************************************************************************/ /* BOUND DATASET FUNCTION ELEMENT */ /*****************************************************************************/ @@ -2614,10 +2754,27 @@ restart() DatasetFunctionElement::Bound:: Bound(std::shared_ptr source, std::shared_ptr subpipeline, - const Utf8String& asName) + const Utf8String& asName, + const Utf8String& functionName) : source_(std::move(source)), subpipeline_(std::move(subpipeline)), asName_(asName), + functionName_(functionName), + outputScope_(createOuputScope()) +{ +} + +DatasetFunctionElement::Bound:: +Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + std::shared_ptr subpipelineRight, + const Utf8String& asName, + const Utf8String& functionName) + : source_(std::move(source)), + subpipeline_(std::move(subpipeline)), + subpipelineRight_(std::move(subpipelineRight)), + asName_(asName), + functionName_(functionName), outputScope_(createOuputScope()) { } @@ -2626,22 +2783,31 @@ std::shared_ptr DatasetFunctionElement::Bound:: createOuputScope() { - //To get a complete scheme we would need all the row names from the sub pipeline. + //For Transpose to get a complete schema we would need all the row names from the sub pipeline. + //TODO: for Merge we should get the complete schema std::vector columns; - std::shared_ptr rowValueInfo = std::make_shared(columns, SCHEMA_OPEN); - auto tableScope = source_->outputScope() - ->tableScope(std::make_shared(subpipeline_->outputScope(), rowValueInfo, asName_)); - - return tableScope; + if (functionName_ == "transpose") + return source_->outputScope()->tableScope(std::make_shared(subpipeline_->outputScope(), rowValueInfo, asName_)); + else if (functionName_ == "merge") + return subpipeline_->outputScope()->tableScope(std::make_shared(rowValueInfo, asName_)); + else + throw HttpReturnException(400, "Unknown dataset function in pipeline executor", + "function", functionName_); } std::shared_ptr DatasetFunctionElement::Bound:: start(const BoundParameters & getParam) const { - return std::make_shared(subpipeline_->start(getParam)); + if (functionName_ == "transpose") + return std::make_shared(subpipeline_->start(getParam)); + else if (functionName_ == "merge") + return std::make_shared(subpipeline_->start(getParam), subpipelineRight_->start(getParam)); + else + throw HttpReturnException(400, "Unknown dataset function in pipeline executor", + "function", functionName_); } std::shared_ptr diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index 2b0cdcc76..49c32e757 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -877,7 +877,7 @@ struct DatasetFunctionElement : public PipelineElement { std::shared_ptr function_; std::shared_ptr pipeline; - std::shared_ptr subpipeline_; + std::shared_ptr pipelineRight; struct TransposeExecutor: public ElementExecutor { TransposeExecutor(std::shared_ptr subpipeline); @@ -888,12 +888,31 @@ struct DatasetFunctionElement : public PipelineElement { std::shared_ptr subpipeline_; }; + struct MergeExecutor: public ElementExecutor { + MergeExecutor(std::shared_ptr subpipelineLeft, std::shared_ptr subpipelineRight); + virtual std::shared_ptr take(); + virtual void restart(); + + std::shared_ptr subpipelineLeft_; + std::shared_ptr subpipelineRight_; + + std::shared_ptr left; + std::shared_ptr right; + }; + struct Bound: public BoundPipelineElement { - Bound(std::shared_ptr source, std::shared_ptr subpipeline, const Utf8String& asName); + Bound(std::shared_ptr source, std::shared_ptr subpipeline, const Utf8String& asName, const Utf8String& functionName); + Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + std::shared_ptr subpipelineRight, + const Utf8String& asName, + const Utf8String& functionName); std::shared_ptr source_; std::shared_ptr subpipeline_; + std::shared_ptr subpipelineRight_; Utf8String asName_; + Utf8String functionName_; std::shared_ptr outputScope_; std::shared_ptr diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index 49bdb2058..1f31b8aaf 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -22,6 +22,11 @@ def setUpClass(self): ds.record_row("row_b",[["x", "terminator", 0]]) ds.commit() + ds = mldb.create_dataset({ "id": "dataset3", "type": "sparse.mutable" }) + ds.record_row("row_a",[["z", 0.1, 0]]) + ds.record_row("row_b",[["z", 0.2, 0]]) + ds.commit() + def test_transpose_dataset(self): res = mldb.put('/v1/functions/bop', { @@ -123,4 +128,37 @@ def test_transpose_transpose(self): self.assertEqual(res, expected) + def test_transpose_transpose(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from transpose(transpose(dataset2))" + } + }) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [["_rowName","bop().x"],["result","terminator"]] + + self.assertEqual(res, expected) + + def test_merge_dataset(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from merge(dataset2, dataset3)" + } + }) + + res = mldb.query("select bop()") + mldb.log("merge") + mldb.log(res) + + expected = [["_rowName","bop().x","bop().y","bop().z"],["result","toy story","123456",0.10000000149011612]] + + self.assertEqual(res, expected) + mldb.run_tests() \ No newline at end of file From 11012549405399158cf870767c8a79e7c9d0af6f Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 12 Jul 2016 11:30:02 -0400 Subject: [PATCH 08/12] MLDB-1675 add merge subselect test including bound parameter --- .../MLDB-1675-pipeline-dataset-function.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index 1f31b8aaf..b3f047d43 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -27,6 +27,10 @@ def setUpClass(self): ds.record_row("row_b",[["z", 0.2, 0]]) ds.commit() + ds = mldb.create_dataset({ "id": "dataset4", "type": "sparse.mutable" }) + ds.record_row("result",[["z", 0.1, 0]]) + ds.commit() + def test_transpose_dataset(self): res = mldb.put('/v1/functions/bop', { @@ -154,11 +158,26 @@ def test_merge_dataset(self): }) res = mldb.query("select bop()") - mldb.log("merge") mldb.log(res) expected = [["_rowName","bop().x","bop().y","bop().z"],["result","toy story","123456",0.10000000149011612]] self.assertEqual(res, expected) + def test_merge_subselect(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from merge(dataset4, (select $k as blah))" + } + }) + + res = mldb.query("select bop({k : 'x'})") + mldb.log(res) + + expected = [["_rowName","bop({k : 'x'}).blah","bop({k : 'x'}).z"],["result","x",0.10000000149011612]] + + self.assertEqual(res, expected) + mldb.run_tests() \ No newline at end of file From 28c7b291290744e2851593090e00524e33dd315e Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 12 Jul 2016 13:11:20 -0400 Subject: [PATCH 09/12] MLDB-1675 add row dataset test --- testing/MLDB-1675-pipeline-dataset-function.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index b3f047d43..b52b9774f 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -180,4 +180,21 @@ def test_merge_subselect(self): self.assertEqual(res, expected) + def test_row_dataset(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "select * from row_dataset({x: 1, y:2, z: 'three'})" + } + }) + + res = mldb.query("select bop()") + mldb.log("here") + mldb.log(res) + + expected = [["_rowName","bop().column","bop().value"],["result","x",1]] + + self.assertEqual(res, expected) + mldb.run_tests() \ No newline at end of file From 48bd8a40ff1b64af410b911d1840f556e65570ec Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Tue, 12 Jul 2016 13:50:02 -0400 Subject: [PATCH 10/12] MLDB-1675 add expected failure for sample dataset in pipeline executor --- sql/execution_pipeline_impl.cc | 6 +++++- testing/MLDB-1675-pipeline-dataset-function.py | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index f1339ac01..653dd86a2 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -2526,7 +2526,10 @@ DatasetFunctionElement(std::shared_ptr root, std::shared_ptrfunctionName == "merge") { + if (function_->functionName == "sample") { + throw HttpReturnException(500, "Dataset sampling not implemented for sql.query or cross joins"); + } + else if (function_->functionName == "merge") { ExcAssert(function->args.size() == 2); pipeline = source_ @@ -2715,6 +2718,7 @@ take() { std::shared_ptr result; while (left && right) { + auto& leftRowName = left->values[left->values.size() - 2]; auto& rightRowName = right->values[right->values.size() - 2]; diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index b52b9774f..b8b96d891 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -190,11 +190,27 @@ def test_row_dataset(self): }) res = mldb.query("select bop()") - mldb.log("here") mldb.log(res) expected = [["_rowName","bop().column","bop().value"],["result","x",1]] self.assertEqual(res, expected) + @unittest.expectedFailure #not yet implemented + def test_sampled_dataset(self): + + res = mldb.put('/v1/functions/bop', { + 'type': 'sql.query', + 'params': { + 'query': "SELECT x.* FROM sample(dataset2, {rows: 1, withReplacement: FALSE}) AS x" + } + }) + + res = mldb.query("select bop()") + mldb.log(res) + + expected = [] # to be filled + + self.assertEqual(res, expected) + mldb.run_tests() \ No newline at end of file From cc6fc3a65c3a389d1c79478ab98ab436e42ab3f8 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Mon, 18 Jul 2016 09:22:39 -0400 Subject: [PATCH 11/12] MLDB-1500 changes as per PR --- sql/execution_pipeline_impl.cc | 40 ++++++++++++++++++++++------------ sql/execution_pipeline_impl.h | 16 ++++++++++---- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/sql/execution_pipeline_impl.cc b/sql/execution_pipeline_impl.cc index 653dd86a2..58358cd3a 100644 --- a/sql/execution_pipeline_impl.cc +++ b/sql/execution_pipeline_impl.cc @@ -89,8 +89,7 @@ doGetAllColumns(std::function keep, { auto & row = rowScope.as(); - const ExpressionValue & rowContents - = row.values.at(fieldOffset + ROW_CONTENTS); + const ExpressionValue & rowContents = row.values.at(fieldOffset + ROW_CONTENTS); RowValue result; @@ -114,7 +113,6 @@ doGetAllColumns(std::function keep, return true; }; - // TODO: lots of optimizations possible here... value.forEachAtom(onAtom, outputName); } @@ -184,7 +182,6 @@ doGetAllColumns(std::function keep, return true; }; - // TODO: lots of optimizations possible here... value.forEachAtom(onAtom, it->second); return true; @@ -1454,8 +1451,10 @@ restart() } JoinElement::JoinTransposeExecutor:: -JoinTransposeExecutor(ElementExecutor& joinExecutor, std::shared_ptr leftRaw, std::shared_ptr rightRaw) : -joinExecutor(joinExecutor), leftRaw(std::move(leftRaw)), rightRaw(std::move(rightRaw)) +JoinTransposeExecutor(ElementExecutor& joinExecutor, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw) + : joinExecutor(joinExecutor), leftRaw(std::move(leftRaw)), rightRaw(std::move(rightRaw)) { side = 0; @@ -1468,7 +1467,7 @@ joinExecutor(joinExecutor), leftRaw(std::move(leftRaw)), rightRaw(std::move(righ //TODO: This will not work in so many cases. Use the scope Utf8String leftNameUtf8 = ""; - if (!res->values.at(0).empty()) //TODO: Always 0? + if (!res->values.at(0).empty()) leftNameUtf8 = res->values.at(0).toUtf8String(); size_t i = 2; for (; i + 2 < res->values.size(); i+=2) { @@ -1779,7 +1778,8 @@ FromElement(std::shared_ptr root_, impl.reset(new SubSelectElement(root, subSelect->statement, orderBy, getParamInfo, from->getAs())); } else if (from->getType() == "datasetFunction") { - std::shared_ptr function = std::dynamic_pointer_cast(from); + std::shared_ptr function + = std::dynamic_pointer_cast(from); ExcAssert(this->root); ExcAssert(function); @@ -2518,7 +2518,9 @@ outputScope() const /*****************************************************************************/ DatasetFunctionElement:: -DatasetFunctionElement(std::shared_ptr root, std::shared_ptr function, GetParamInfo getParamInfo) +DatasetFunctionElement(std::shared_ptr root, + std::shared_ptr function, + GetParamInfo getParamInfo) : source_(std::move(root)), function_(function) { ExcAssert(function->args.size() <= 2); @@ -2567,9 +2569,16 @@ DatasetFunctionElement:: bind() const { if (pipelineRight) - return std::make_shared(source_->bind(), pipeline->bind(), pipelineRight->bind(), function_->getAs(), function_->functionName); + return std::make_shared(source_->bind(), + pipeline->bind(), + pipelineRight->bind(), + function_->getAs(), + function_->functionName); else - return std::make_shared(source_->bind(), pipeline->bind(), function_->getAs(), function_->functionName); + return std::make_shared(source_->bind(), + pipeline->bind(), + function_->getAs(), + function_->functionName); } /*****************************************************************************/ @@ -2577,7 +2586,9 @@ bind() const /*****************************************************************************/ TransposeLexicalScope:: -TransposeLexicalScope(std::shared_ptr inner, std::shared_ptr rowValueInfo, Utf8String asName) +TransposeLexicalScope(std::shared_ptr inner, + std::shared_ptr rowValueInfo, + Utf8String asName) : TableLexicalScope(rowValueInfo, asName), inner(inner) { @@ -2600,7 +2611,7 @@ doGetAllColumns(std::function keep, auto & row = rowScope.as(); const ExpressionValue & rowContents - = row.values.at(fieldOffset + ROW_CONTENTS); + = row.values.at(fieldOffset + ROW_CONTENTS); RowValue result; @@ -2800,7 +2811,8 @@ createOuputScope() std::shared_ptr rowValueInfo = std::make_shared(columns, SCHEMA_OPEN); if (functionName_ == "transpose") - return source_->outputScope()->tableScope(std::make_shared(subpipeline_->outputScope(), rowValueInfo, asName_)); + return source_->outputScope()->tableScope(std::make_shared(subpipeline_->outputScope(), + rowValueInfo, asName_)); else if (functionName_ == "merge") return subpipeline_->outputScope()->tableScope(std::make_shared(rowValueInfo, asName_)); else diff --git a/sql/execution_pipeline_impl.h b/sql/execution_pipeline_impl.h index 6f54ee906..804339f36 100644 --- a/sql/execution_pipeline_impl.h +++ b/sql/execution_pipeline_impl.h @@ -436,7 +436,9 @@ struct JoinElement: public PipelineElement { struct JoinTransposeExecutor { - JoinTransposeExecutor(ElementExecutor& joinExecutor, std::shared_ptr leftRaw, std::shared_ptr rightRaw); + JoinTransposeExecutor(ElementExecutor& joinExecutor, + std::shared_ptr leftRaw, + std::shared_ptr rightRaw); std::shared_ptr take(); @@ -847,7 +849,9 @@ struct ParamsElement: public PipelineElement { struct TransposeLexicalScope: public TableLexicalScope { - TransposeLexicalScope(std::shared_ptr inner, std::shared_ptr rowInfo, Utf8String asName_); + TransposeLexicalScope(std::shared_ptr inner, + std::shared_ptr rowInfo, + Utf8String asName_); std::shared_ptr inner; @@ -889,7 +893,8 @@ struct DatasetFunctionElement : public PipelineElement { }; struct MergeExecutor: public ElementExecutor { - MergeExecutor(std::shared_ptr subpipelineLeft, std::shared_ptr subpipelineRight); + MergeExecutor(std::shared_ptr subpipelineLeft, + std::shared_ptr subpipelineRight); virtual std::shared_ptr take(); virtual void restart(); @@ -901,7 +906,10 @@ struct DatasetFunctionElement : public PipelineElement { }; struct Bound: public BoundPipelineElement { - Bound(std::shared_ptr source, std::shared_ptr subpipeline, const Utf8String& asName, const Utf8String& functionName); + Bound(std::shared_ptr source, + std::shared_ptr subpipeline, + const Utf8String& asName, + const Utf8String& functionName); Bound(std::shared_ptr source, std::shared_ptr subpipeline, std::shared_ptr subpipelineRight, From eac8c4986704577cf4cdaa1fee52862dda030c24 Mon Sep 17 00:00:00 2001 From: Mathieu Bolduc Date: Mon, 18 Jul 2016 10:13:23 -0400 Subject: [PATCH 12/12] MLDB-1675 fixes tests as per PR --- testing/MLDB-1500-transpose-query.js | 10 ++++++-- .../MLDB-1675-pipeline-dataset-function.py | 24 +++++++++---------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/testing/MLDB-1500-transpose-query.js b/testing/MLDB-1500-transpose-query.js index d7f64c5c7..dc0e75224 100644 --- a/testing/MLDB-1500-transpose-query.js +++ b/testing/MLDB-1500-transpose-query.js @@ -193,11 +193,17 @@ res = mldb.put('/v1/functions/bop3', { } }) -mldb.log(res) +expected = [ + [ "_rowName", "bop3({x : 'a'}).result" ], + [ "result", "a" ] +]; -res = mldb.get('/v1/query', { q: "select bop3({x : 'a'})" }); +res = mldb.get('/v1/query', { q: "select bop3({x : 'a'})", format: 'table' }); mldb.log(res.json) +assertEqual(mldb.diff(expected, res.json, false /* strict */), {}, + "output was not the same as expected output in pipeline executor"); + "success" diff --git a/testing/MLDB-1675-pipeline-dataset-function.py b/testing/MLDB-1675-pipeline-dataset-function.py index b8b96d891..8de377fe1 100644 --- a/testing/MLDB-1675-pipeline-dataset-function.py +++ b/testing/MLDB-1675-pipeline-dataset-function.py @@ -45,7 +45,7 @@ def test_transpose_dataset(self): expected = [["_rowName","bop().row_a","bop().row_b"],["result","toy story","terminator"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_transpose_subselect(self): @@ -61,7 +61,7 @@ def test_transpose_subselect(self): expected = [["_rowName", "bop({k : 'x'}).result"],["result","x"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_transpose_join(self): @@ -93,7 +93,7 @@ def test_transpose_join(self): expected = [["_rowName","bop2().[a]-[row_a]"], ["result", "123456"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_transpose_join_cross(self): @@ -114,7 +114,7 @@ def test_transpose_join_cross(self): expected = [["_rowName", "bop().[a]-[row_a]", "bop().[a]-[row_b]"], ["result", "toy story", "toy story"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_transpose_transpose(self): @@ -130,7 +130,7 @@ def test_transpose_transpose(self): expected = [["_rowName","bop().x"],["result","terminator"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_transpose_transpose(self): @@ -146,7 +146,7 @@ def test_transpose_transpose(self): expected = [["_rowName","bop().x"],["result","terminator"]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_merge_dataset(self): @@ -160,9 +160,9 @@ def test_merge_dataset(self): res = mldb.query("select bop()") mldb.log(res) - expected = [["_rowName","bop().x","bop().y","bop().z"],["result","toy story","123456",0.10000000149011612]] + expected = [["_rowName","bop().x","bop().y","bop().z"],["result","toy story","123456",0.1]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_merge_subselect(self): @@ -176,9 +176,9 @@ def test_merge_subselect(self): res = mldb.query("select bop({k : 'x'})") mldb.log(res) - expected = [["_rowName","bop({k : 'x'}).blah","bop({k : 'x'}).z"],["result","x",0.10000000149011612]] + expected = [["_rowName","bop({k : 'x'}).blah","bop({k : 'x'}).z"],["result","x",0.1]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) def test_row_dataset(self): @@ -194,7 +194,7 @@ def test_row_dataset(self): expected = [["_rowName","bop().column","bop().value"],["result","x",1]] - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) @unittest.expectedFailure #not yet implemented def test_sampled_dataset(self): @@ -211,6 +211,6 @@ def test_sampled_dataset(self): expected = [] # to be filled - self.assertEqual(res, expected) + self.assertTableResultEquals(res, expected) mldb.run_tests() \ No newline at end of file