Skip to content
9 changes: 7 additions & 2 deletions datasource/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
)

var (
// SourceTypeCsv the data-source type for this source
SourceTypeCsv = "csv"

// Ensure we meet desired interfaces
_ schema.Source = (*CsvDataSource)(nil)
_ schema.Conn = (*CsvDataSource)(nil)
_ schema.ConnScanner = (*CsvDataSource)(nil)
Expand Down Expand Up @@ -93,12 +97,13 @@ func NewCsvSource(table string, indexCol int, ior io.Reader, exit <-chan bool) (
m.colindex[key] = i
m.headers[i] = key
}
m.loadTable()
m.defineTable()
//u.Infof("csv headers: %v colIndex: %v", headers, m.colindex)
return &m, nil
}

func (m *CsvDataSource) Init() {}
func (m *CsvDataSource) Type() string { return SourceTypeCsv }
func (m *CsvDataSource) Setup(*schema.Schema) error { return nil }
func (m *CsvDataSource) Tables() []string { return []string{m.table} }
func (m *CsvDataSource) Columns() []string { return m.headers }
Expand All @@ -108,7 +113,7 @@ func (m *CsvDataSource) Table(tableName string) (*schema.Table, error) {
}
return nil, schema.ErrNotFound
}
func (m *CsvDataSource) loadTable() error {
func (m *CsvDataSource) defineTable() error {
tbl := schema.NewTable(strings.ToLower(m.table))
columns := m.Columns()
for i := range columns {
Expand Down
2 changes: 1 addition & 1 deletion datasource/datatypes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type dtData struct {
func TestDataTypes(t *testing.T) {

// Load in a "csv file" into our mock data store
mockcsv.LoadTable(td.MockSchema.Name, "typestest", `user_id,categories,json_obj,json_cats,t1
mockcsv.CreateCsvTable(td.MockSchema.Name, "typestest", `user_id,categories,json_obj,json_cats,t1
9Ip1aKbeZe2njCDM,"sports,politics,worldnews","{""name"":""bob""}","[""sports"",""politics"",""worldnews""]","2014-01-01"`)

data := dtData{}
Expand Down
4 changes: 3 additions & 1 deletion datasource/files/filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func NewFileSource() *FileSource {

func (m *FileSource) Init() {}

func (m *FileSource) Type() string { return SourceType }

// Setup the filesource with schema info
func (m *FileSource) Setup(ss *schema.Schema) error {
m.ss = ss
Expand Down Expand Up @@ -134,7 +136,7 @@ func (m *FileSource) init() error {

// u.Debugf("File init %v", string(m.ss.Conf.Settings.PrettyJson()))

conf := m.ss.Conf.Settings
conf := u.NewJsonHelperMapString(m.ss.Conf.Settings)
if tablePath := conf.String("path"); tablePath != "" {
m.path = tablePath
}
Expand Down
10 changes: 5 additions & 5 deletions datasource/files/filesource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func (m *testSource) Setup(s *schema.Schema) error {
fileStore = os.Getenv("FILESTORE")
}

settings := u.JsonHelper(map[string]interface{}{
settings := map[string]string{
"path": "baseball",
"filetype": "csv",
"type": fileStore,
})
}
s.Conf = &schema.ConfigSource{
Name: "testcsvs",
SourceType: "testcsvs",
Settings: settings,
Name: "testcsvs",
Type: "testcsvs",
Settings: settings,
}
return m.FileSource.Setup(s)
}
Expand Down
6 changes: 3 additions & 3 deletions datasource/files/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func FileStoreLoader(ss *schema.Schema) (cloudstorage.StoreReader, error) {
}

//u.Debugf("json conf:\n%s", ss.Conf.Settings.PrettyJson())
storeType := ss.Conf.Settings.String("type")
storeType := ss.Conf.Settings["type"]
if storeType == "" {
return nil, fmt.Errorf("Expected 'type' in File Store definition conf")
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func RegisterFileStore(storeType string, fs FileStoreCreator) {

func createGCSFileStore(ss *schema.Schema) (FileStore, error) {

conf := ss.Conf.Settings
conf := u.NewJsonHelperMapString(ss.Conf.Settings)

c := gcsConfig
if proj := conf.String("project"); proj != "" {
Expand All @@ -122,7 +122,7 @@ func createGCSFileStore(ss *schema.Schema) (FileStore, error) {

func createLocalFileStore(ss *schema.Schema) (FileStore, error) {

conf := ss.Conf.Settings
conf := u.NewJsonHelperMapString(ss.Conf.Settings)

localPath := conf.String("localpath")
if localPath == "" {
Expand Down
10 changes: 5 additions & 5 deletions datasource/files/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ func (m *jsonTestSource) Setup(ss *schema.Schema) error {
if os.Getenv("FILESTORE") != "" {
fileStore = os.Getenv("FILESTORE")
}
settings := u.JsonHelper(map[string]interface{}{
settings := map[string]string{
"path": "github",
"filetype": "json",
"format": "github_json",
"type": fileStore,
})
}
ss.Conf = &schema.ConfigSource{
Name: "testjson",
SourceType: "testjson",
Settings: settings,
Name: "testjson",
Type: "testjson",
Settings: settings,
}
return m.FileSource.Setup(ss)
}
Expand Down
15 changes: 11 additions & 4 deletions datasource/files/storesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ var (
// Connection Interfaces
_ schema.Conn = (*storeSource)(nil)
_ schema.ConnScanner = (*storeSource)(nil)

// StoreType
storeSourceType = "filestore_source"
)

// storeSource DataSource for reading lists of files/names/metadata of files
Expand Down Expand Up @@ -50,6 +53,7 @@ func newStoreSource(table string, fs *FileSource) (*storeSource, error) {
}

func (m *storeSource) Init() {}
func (m *storeSource) Type() string { return storeSourceType }
func (m *storeSource) Setup(*schema.Schema) error { return nil }
func (m *storeSource) Tables() []string { return []string{m.table} }
func (m *storeSource) Columns() []string { return m.f.fdbcols }
Expand All @@ -58,21 +62,24 @@ func (m *storeSource) Table(tableName string) (*schema.Table, error) {
// u.Debugf("Table(%q), tbl nil?%v", tableName, m.tbl == nil)
if m.tbl != nil {
return m.tbl, nil
} else {
m.loadTable()
}

if err := m.tableColumnExpansion(); err != nil {
return nil, err
}

if m.tbl != nil {
return m.tbl, nil
}
return nil, schema.ErrNotFound
}
func (m *storeSource) loadTable() error {
func (m *storeSource) tableColumnExpansion() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.tbl != nil {
return nil
}
// u.Debugf("storeSource.loadTable(%q)", m.table)
// u.Debugf("storeSource.tableColumnExpansion(%q)", m.table)
tbl := schema.NewTable(strings.ToLower(m.table))
columns := m.Columns()
for i := range columns {
Expand Down
72 changes: 46 additions & 26 deletions datasource/introspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,42 @@ func IntrospectTable(tbl *schema.Table, iter schema.Iterator) error {

k := nameIndex[i]
_, exists := tbl.FieldMap[k]
if exists {
//u.Warnf("skipping because exists %s.%s", tbl.Name, k)
// The flaw here is we only look at one value per field(k)
// We really should do deeper inspection at more than one value.
continue
}

//u.Debugf("i:%v k:%s v: %T %v", i, k, v, v)
switch val := v.(type) {
case int, int64, int16, int32, uint16, uint64, uint32:
tbl.AddFieldType(k, value.IntType)
tbl.AddField(schema.NewFieldBase(k, value.IntType, 64, ""))
case time.Time, *time.Time:
tbl.AddFieldType(k, value.TimeType)
tbl.AddField(schema.NewFieldBase(k, value.TimeType, 64, ""))
case bool:
tbl.AddFieldType(k, value.BoolType)
tbl.AddField(schema.NewFieldBase(k, value.BoolType, 1, ""))
case float32, float64:
tbl.AddFieldType(k, value.NumberType)
tbl.AddField(schema.NewFieldBase(k, value.NumberType, 64, ""))
case string:
valType := value.ValueTypeFromStringAll(val)
if !exists {
tbl.AddFieldType(k, valType)
//fld := tbl.FieldMap[k]
//u.Debugf("add field? %+v", fld)
//u.Debugf("%s = %v type: %T vt:%s new? %v", k, val, val, valType, !exists)
switch valType {
case value.NumberType, value.IntType, value.TimeType:
tbl.AddField(schema.NewFieldBase(k, valType, 64, ""))
case value.BoolType:
tbl.AddField(schema.NewFieldBase(k, valType, 1, ""))
case value.StringType:
tbl.AddField(schema.NewFieldBase(k, valType, 255, ""))
default:
tbl.AddField(schema.NewFieldBase(k, valType, 2000, ""))
}
case map[string]interface{}:
tbl.AddFieldType(k, value.JsonType)
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
case []interface{}:
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
default:
u.Debugf("not implemented: %T", val)
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
}
}
case *SqlDriverMessageMap:
Expand All @@ -97,34 +110,41 @@ func IntrospectTable(tbl *schema.Table, iter schema.Iterator) error {
// }

_, exists := tbl.FieldMap[k]
if exists {
//u.Warnf("skipping because exists %s.%s", tbl.Name, k)
// The flaw here is we only look at one value per field(k)
// We really should do deeper inspection at more than one value.
continue
}

//u.Debugf("i:%v k:%s v: %T %v", i, k, v, v)
//u.Debugf("%p %s i:%v k:%s v: %T %v", tbl, tbl.Name, i, k, v, v)
switch val := v.(type) {
case int, int64, int16, int32, uint16, uint64, uint32:
tbl.AddFieldType(k, value.IntType)
tbl.AddField(schema.NewFieldBase(k, value.IntType, 64, ""))
case time.Time, *time.Time:
tbl.AddFieldType(k, value.TimeType)
tbl.AddField(schema.NewFieldBase(k, value.TimeType, 64, ""))
case bool:
tbl.AddFieldType(k, value.BoolType)
tbl.AddField(schema.NewFieldBase(k, value.BoolType, 1, ""))
case float32, float64, json.Number:
tbl.AddFieldType(k, value.NumberType)
tbl.AddField(schema.NewFieldBase(k, value.NumberType, 64, ""))
case string:
valType := value.ValueTypeFromStringAll(val)
if !exists {
tbl.AddFieldType(k, valType)
//fld := tbl.FieldMap[k]
//u.Debugf("add field? %+v", fld)
//u.Debugf("%s = %v type: %T vt:%s new? %v", k, val, val, valType, !exists)
switch valType {
case value.NumberType, value.IntType, value.TimeType:
tbl.AddField(schema.NewFieldBase(k, valType, 64, ""))
case value.BoolType:
tbl.AddField(schema.NewFieldBase(k, valType, 1, ""))
case value.StringType:
tbl.AddField(schema.NewFieldBase(k, valType, 255, ""))
default:
tbl.AddField(schema.NewFieldBase(k, valType, 2000, ""))
}
case map[string]interface{}:
tbl.AddFieldType(k, value.JsonType)
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
case []interface{}:
tbl.AddFieldType(k, value.JsonType)
case nil:
// hm.....
tbl.AddFieldType(k, value.JsonType)
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
default:
tbl.AddFieldType(k, value.JsonType)
tbl.AddField(schema.NewFieldBase(k, value.JsonType, 2000, ""))
u.LogThrottle(u.WARN, 10, "not implemented: k:%v %T", k, val)
}
}
Expand Down
9 changes: 7 additions & 2 deletions datasource/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
)

var (
// SourceTypeJson the data-source type for this source
SourceTypeJson = "json"

// Ensure we meet interfaces
_ schema.Source = (*JsonSource)(nil)
_ schema.Conn = (*JsonSource)(nil)
_ schema.ConnScanner = (*JsonSource)(nil)
Expand Down Expand Up @@ -85,11 +89,12 @@ func NewJsonSource(table string, rc io.ReadCloser, exit <-chan bool, lh FileLine
js.lh = js.jsonDefaultLine
}

//m.loadTable()
//m.defineTable()
return js, nil
}

func (m *JsonSource) Init() {}
func (m *JsonSource) Type() string { return SourceTypeJson }
func (m *JsonSource) Setup(*schema.Schema) error { return nil }
func (m *JsonSource) Tables() []string { return []string{m.table} }
func (m *JsonSource) Columns() []string { return m.columns }
Expand All @@ -100,7 +105,7 @@ func (m *JsonSource) Table(tableName string) (*schema.Table, error) {
}
return nil, schema.ErrNotFound
}
func (m *JsonSource) loadTable() error {
func (m *JsonSource) defineTable() error {
tbl := schema.NewTable(strings.ToLower(m.table))
columns := m.Columns()
for i := range columns {
Expand Down
1 change: 1 addition & 0 deletions datasource/membtree/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewStaticData(name string) *StaticDataSource {
}

func (m *StaticDataSource) Init() {}
func (m *StaticDataSource) Type() string { return sourceType }
func (m *StaticDataSource) Setup(*schema.Schema) error { return nil }
func (m *StaticDataSource) Open(connInfo string) (schema.Conn, error) { return m, nil }
func (m *StaticDataSource) Table(table string) (*schema.Table, error) { return m.tbl, nil }
Expand Down
7 changes: 6 additions & 1 deletion datasource/memdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ func NewMemDbData(name string, data [][]driver.Value, cols []string) (*MemDb, er
}
// Insert initial values
conn := newDbConn(m)
defer conn.Close()
for _, row := range data {
conn.Put(nil, nil, row)
}
conn.Close()

// we are going to look at ~10 rows to create schema for it
conn = newDbConn(m)
defer conn.Close()
if err = datasource.IntrospectTable(m.tbl, conn); err != nil {
u.Errorf("Could not introspect schema %v", err)
return nil, err
Expand Down Expand Up @@ -100,6 +102,9 @@ func NewMemDbForSchema(name string, cols []string) (*MemDb, error) {
// Init initilize this db
func (m *MemDb) Init() {}

// Type the source-type.
func (m *MemDb) Type() string { return sourceType }

// Setup this db with parent schema.
func (m *MemDb) Setup(*schema.Schema) error { return nil }

Expand Down
13 changes: 9 additions & 4 deletions datasource/mockcsv/mockcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
const (
// SchemaName is "mockcsv"
SchemaName = "mockcsv"
SourceType = "mockcsv"
)

var (
Expand Down Expand Up @@ -44,10 +45,13 @@ func Schema() *schema.Schema {
return sch
}

// LoadTable MockCsv is used for mocking so has a global data source we can load data into
func LoadTable(schemaName, name, csvRaw string) {
// CreateCsvTable MockCsv is used for mocking so has a global data source we can load data into
func CreateCsvTable(schemaName, name, csvRaw string) {
CsvGlobal.CreateTable(name, csvRaw)
schema.DefaultRegistry().SchemaRefresh(SchemaName)
s := Schema()
if err := s.Discovery(); err != nil {
panic(err.Error())
}
}

// Source DataSource for testing creates an in memory b-tree per "table".
Expand All @@ -74,7 +78,8 @@ func New() *Source {
}

// Init no-op meets interface
func (m *Source) Init() {}
func (m *Source) Init() {}
func (m *Source) Type() string { return SourceType }

// Setup accept schema
func (m *Source) Setup(s *schema.Schema) error {
Expand Down
Loading