Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Single Active Replication #21347

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,7 @@ paths:
items:
$ref: '#/definitions/AuditLogEventType'
'401':
$ref: '#/responses/401'
$ref: '#/responses/401'
/projects/{project_name}/logs:
get:
summary: Get recent logs of the projects (deprecated)
Expand Down Expand Up @@ -1863,7 +1863,7 @@ paths:
'401':
$ref: '#/responses/401'
'500':
$ref: '#/responses/500'
$ref: '#/responses/500'
/p2p/preheat/providers:
get:
summary: List P2P providers
Expand Down Expand Up @@ -6949,8 +6949,8 @@ definitions:
description: The time when this operation is triggered.
AuditLogEventType:
type: object
properties:
event_type:
properties:
event_type:
type: string
description: the event type, such as create_user.
example: create_user
Expand Down Expand Up @@ -7446,6 +7446,12 @@ definitions:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
single_active_replication:
type: boolean
description: |-
Whether to defer execution until the previous active execution finishes,
avoiding the execution of the same replication rules multiple times in parallel.
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:
Expand Down
2 changes: 2 additions & 0 deletions make/migrations/postgresql/0160_2.13.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ CREATE INDEX IF NOT EXISTS idx_audit_log_ext_op_time ON audit_log_ext (op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_optime ON audit_log_ext (project_id, op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_resource_type ON audit_log_ext (project_id, resource_type);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_operation ON audit_log_ext (project_id, operation);

ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;
23 changes: 23 additions & 0 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,33 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}

id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}

// If running executions are found, skip the current execution and mark it as skipped.
if policy.SingleActiveReplication {
count, err := c.execMgr.Count(ctx, &q.Query{
Keywords: map[string]interface{}{
"VendorType": job.ReplicationVendorType,
"VendorID": policy.ID,
"Status": job.RunningStatus.String(),
},
})
if err != nil {
return 0, err
}

if count > 1 {
if err = c.execMgr.MarkSkipped(ctx, id, "Execution skipped: active replication still in progress."); err != nil {
return 0, err
}
return id, nil
}
}

Comment on lines +117 to +138
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

core logic lies here.

// start the replication flow in background
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create an new context
Expand Down
32 changes: 32 additions & 0 deletions src/controller/replication/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,38 @@ func (r *replicationTestSuite) TestStart() {
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())

r.SetupTest()

// run replication flow with SingleActiveReplication, flow should not start
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("MarkSkipped", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.execMgr.On("Count", mock.Anything, mock.Anything).Return(int64(2), nil) // Simulate an existing running execution
id, err = r.ctl.Start(context.Background(), &repctlmodel.Policy{Enabled: true, SingleActiveReplication: true}, nil, task.ExecutionTriggerManual)
r.Require().Nil(err)
r.Equal(int64(1), id)
time.Sleep(1 * time.Second) // wait the functions called in the goroutine
r.flowCtl.AssertNumberOfCalls(r.T(), "Start", 0)
r.execMgr.AssertNumberOfCalls(r.T(), "MarkSkipped", 1) // Ensure execution marked as skipped
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())

r.SetupTest()

// no error when running the replication flow with SingleActiveReplication
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.ormCreator.On("Create").Return(nil)
r.execMgr.On("Count", mock.Anything, mock.Anything).Return(int64(1), nil) // Simulate no running execution
id, err = r.ctl.Start(context.Background(), &repctlmodel.Policy{Enabled: true, SingleActiveReplication: true}, nil, task.ExecutionTriggerManual)
r.Require().Nil(err)
r.Equal(int64(1), id)
time.Sleep(1 * time.Second) // wait the functions called in the goroutine
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())
}

func (r *replicationTestSuite) TestStop() {
Expand Down
3 changes: 3 additions & 0 deletions src/controller/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Policy struct {
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
SingleActiveReplication bool `json:"single_active_replication"`
}

// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
Expand Down Expand Up @@ -141,6 +142,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
p.SingleActiveReplication = policy.SingleActiveReplication

if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
Expand Down Expand Up @@ -186,6 +188,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
SingleActiveReplication: p.SingleActiveReplication,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID
Expand Down
4 changes: 2 additions & 2 deletions src/jobservice/common/rds/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ local function compare(status, revision)
local aCode = stCode(ARGV[1])
local aRev = tonumber(ARGV[2]) or 0
local aCheckInT = tonumber(ARGV[3]) or 0
if revision < aRev or
if revision < aRev or
( revision == aRev and sCode <= aCode ) or
( revision == aRev and aCheckInT ~= 0 )
then
Expand Down Expand Up @@ -96,7 +96,7 @@ if res then
redis.call('persist', KEYS[1])
end
end

return 'ok'
end
end
Expand Down
4 changes: 4 additions & 0 deletions src/jobservice/job/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
SuccessStatus Status = "Success"
// ScheduledStatus : job status scheduled
ScheduledStatus Status = "Scheduled"
// SkippedStatus : job status skipped
SkippedStatus Status = "Skipped"
)

