Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/migrate_from_faiss.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (r *MigrateFromFaissCmd) Run(globals *Globals) error {
if err != nil {
return fmt.Errorf("failed to connect to Qdrant target: %w", err)
}
defer targetClient.Close()

err = commons.PrepareOffsetsCollection(ctx, r.Migration.OffsetsCollection, targetClient)
if err != nil {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (r *MigrateFromFaissCmd) getFaissTotal(ctx context.Context) (int, error) {
}
var total int
_, err = fmt.Sscanf(string(totalOut), "%d", &total)
if err != nil || total <= 0 {
if err != nil || total < 0 {
return 0, fmt.Errorf("invalid total returned from FAISS index: %s", string(totalOut))
}
return total, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/migrate_from_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (r *MigrateFromMongoDBCmd) migrateData(ctx context.Context, sourceClient *m
return fmt.Errorf("failed to get start offset: %w", err)
}
offsetCount = count
page = uint64(offsetCount / batchSize)
page = offsetCount / batchSize
}

bar, _ := pterm.DefaultProgressbar.WithTotal(int(sourcePointCount)).Start()
Expand Down
28 changes: 21 additions & 7 deletions cmd/migrate_from_opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,14 @@ func (r *MigrateFromOpenSearchCmd) extractVectorFields(mapping map[string]any) (
if spaceType, ok := fieldProps["space_type"].(string); ok {
if mappedDistance, exists := distanceMapping[strings.ToLower(spaceType)]; exists {
distance = mappedDistance
} else {
distance = qdrant.Distance_Cosine
pterm.Warning.Printfln("Unsupported space type '%s' for field '%s', defaulting to cosine distance", spaceType, fieldName)
}
}

if distance == qdrant.Distance_UnknownDistance {
distance = qdrant.Distance_Cosine
pterm.Warning.Printfln("Unsupported space type for field '%s', defaulting to cosine distance", fieldName)
}

vectorParamsMap[fieldName] = &qdrant.VectorParams{
Size: uint64(dimension),
Distance: distance,
Expand Down Expand Up @@ -290,9 +292,18 @@ func (r *MigrateFromOpenSearchCmd) migrateData(ctx context.Context, sourceClient

var targetPoints []*qdrant.PointStruct
for _, hit := range hits {
doc := hit.(map[string]any)
source := doc["_source"].(map[string]any)
docID := doc["_id"].(string)
doc, ok := hit.(map[string]any)
if !ok {
return fmt.Errorf("invalid hit format: expected map[string]any, got %T", hit)
}
source, ok := doc["_source"].(map[string]any)
if !ok {
return fmt.Errorf("invalid _source format: expected map[string]any, got %T", doc["_source"])
}
docID, ok := doc["_id"].(string)
if !ok {
return fmt.Errorf("invalid _id format: expected string, got %T", doc["_id"])
}

point := &qdrant.PointStruct{}
vectors := make(map[string]*qdrant.Vector)
Expand Down Expand Up @@ -330,7 +341,10 @@ func (r *MigrateFromOpenSearchCmd) migrateData(ctx context.Context, sourceClient

offsetCount += uint64(len(targetPoints))

lastDoc := hits[len(hits)-1].(map[string]any)
lastDoc, ok := hits[len(hits)-1].(map[string]any)
if !ok {
return fmt.Errorf("invalid last hit format: expected map[string]any, got %T", hits[len(hits)-1])
}
lastSortValue = lastDoc["_id"]

var offsetID *qdrant.PointId
Expand Down
7 changes: 6 additions & 1 deletion cmd/migrate_from_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (r *MigrateFromRedisCmd) migrateData(ctx context.Context, rdb *redis.Client
return nil
}

// Ref: https://redis.io/docs/latest/develop/clients/go/vecsearch/#add-a-helper-function
func bytesToFloats(b []byte) []float32 {
if len(b)%4 != 0 {
log.Printf("Warning: byte slice length %d is not a multiple of 4, truncating", len(b))
Expand All @@ -236,7 +237,11 @@ func parseFieldValue(attrType string, val string) interface{} {
// redis.SearchFieldTypeVector is handled
// before invoking this function.
if attrType == redis.SearchFieldTypeNumeric.String() {
f, _ := strconv.ParseFloat(val, 64)
f, err := strconv.ParseFloat(val, 64)
if err != nil {
log.Printf("Warning: failed to parse numeric value '%s': %v", val, err)
return val
}
return f
}
return val
Expand Down
2 changes: 1 addition & 1 deletion pkg/commons/offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func GetStartOffset(ctx context.Context, migrationOffsetsCollectionName string,

offsetCountValue, ok := offsetCount.GetKind().(*qdrant.Value_IntegerValue)
if !ok {
return nil, 0, fmt.Errorf("failed to get offset count: %w", err)
return nil, 0, fmt.Errorf("failed to get offset count: invalid type")
}

offsetIntegerValue, ok := offset.GetKind().(*qdrant.Value_IntegerValue)
Expand Down
Loading