Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datasource/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (m *SqlDriverMessageMap) Values() []driver.Value { return m.Vals }
func (m *SqlDriverMessageMap) SetRow(row []driver.Value) { m.Vals = row }
func (m *SqlDriverMessageMap) Ts() time.Time { return time.Time{} }
func (m *SqlDriverMessageMap) Get(key string) (value.Value, bool) {
key = strings.ToLower(key)
if idx, ok := m.ColIndex[key]; ok {
return value.NewValue(m.Vals[idx]), true
}
Expand Down Expand Up @@ -229,6 +230,9 @@ func NewNestedContextReadWriter(readers []expr.ContextReader, writer expr.Contex

func (n *NestedContextReader) Get(key string) (value.Value, bool) {
for _, r := range n.readers {
if r == nil {
continue
}
val, ok := r.Get(key)
if ok && val != nil {
return val, ok
Expand Down
1 change: 1 addition & 0 deletions datasource/datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m *TimeValue) Time() time.Time {

func (m *TimeValue) Scan(src interface{}) error {

u.Debugf("time %T %v", src, src)
var t time.Time
var dstr string
switch val := src.(type) {
Expand Down
1 change: 1 addition & 0 deletions datasource/files/filesource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestFileList(t *testing.T) {
{"testjson"},
},
)
return
testutil.TestSqlSelect(t, "testcsvs", `show tables;`,
[][]driver.Value{
{"appearances"},
Expand Down
1 change: 1 addition & 0 deletions datasource/mockcsv/mockcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (m *Source) loadTable(tableName string) error {
return fmt.Errorf("No csv-source created for %q", tableName)
}
ds := membtree.NewStaticData(tableName)
//u.Infof("loaded columns table=%q cols=%v", tableName, csvSource.Columns())
ds.SetColumns(csvSource.Columns())
m.tables[tableName] = ds

Expand Down
1 change: 1 addition & 0 deletions datasource/schemadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (m *SchemaDb) Open(schemaObjectName string) (schema.Conn, error) {
case "engines", "procedures", "functions", "indexes":
return &SchemaSource{db: m, tbl: tbl, rows: nil}, nil
default:
u.Warnf("here")
return &SchemaSource{db: m, tbl: tbl, rows: tbl.AsRows()}, nil
}

Expand Down
10 changes: 5 additions & 5 deletions datasource/sqlite/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,11 @@ func (m *qryconn) WalkSourceSelect(planner plan.Planner, p *plan.Source) (plan.T

sqlSelect := p.Stmt.Source
u.Infof("original %s", sqlSelect.String())
p.Stmt.Source = nil
p.Stmt.Rewrite(sqlSelect)
sqlSelect = p.Stmt.Source
u.Infof("original after From(source) rewrite %s", sqlSelect.String())
sqlSelect.RewriteAsRawSelect()
//p.Stmt.Source = nil
//p.Stmt.Rewrite(sqlSelect)
//sqlSelect = p.Stmt.Source
//u.Infof("original after From(source) rewrite %s", sqlSelect.String())
//sqlSelect.RewriteAsRawSelect()

m.cols = sqlSelect.Columns.UnAliasedFieldNames()
m.colidx = sqlSelect.ColIndexes()
Expand Down
6 changes: 3 additions & 3 deletions exec/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (m *JoinMerge) Run() error {
//u.Debugf("msgsct: %v msgs:%#v", len(msgs), msgs)
for _, msg := range msgs {
//outCh <- datasource.NewUrlValuesMsg(i, msg)
//u.Debugf("i:%d msg:%#v", i, msg)
u.Warnf("i:%d msg:%#v", i, msg)
msg.IdVal = i
i++
outCh <- msg
Expand Down Expand Up @@ -289,8 +289,8 @@ func (m *JoinMerge) valIndexing(valOut, valSource []driver.Value, cols []*rel.Co
if col.Index < 0 || col.Index >= len(valSource) {
u.Errorf("source index out of range? idx:%v of %d source: %#v \n\tcol=%#v", col.Index, len(valSource), valSource, col)
}
//u.Infof("found: si=%v pi:%v idx:%d as=%v vals:%v len(out):%v", col.SourceIndex, col.ParentIndex, col.Index, col.As, valSource, len(valOut))
valOut[col.ParentIndex] = valSource[col.Index]
//u.Infof("found: si=%v pi:%v idx:%d as=%v val:%v len(out):%v", col.SourceIndex, col.ParentIndex, col.Index, col.As, valSource[col.Index], len(valOut))
valOut[col.ParentIndex] = valSource[col.SourceIndex]
}
return valOut
}
48 changes: 38 additions & 10 deletions exec/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,21 @@ func (m *Projection) projectionEvaluator(isFinal bool) MessageHandler {
colCt := len(columns)
// If we have a projection, use that as col count
if m.p.Proj != nil {
colCt = len(m.p.Proj.Columns)
if len(m.p.Proj.Columns) > colCt {
colCt = len(m.p.Proj.Columns)
} else if len(m.p.Proj.Columns) != colCt {
u.Warnf("wtf less? %v vs %v", colCt, len(m.p.Proj.Columns))
}

if len(m.p.Proj.Columns) == 0 {
u.Errorf("crap %+v", m.p.Proj)
}
for i, col := range m.p.Proj.Columns {
u.Debugf("%d %#v", i, col)
}
for i, col := range columns {
u.Debugf("%d %#v", i, col)
}
}

rowCt := 0
Expand All @@ -139,17 +153,27 @@ func (m *Projection) projectionEvaluator(isFinal bool) MessageHandler {
var outMsg schema.Message
switch mt := msg.(type) {
case *datasource.SqlDriverMessageMap:
var rdr expr.ContextReader
// use our custom write context for example purposes
row := make([]driver.Value, colCt)
rdr := datasource.NewNestedContextReader([]expr.ContextReader{
mt,
ctx.Session,
}, mt.Ts())
//u.Debugf("about to project: %#v", mt)
if ctx.Session == nil {
rdr = mt
} else {
rdr = datasource.NewNestedContextReader([]expr.ContextReader{
mt,
ctx.Session,
}, mt.Ts())
}

u.Debugf("about to project: colCt:%d message:%#v", colCt, mt)
colIdx := -1
for _, col := range columns {
colIdx += 1
//u.Debugf("%d colidx:%v sidx: %v pidx:%v key:%q Expr:%v", colIdx, col.Index, col.SourceIndex, col.ParentIndex, col.Key(), col.Expr)
u.Debugf("%d colidx:%v sidx: %v pidx:%v star=%v key:%q Expr:%v", colIdx, col.Index, col.SourceIndex, col.ParentIndex, col.Star, col.Key(), col.Expr)
if len(row) <= colIdx {
row = append(row, nil)
u.Warnf("wtf wrong count %v %v", colIdx, len(row))
}

if isFinal && col.ParentIndex < 0 {
continue
Expand All @@ -175,6 +199,9 @@ func (m *Projection) projectionEvaluator(isFinal bool) MessageHandler {
}
if col.Star {
starRow := mt.Values()
if colCt != len(starRow) {
u.Warnf("wtf wrong count %v %v", colCt, len(starRow))
}
//u.Infof("star row: %#v", starRow)
if len(columns) > 1 {
// select *, myvar, 1
Expand Down Expand Up @@ -217,14 +244,15 @@ func (m *Projection) projectionEvaluator(isFinal bool) MessageHandler {
//u.Infof("mt: %T mt %#v", mt, mt)
row[colIdx] = nil //v.Value()
} else {
//u.Debugf("%d:%d row:%d evaled: %v val=%v", colIdx, colCt, len(row), col, v.Value())
u.Debugf("%d:%d row:%d evaled: %v val=%v", colIdx, colCt, len(row), col, v.Value())
//writeContext.Put(col, mt, v)
row[colIdx] = v.Value()

}
}
}
//u.Infof("row: %#v", row)
//u.Infof("row cols: %v", colIndex)
u.Infof("row: %#v", row)
u.Infof("row cols: %v", colIndex)
outMsg = datasource.NewSqlDriverMessageMap(0, row, colIndex)

case expr.ContextReader:
Expand Down
1 change: 1 addition & 0 deletions exec/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (m *Source) Run() error {

for item := m.Scanner.Next(); item != nil; item = m.Scanner.Next() {

u.Debugf("source msg %#v", item)
select {
case <-sigChan:
return nil
Expand Down
8 changes: 4 additions & 4 deletions exec/where.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewHaving(ctx *plan.Context, p *plan.Having) *Where {
func whereFilter(filter expr.Node, task TaskRunner, cols map[string]int) MessageHandler {
out := task.MessageOut()

//u.Debugf("prepare filter %s", filter)
u.Debugf("WHERE prepare filter %s", filter)
return func(ctx *plan.Context, msg schema.Message) bool {

var filterValue value.Value
Expand All @@ -102,7 +102,7 @@ func whereFilter(filter expr.Node, task TaskRunner, cols map[string]int) Message
case *datasource.SqlDriverMessageMap:
filterValue, ok = vm.Eval(mt, filter)
if !ok {
u.Warnf("wtf %s %#v", filter, mt)
//u.Warnf("wtf %s %#v", filter, mt)
}
//u.Debugf("WHERE: result:%v T:%T \n\trow:%#v \n\tvals:%#v", filterValue, msg, mt, mt.Values())
//u.Debugf("cols: %#v", cols)
Expand All @@ -125,7 +125,7 @@ func whereFilter(filter expr.Node, task TaskRunner, cols map[string]int) Message
switch valTyped := filterValue.(type) {
case value.BoolValue:
if valTyped.Val() == false {
//u.Debugf("Filtering out: T:%T v:%#v", valTyped, valTyped)
u.Debugf("Filtering out: T:%T v:%#v \n\t%#v", valTyped, valTyped, msg)
return true
}
case nil:
Expand All @@ -136,7 +136,7 @@ func whereFilter(filter expr.Node, task TaskRunner, cols map[string]int) Message
}
}

//u.Debugf("about to send from where to forward: %#v", msg)
u.Debugf("about to send from where to forward: %#v", msg)
select {
case out <- msg:
return true
Expand Down
32 changes: 26 additions & 6 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (m *Source) serializeToPb() error {
return nil
}
func (m *Source) load() error {
// u.Debugf("source load schema=%s from=%s %#v", m.ctx.Schema.Name, m.Stmt.SourceName(), m.Stmt)
u.Debugf("source load schema=%s from=%s %#v", m.ctx.Schema.Name, m.Stmt.SourceName(), m.Stmt)
if m.Stmt == nil {
return nil
}
Expand Down Expand Up @@ -821,6 +821,9 @@ func (m *Projection) ToPb() (*PlanPb, error) {
if err != nil {
return nil, err
}
if m.Proj == nil {
u.WarnT(10)
}
ppbptr := m.Proj.ToPB()
ppcpy := *ppbptr
ppcpy.Final = m.Final
Expand Down Expand Up @@ -864,15 +867,32 @@ func NewJoinMerge(l, r Task, lf, rf *rel.SqlSource) *JoinMerge {
// Build an index of source to destination column indexing
for _, col := range lf.Source.Columns {
//u.Debugf("left col: idx=%d key=%q as=%q col=%v parentidx=%v", len(m.colIndex), col.Key(), col.As, col.String(), col.ParentIndex)
m.ColIndex[lf.Alias+"."+col.Key()] = col.ParentIndex
//u.Debugf("left colIndex: %15q : idx:%d sidx:%d pidx:%d", m.leftStmt.Alias+"."+col.Key(), col.Index, col.SourceIndex, col.ParentIndex)
if col.ParentIndex >= 0 {
m.ColIndex[lf.Alias+"."+col.Key()] = col.ParentIndex
}
u.Debugf("left colIndex: %15q : idx:%d sidx:%d pidx:%d", lf.Alias+"."+col.Key(), col.Index, col.SourceIndex, col.ParentIndex)
}
for _, col := range rf.Source.Columns {
//u.Debugf("right col: idx=%d key=%q as=%q col=%v", len(m.colIndex), col.Key(), col.As, col.String())
m.ColIndex[rf.Alias+"."+col.Key()] = col.ParentIndex
//u.Debugf("right colIndex: %15q : idx:%d sidx:%d pidx:%d", m.rightStmt.Alias+"."+col.Key(), col.Index, col.SourceIndex, col.ParentIndex)
if col.ParentIndex >= 0 {
m.ColIndex[rf.Alias+"."+col.Key()] = col.ParentIndex
}
u.Debugf("right colIndex: %15q : idx:%d sidx:%d pidx:%d", rf.Alias+"."+col.Key(), col.Index, col.SourceIndex, col.ParentIndex)
}
for _, col := range lf.Source.Columns {
//u.Debugf("left col: idx=%d key=%q as=%q col=%v parentidx=%v", len(m.colIndex), col.Key(), col.As, col.String(), col.ParentIndex)
if col.ParentIndex < 0 {
m.ColIndex[lf.Alias+"."+col.Key()] = len(m.ColIndex)
}
u.Debugf("left colIndex: %15q : idx:%d sidx:%d pidx:%d", lf.Alias+"."+col.Key(), col.Index, col.SourceIndex, len(m.ColIndex)-1)
}
for _, col := range rf.Source.Columns {
//u.Debugf("right col: idx=%d key=%q as=%q col=%v", len(m.colIndex), col.Key(), col.As, col.String())
if col.ParentIndex < 0 {
m.ColIndex[rf.Alias+"."+col.Key()] = len(m.ColIndex)
}
u.Debugf("right colIndex: %15q : idx:%d sidx:%d pidx:%d", rf.Alias+"."+col.Key(), col.Index, col.SourceIndex, len(m.ColIndex)-1)
}

return m
}

Expand Down
Loading