// Status of job
Expand Down Expand Up @@ -62,6 +64,8 @@ func (s Status) Code() int {
return 3
case "Success":
return 3
case "Skipped":
return 3
default:
}

Expand Down
2 changes: 1 addition & 1 deletion src/jobservice/runner/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
case job.PendingStatus, job.ScheduledStatus:
// do nothing now
break
case job.StoppedStatus:
case job.StoppedStatus, job.SkippedStatus:
// Probably jobs has been stopped by directly mark status to stopped.
// Directly exit and no retry
return nil
Expand Down
1 change: 1 addition & 0 deletions src/pkg/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Policy struct {
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
CopyByChunk bool `orm:"column(copy_by_chunk)"`
SingleActiveReplication bool `orm:"column(single_active_replication)"`
}

// TableName set table name for ORM
Expand Down
13 changes: 13 additions & 0 deletions src/pkg/task/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ExecutionManager interface {
// In other cases, the execution status can be calculated from the referenced tasks automatically
// and no need to update it explicitly
MarkDone(ctx context.Context, id int64, message string) (err error)
// MarkSkipped marks the status of the specified execution as skipped.
MarkSkipped(ctx context.Context, id int64, message string) (err error)
// MarkError marks the status of the specified execution as error.
// It must be called to update the execution status when failed to create tasks.
// In other cases, the execution status can be calculated from the referenced tasks automatically
Expand Down Expand Up @@ -139,6 +141,17 @@ func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extra
return e.executionDAO.Update(ctx, execution, "ExtraAttrs", "UpdateTime")
}

func (e *executionManager) MarkSkipped(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.SkippedStatus.String(),
StatusMessage: message,
UpdateTime: now,
EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
}

Comment on lines +144 to +154
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function used to mark the duplicate executions as skipped.

func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,37 @@ <h3 class="modal-title">{{ headerTitle | translate }}</h3>
'REPLICATION.ENABLED_RULE' | translate
}}</label>
</div>
<div class="clr-checkbox-wrapper">
<input
type="checkbox"
class="clr-checkbox"
[checked]="true"
id="singleActiveReplication"
formControlName="single_active_replication" />
<label
for="singleActiveReplication"
class="clr-control-label single-active"
>{{
'REPLICATION.SINGLE_ACTIVE_REPLICATION'
| translate
}}
<clr-tooltip class="override-tooltip">
<clr-icon
clrTooltipTrigger
shape="info-circle"
size="24"></clr-icon>
<clr-tooltip-content
clrPosition="top-left"
clrSize="md"
*clrIfOpen>
<span>{{
'TOOLTIP.SINGLE_ACTIVE_REPLICATION'
| translate
}}</span>
</clr-tooltip-content>
</clr-tooltip>
</label>
</div>
</div>
</div>
</form>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ clr-modal {
width: 8.6rem;
}

.single-active {
width: 16rem;
}

.des-tooltip {
margin-left: 0.5rem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: true,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
}

