Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DelRange API to go-client #2199

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go-client/pegasus/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
OpSortKeyCount
OpIncr
OpBatchGet
OpDelRange
)

var opTypeToStringMap = map[OpType]string{
Expand All @@ -78,6 +79,7 @@ var opTypeToStringMap = map[OpType]string{
OpSortKeyCount: "SORTKEY_COUNT",
OpIncr: "INCR",
OpBatchGet: "BATCH_GET",
OpDelRange: "DEL_RANGE",
}

func (op OpType) String() string {
Expand Down
3 changes: 3 additions & 0 deletions go-client/pegasus/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ func TestReturnCorrectErrorCode(t *testing.T) {

_, err = tb.GetUnorderedScanners(context.Background(), 0, nil)
assert.Equal(t, err.(*PError).Op, OpGetUnorderedScanners)

err = tb.DelRange(context.Background(), nil, nil, nil)
assert.Equal(t, err.(*PError).Op, OpDelRange)
}
6 changes: 5 additions & 1 deletion go-client/pegasus/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ const (
batchUnknownError = -4 // rpc succeed, but operation encounter some unknown error in server side
)

const (
defaultScannerBatchSize = 1000
)

// Scanner defines the interface of client-side scanning.
type Scanner interface {
// Grabs the next entry.
Expand Down Expand Up @@ -83,7 +87,7 @@ type pegasusScanner struct {
// NewScanOptions returns the default ScannerOptions.
func NewScanOptions() *ScannerOptions {
return &ScannerOptions{
BatchSize: 1000,
BatchSize: defaultScannerBatchSize,
StartInclusive: true,
StopInclusive: false,
HashKeyFilter: Filter{Type: FilterTypeNoFilter, Pattern: nil},
Expand Down
104 changes: 104 additions & 0 deletions go-client/pegasus/table_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ var DefaultMultiGetOptions = &MultiGetOptions{
NoValue: false,
}

// DelRangeOptions is the options for DelRange, defaults to DefaultDelRangeOptions.
type DelRangeOptions struct {
nextSortKey []byte
StartInclusive bool
StopInclusive bool
SortKeyFilter Filter
}

// DefaultDelRangeOptions defines the defaults of DelRangeOptions.
var DefaultDelRangeOptions = &DelRangeOptions{
nextSortKey: nil,
StartInclusive: true,
StopInclusive: false,
SortKeyFilter: Filter{
Type: FilterTypeNoFilter,
Pattern: nil,
},
}

// TableConnector is used to communicate with single Pegasus table.
type TableConnector interface {
// Get retrieves the entry for `hashKey` + `sortKey`.
Expand Down Expand Up @@ -143,6 +162,16 @@ type TableConnector interface {
// `sortKeys[i]` : CAN'T be nil but CAN be empty.
MultiDel(ctx context.Context, hashKey []byte, sortKeys [][]byte) error

// DelRange /DelRangeOpt deletes the multiple entries under `hashKey`, between range (`startSortKey`, `stopSortKey`),
// atomically in one operation.
// DelRange is identical to DelRangeOpt except that the former uses DefaultDelRangeOptions as `options`.
//
// startSortKey: nil or len(startSortKey) == 0 means to start from the first entry in the sorted key range.
// stopSortKey: nil or len(stopSortKey) == 0 means to stop at the last entry in the sorted key range.
// `hashKey` : CAN'T be nil or empty.
DelRange(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte) error
DelRangeOpt(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, options *DelRangeOptions) error

// Returns ttl(time-to-live) in seconds: -1 if ttl is not set; -2 if entry doesn't exist.
// `hashKey` : CAN'T be nil or empty.
// `sortKey` : CAN'T be nil but CAN be empty.
Expand Down Expand Up @@ -449,6 +478,81 @@ func (p *pegasusTableConnector) MultiDel(ctx context.Context, hashKey []byte, so
return err
}

func (p *pegasusTableConnector) DelRange(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte) error {
return p.DelRangeOpt(ctx, hashKey, startSortKey, stopSortKey, DefaultDelRangeOptions)
}

func (p *pegasusTableConnector) DelRangeOpt(ctx context.Context, hashKey []byte, startSortKey []byte, stopSortKey []byte, options *DelRangeOptions) error {
err := func() error {
scannerOptions := ScannerOptions{
BatchSize: defaultScannerBatchSize,
StartInclusive: options.StartInclusive,
StopInclusive: options.StopInclusive,
HashKeyFilter: Filter{Type: FilterTypeNoFilter, Pattern: nil},
SortKeyFilter: options.SortKeyFilter,
NoValue: true,
}

if startSortKey != nil {
options.nextSortKey = make([]byte, len(startSortKey))
copy(options.nextSortKey, startSortKey)
} else {
options.nextSortKey = nil
}

scanner, err := p.GetScanner(context.Background(), hashKey, startSortKey, stopSortKey, &scannerOptions)
if err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
return fmt.Errorf("Getting pegasusScanner takes too long time when delete hashKey: %s, startSortKey: %v, stopSortKey: %v", hashKey, startSortKey, stopSortKey)
default:
return err
}
}
defer scanner.Close()

var sortKeys [][]byte
index := 0
for {
completed, _, s, _, err := scanner.Next(ctx)
if err != nil {
return err
}
if completed {
break
}
sortKeys = append(sortKeys, s)
if len(sortKeys) >= scannerOptions.BatchSize {
options.nextSortKey = sortKeys[0]
if err := p.MultiDel(ctx, hashKey, sortKeys); err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
return fmt.Errorf("DelRange of hashKey: %s from sortKey: %s[index: %d] timeout", hashKey, options.nextSortKey, index)
default:
return fmt.Errorf("DelRange of hashKey: %s from sortKey: %s[index: %d] failed", hashKey, options.nextSortKey, index)
}
}
sortKeys = nil
index += scannerOptions.BatchSize
}
}

if len(sortKeys) > 0 {
if err := p.MultiDel(ctx, hashKey, sortKeys); err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
return fmt.Errorf("DelRange of hashKey: %s from sortKey: %s[index: %d] timeout", hashKey, sortKeys[0], index)
default:
return fmt.Errorf("DelRange of hashKey: %s from sortKey: %s[index: %d] failed", hashKey, sortKeys[0], index)
}
}
options.nextSortKey = nil
}
return nil
}()
return WrapError(err, OpDelRange)
}

// -2 means entry not found.
func (p *pegasusTableConnector) TTL(ctx context.Context, hashKey []byte, sortKey []byte) (int, error) {
res, err := p.runPartitionOp(ctx, hashKey, &op.TTL{HashKey: hashKey, SortKey: sortKey}, OpTTL)
Expand Down
87 changes: 87 additions & 0 deletions go-client/pegasus/table_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func TestPegasusTableConnector_EmptyInput(t *testing.T) {
assert.Contains(t, err.Error(), "sortkey must not be nil")
_, err = tb.TTL(context.Background(), []byte("h1"), []byte(""))
assert.Nil(t, err)

// DelRange
err = tb.DelRange(context.Background(), nil, nil, nil)
assert.Contains(t, err.Error(), "hashkey must not be nil")
err = tb.DelRangeOpt(context.Background(), []byte{}, nil, nil, &DelRangeOptions{})
assert.Contains(t, err.Error(), "hashkey must not be empty")
}

func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
Expand Down Expand Up @@ -657,6 +663,87 @@ func testMultiKeyOperations(t *testing.T, tb TableConnector) {
assert.Error(t, tb.MultiSetOpt(context.Background(), hashKey, sortKeys, values, -1*time.Second))
}

func TestPegasusTableConnector_DelRange(t *testing.T) {
defer leaktest.Check(t)()

client := NewClient(testingCfg)
defer client.Close()

tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()

testDelRangeOperations(t, tb)
}

func testDelRangeOperations(t *testing.T, tb TableConnector) {
hashKey := []byte("h1")

sortKeys := make([][]byte, 10)
values := make([][]byte, 10)
for i := 0; i < 10; i++ {
// make sortKeys sorted.
sidBuf := []byte(fmt.Sprintf("%d", i))
var sidWithLeadingZero bytes.Buffer
for k := 0; k < 20-len(sidBuf); k++ {
sidWithLeadingZero.WriteByte('0')
}
sidWithLeadingZero.Write(sidBuf)
sortKeys[i] = sidWithLeadingZero.Bytes()
values[i] = []byte(fmt.Sprintf("v%d", i))
}

// delete non-existent sortKey should be ok
err := tb.DelRange(context.Background(), hashKey, sortKeys[0], sortKeys[9])
assert.NoError(t, err)

// setup
err = tb.MultiSet(context.Background(), hashKey, sortKeys, values)
assert.NoError(t, err)

// read after deletion
err = tb.DelRange(context.Background(), hashKey, sortKeys[5], sortKeys[6])
assert.NoError(t, err)
count, err := tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, int64(9), count) // 0,1,2,3,4,5,7,8,9

// DelRange with "*Inclusive" option
err = tb.DelRangeOpt(context.Background(), hashKey, sortKeys[2], sortKeys[6],
&DelRangeOptions{StartInclusive: true, StopInclusive: true})
assert.NoError(t, err)
count, err = tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, int64(5), count) // 0,1,7,8,9

err = tb.DelRangeOpt(context.Background(), hashKey, sortKeys[6], sortKeys[8],
&DelRangeOptions{StartInclusive: false, StopInclusive: false})
assert.NoError(t, err)
count, err = tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, int64(4), count) // 0,1,8,9

// DelRange with FilterTypeMatchPostfix option
err = tb.DelRangeOpt(context.Background(), hashKey, sortKeys[0], sortKeys[9], &DelRangeOptions{SortKeyFilter: Filter{
Type: FilterTypeMatchPostfix,
Pattern: []byte("8"),
}})
assert.NoError(t, err)
count, err = tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, int64(3), count) // 0,1,9

// ensure passing nil to startSortKey and stopSortKey in DelRange deletes all entries
err = tb.MultiSet(context.Background(), hashKey, sortKeys, values)
assert.NoError(t, err)
err = tb.DelRange(context.Background(), hashKey, nil, nil)
assert.NoError(t, err)
count, err = tb.SortKeyCount(context.Background(), hashKey)
assert.NoError(t, err)
assert.Equal(t, int64(0), count)

}

func TestPegasusTableConnector_CheckAndSet(t *testing.T) {
defer leaktest.Check(t)()

Expand Down
Loading