Skip to content

Commit 634b96f

Browse files
committed
update ut
1 parent 9f57daa commit 634b96f

File tree

4 files changed

+222
-5
lines changed

4 files changed

+222
-5
lines changed

pkg/iscp/iteration.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func ExecuteIteration(
345345
return nil
346346
}
347347

348-
func FlushJobStatusOnIterationState(
348+
var FlushJobStatusOnIterationState = func(
349349
ctx context.Context,
350350
cnUUID string,
351351
cnEngine engine.Engine,
@@ -435,7 +435,7 @@ func FlushStatus(
435435
return
436436
}
437437

438-
func GetJobSpecs(
438+
var GetJobSpecs = func(
439439
ctx context.Context,
440440
cnUUID string,
441441
txn client.TxnOperator,
@@ -495,3 +495,14 @@ func (status *JobStatus) SetError(err error) {
495495
func (status *JobStatus) PermanentlyFailed() bool {
496496
return status.ErrorCode >= PermanentErrorThreshold
497497
}
498+
499+
func NewIterationContext(accountID uint32, tableID uint64, jobNames []string, jobIDs []uint64, fromTS types.TS, toTS types.TS) *IterationContext {
500+
return &IterationContext{
501+
accountID: accountID,
502+
tableID: tableID,
503+
jobNames: jobNames,
504+
jobIDs: jobIDs,
505+
fromTS: fromTS,
506+
toTS: toTS,
507+
}
508+
}

pkg/vm/engine/disttae/change_handle.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ func (h *CheckpointChangesHandle) Next(
174174
return
175175
}
176176
func (h *CheckpointChangesHandle) Close() error {
177-
h.reader.Close()
177+
if h.reader != nil {
178+
h.reader.Close()
179+
}
178180
return nil
179181
}
180182
func (h *CheckpointChangesHandle) initReader(ctx context.Context) (err error) {

pkg/vm/engine/tae/rpc/handle_debug.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -484,13 +484,15 @@ func (h *Handle) HandleGetChangedTableList(
484484
resp.Newest = &tt
485485
}()
486486

487+
now:=h.db.TxnMgr.MaxCommittedTS.Load()
488+
487489
if len(req.TableIds) == 0 && len(req.TS) == 0 {
488-
to = types.BuildTS(time.Now().UnixNano(), 0)
490+
to = *now
489491
return nil, nil
490492
}
491493

492494
if req.Type == cmd_util.CheckChanged {
493-
to = types.BuildTS(time.Now().UnixNano(), 0)
495+
to = *now
494496
minFrom := slices.MinFunc(req.TS, func(a, b *timestamp.Timestamp) int {
495497
return a.Compare(*b)
496498
})

pkg/vm/engine/test/change_handle_test.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ package test
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
2021
"testing"
2122
"time"
2223

24+
"github.com/prashantv/gostub"
2325
"github.com/stretchr/testify/assert"
2426
"github.com/stretchr/testify/require"
2527

@@ -3195,3 +3197,203 @@ func TestDropJobsByDBName(t *testing.T) {
31953197
_, ok = cdcExecutor.GetWatermark(accountId, tableID2, "job2")
31963198
assert.False(t, ok)
31973199
}
3200+
3201+
func TestCancelIteration1(t *testing.T) {
3202+
catalog.SetupDefines("")
3203+
3204+
// idAllocator := common.NewIdAllocator(1000)
3205+
3206+
var (
3207+
accountId = catalog.System_Account
3208+
)
3209+
3210+
ctx, cancel := context.WithCancel(context.Background())
3211+
defer cancel()
3212+
ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
3213+
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5)
3214+
3215+
disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t)
3216+
defer func() {
3217+
disttaeEngine.Close(ctx)
3218+
taeHandler.Close(true)
3219+
rpcAgent.Close()
3220+
}()
3221+
3222+
cancelCh := make(chan struct{})
3223+
3224+
stub := gostub.Stub(
3225+
&iscp.GetJobSpecs,
3226+
func(
3227+
context.Context,
3228+
string,
3229+
client.TxnOperator,
3230+
uint32,
3231+
uint64,
3232+
[]string,
3233+
[]uint64,
3234+
) (jobSpec []*iscp.JobSpec, err error) {
3235+
cancelCh <- struct{}{}
3236+
<-cancelCh
3237+
return []*iscp.JobSpec{
3238+
{
3239+
ConsumerInfo: iscp.ConsumerInfo{
3240+
ConsumerType: int8(iscp.ConsumerType_CNConsumer),
3241+
SrcTable: iscp.TableInfo{
3242+
DBName: "srcdb",
3243+
TableName: "src_table",
3244+
},
3245+
},
3246+
},
3247+
}, nil
3248+
},
3249+
)
3250+
defer stub.Reset()
3251+
3252+
err := mock_mo_indexes(disttaeEngine, ctxWithTimeout)
3253+
require.NoError(t, err)
3254+
err = mock_mo_foreign_keys(disttaeEngine, ctxWithTimeout)
3255+
require.NoError(t, err)
3256+
err = mock_mo_intra_system_change_propagation_log(disttaeEngine, ctxWithTimeout)
3257+
require.NoError(t, err)
3258+
bat := CreateDBAndTableForCNConsumerAndGetAppendData(t, disttaeEngine, ctxWithTimeout, "srcdb", "src_table", 10)
3259+
defer bat.Close()
3260+
3261+
_, rel, txn, err := disttaeEngine.GetTable(ctxWithTimeout, "srcdb", "src_table")
3262+
require.Nil(t, err)
3263+
3264+
tableID := rel.GetTableID(ctxWithTimeout)
3265+
3266+
txn.Commit(ctxWithTimeout)
3267+
3268+
wg := sync.WaitGroup{}
3269+
wg.Add(1)
3270+
go func() {
3271+
defer wg.Done()
3272+
err = iscp.ExecuteIteration(
3273+
ctxWithTimeout,
3274+
"",
3275+
disttaeEngine.Engine,
3276+
disttaeEngine.GetTxnClient(),
3277+
iscp.NewIterationContext(accountId, tableID, []string{"job1"}, []uint64{1}, types.TS{}, types.TS{}),
3278+
common.DebugAllocator,
3279+
)
3280+
assert.Error(t, err)
3281+
}()
3282+
<-cancelCh
3283+
cancel()
3284+
close(cancelCh)
3285+
wg.Wait()
3286+
}
3287+
3288+
func TestCancelIteration2(t *testing.T) {
3289+
catalog.SetupDefines("")
3290+
3291+
// idAllocator := common.NewIdAllocator(1000)
3292+
3293+
var (
3294+
accountId = catalog.System_Account
3295+
)
3296+
3297+
ctx, cancel := context.WithCancel(context.Background())
3298+
defer cancel()
3299+
ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId)
3300+
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Minute*5)
3301+
3302+
disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t)
3303+
defer func() {
3304+
disttaeEngine.Close(ctx)
3305+
taeHandler.Close(true)
3306+
rpcAgent.Close()
3307+
}()
3308+
3309+
cancelCh := make(chan struct{})
3310+
3311+
stub := gostub.Stub(
3312+
&iscp.GetJobSpecs,
3313+
func(
3314+
context.Context,
3315+
string,
3316+
client.TxnOperator,
3317+
uint32,
3318+
uint64,
3319+
[]string,
3320+
[]uint64,
3321+
) (jobSpec []*iscp.JobSpec, err error) {
3322+
return []*iscp.JobSpec{
3323+
{
3324+
ConsumerInfo: iscp.ConsumerInfo{
3325+
ConsumerType: int8(iscp.ConsumerType_CNConsumer),
3326+
SrcTable: iscp.TableInfo{
3327+
DBName: "srcdb",
3328+
TableName: "src_table",
3329+
},
3330+
},
3331+
},
3332+
}, nil
3333+
},
3334+
)
3335+
defer stub.Reset()
3336+
3337+
var flushCount int
3338+
stub2 := gostub.Stub(
3339+
&iscp.FlushJobStatusOnIterationState,
3340+
func(
3341+
context.Context,
3342+
string,
3343+
engine.Engine,
3344+
client.TxnClient,
3345+
uint32,
3346+
uint64,
3347+
[]string,
3348+
[]uint64,
3349+
[]*iscp.JobStatus,
3350+
types.TS,
3351+
int8,
3352+
) error {
3353+
if flushCount == 0 {
3354+
cancelCh <- struct{}{}
3355+
<-cancelCh
3356+
return nil
3357+
}
3358+
flushCount++
3359+
return nil
3360+
},
3361+
)
3362+
defer stub2.Reset()
3363+
3364+
err := mock_mo_indexes(disttaeEngine, ctxWithTimeout)
3365+
require.NoError(t, err)
3366+
err = mock_mo_foreign_keys(disttaeEngine, ctxWithTimeout)
3367+
require.NoError(t, err)
3368+
err = mock_mo_intra_system_change_propagation_log(disttaeEngine, ctxWithTimeout)
3369+
require.NoError(t, err)
3370+
bat := CreateDBAndTableForCNConsumerAndGetAppendData(t, disttaeEngine, ctxWithTimeout, "srcdb", "src_table", 10)
3371+
defer bat.Close()
3372+
3373+
_, rel, txn, err := disttaeEngine.GetTable(ctxWithTimeout, "srcdb", "src_table")
3374+
require.Nil(t, err)
3375+
3376+
tableID := rel.GetTableID(ctxWithTimeout)
3377+
3378+
txn.Commit(ctxWithTimeout)
3379+
3380+
wg := sync.WaitGroup{}
3381+
wg.Add(1)
3382+
go func() {
3383+
defer wg.Done()
3384+
err = iscp.ExecuteIteration(
3385+
ctxWithTimeout,
3386+
"",
3387+
disttaeEngine.Engine,
3388+
disttaeEngine.GetTxnClient(),
3389+
iscp.NewIterationContext(accountId, tableID, []string{"job1"}, []uint64{1}, types.TS{}, types.TS{}),
3390+
common.DebugAllocator,
3391+
)
3392+
assert.NoError(t, err)
3393+
}()
3394+
<-cancelCh
3395+
cancel()
3396+
close(cancelCh)
3397+
wg.Wait()
3398+
3399+
}

0 commit comments

Comments
 (0)