Expand Down Expand Up @@ -367,6 +368,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
dest_namespace_replace_count: Flatten_Level.FLATTEN_LEVEl_1,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
this.isPushMode = true;
this.selectedUnit = BandwidthUnit.KB;
Expand Down Expand Up @@ -410,6 +412,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: rule.override,
speed: speed,
copy_by_chunk: rule.copy_by_chunk,
single_active_replication: rule.single_active_replication,
});
let filtersArray = this.getFilterArray(rule);
this.noSelectedEndpoint = false;
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/de-de-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Lade die Ressourcen von der entfernten Registry auf den lokalen Harbor runter.",
"DESTINATION_NAMESPACE": "Spezifizieren des Ziel-Namespace. Wenn das Feld leer ist, werden die Ressourcen unter dem gleichen Namespace abgelegt wie in der Quelle.",
"OVERRIDE": "Spezifizieren, ob die Ressourcen am Ziel überschrieben werden sollen, falls eine Ressource mit gleichem Namen existiert.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "E-Mail sollte eine gültige E-Mail-Adresse wie [email protected] sein.",
"USER_NAME": "Darf keine Sonderzeichen enthalten und sollte kürzer als 255 Zeichen sein.",
"FULL_NAME": "Maximale Länge soll 20 Zeichen sein.",
Expand Down Expand Up @@ -560,6 +561,7 @@
"ALLOWED_CHARACTERS": "Erlaubte Sonderzeichen",
"TOTAL": "Gesamt",
"OVERRIDE": "Überschreiben",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Aktiviere Regel",
"OVERRIDE_INFO": "Überschreiben",
"OPERATION": "Operation",
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/en-us-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Pull the resources from the remote registry to the local Harbor.",
"DESTINATION_NAMESPACE": "Specify the destination namespace. If empty, the resources will be put under the same namespace as the source.",
"OVERRIDE": "Specify whether to override the resources at the destination if a resource with the same name exists.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "Email should be a valid email address like [email protected].",
"USER_NAME": "Cannot contain special characters and maximum length should be 255 characters.",
"FULL_NAME": "Maximum length should be 20 characters.",
Expand Down Expand Up @@ -560,6 +561,7 @@
"ALLOWED_CHARACTERS": "Allowed special characters",
"TOTAL": "Total",
"OVERRIDE": "Override",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Enable rule",
"OVERRIDE_INFO": "Override",
"OPERATION": "Operation",
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/es-es-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Pull de recursos del remote registry al local Harbor.",
"DESTINATION_NAMESPACE": "Especificar el namespace de destino. Si esta vacio, los recursos se colocan en el mismo namespace del recurso.",
"OVERRIDE": "Especifique si desea anular los recursos en el destino si existe un recurso con el mismo nombre.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "El email debe ser una dirección válida como [email protected].",
"USER_NAME": "Debe tener una longitud máxima de 255 caracteres y no puede contener caracteres especiales.",
"FULL_NAME": "La longitud máxima debería ser de 20 caracteres.",
Expand Down Expand Up @@ -560,6 +561,7 @@
"ALLOWED_CHARACTERS": "Caracteres Especiales Permitidos",
"TOTAL": "Total",
"OVERRIDE": "Sobreescribir",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Activar regla",
"OVERRIDE_INFO": "Sobreescribir",
"CURRENT": "Actual",
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/fr-fr-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Pull les ressources du registre distant vers le Harbor local.",
"DESTINATION_NAMESPACE": "Spécifier l'espace de nom de destination. Si vide, les ressources seront placées sous le même espace de nom que la source.",
"OVERRIDE": "Spécifier s'il faut remplacer les ressources dans la destination si une ressource avec le même nom existe.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "L'e-mail doit être une adresse e-mail valide comme [email protected].",
"USER_NAME": "Ne peut pas contenir de caractères spéciaux et la longueur maximale est de 255 caractères.",
"FULL_NAME": "La longueur maximale est de 20 caractères.",
Expand Down Expand Up @@ -560,6 +561,7 @@
"ALLOWED_CHARACTERS": "Caractères spéciaux autorisés",
"TOTAL": "Total",
"OVERRIDE": "Surcharger",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Activer la règle",
"OVERRIDE_INFO": "Surcharger",
"OPERATION": "Opération",
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/ko-kr-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "원격 레지스트리의 리소스를 로컬 'Harbor'로 가져옵니다.",
"DESTINATION_NAMESPACE": "대상 네임스페이스를 지정합니다. 비어 있으면 리소스는 소스와 동일한 네임스페이스에 배치됩니다.",
"OVERRIDE": "동일한 이름의 리소스가 있는 경우 대상의 리소스를 재정의할지 여부를 지정합니다.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "이메일은 [email protected]과 같은 유효한 이메일 주소여야 합니다.",
"USER_NAME": "특수 문자를 포함할 수 없으며 최대 길이는 255자입니다.",
"FULL_NAME": "최대 길이는 20자입니다.",
Expand Down Expand Up @@ -557,6 +558,7 @@
"ALLOWED_CHARACTERS": "허용되는 특수 문자",
"TOTAL": "총",
"OVERRIDE": "Override",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "규칙 활성화",
"OVERRIDE_INFO": "Override",
"OPERATION": "작업",
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/pt-br-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Trazer recursos do repositório remoto para o Harbor local.",
"DESTINATION_NAMESPACE": "Especificar o namespace de destino. Se vazio, os recursos serão colocados no mesmo namespace que a fonte.",
"OVERRIDE": "Sobrescrever recursos no destino se já existir com o mesmo nome.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "Deve ser um endereço de e-mail válido como [email protected].",
"USER_NAME": "Não pode conter caracteres especiais. Tamanho máximo de 255 caracteres.",
"FULL_NAME": "Tamanho máximo de 20 caracteres.",
Expand Down Expand Up @@ -558,6 +559,7 @@
"ALLOWED_CHARACTERS": "Símbolos permitidos",
"TOTAL": "Total",
"OVERRIDE": "Sobrescrever",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Habiltar regra",
"OVERRIDE_INFO": "Sobrescrever",
"CURRENT": "atual",
Expand Down
Loading
Loading