Skip to content

Commit 6ab6afc

Browse files
authored
some simple restrictions on clone privilege. (#22329)
``` 1. 普通租户不能 clone 任一系统 db/table 如 mo_task, mo_catalog, system, mysql, system_mertric, information_schema, mo_debug 这个操作等同于在系统 db 下面创建表,为了和 create table 权限保持一致 2. sys租户和普通租户都不能 clone 数据到系统 db 不能将其他 table clone 到 系统 db 下 3. 普通租户不能 clone cluster table cluster table 会聚合很多租户的数据,普通租户可以用 select 查看属于自己的数据。 如果允许 clone,那么 clone 就需要过滤 table 的数据,无法优化,与 select 功能无异。(rawlog, statement_info...) 4. 普通租户可以 clone 已经订阅的 db/table ==> 只有 sys 租户才能 1. clone 系统表 sys 租户和普通租户都不能 1. clone 数据到系统 db 下面 ``` Approved by: @heni02, @iamlinjunhong, @ouyuanning, @daviszhen
1 parent f9e59f5 commit 6ab6afc

File tree

7 files changed

+166
-21
lines changed

7 files changed

+166
-21
lines changed

pkg/frontend/clone.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ func handleCloneTableAcrossAccounts(
8585
fromAccountId uint32
8686
)
8787

88-
reqCtx = context.WithValue(reqCtx, tree.CloneLevelCtxKey{}, tree.CloneLevelTable)
88+
if reqCtx.Value(tree.CloneLevelCtxKey{}) == nil {
89+
reqCtx = context.WithValue(reqCtx, tree.CloneLevelCtxKey{}, tree.NormalCloneLevelTable)
90+
}
8991

9092
bh = ses.GetBackgroundExec(reqCtx)
9193
if err = bh.Exec(reqCtx, "begin"); err != nil {
@@ -191,7 +193,9 @@ func handleCloneDatabase(
191193
snapshotTS int64
192194
)
193195

194-
reqCtx = context.WithValue(reqCtx, tree.CloneLevelCtxKey{}, tree.CloneLevelDatabase)
196+
if reqCtx.Value(tree.CloneLevelCtxKey{}) == nil {
197+
reqCtx = context.WithValue(reqCtx, tree.CloneLevelCtxKey{}, tree.NormalCloneLevelDatabase)
198+
}
195199

196200
bh = ses.GetBackgroundExec(reqCtx)
197201
if err = bh.Exec(reqCtx, "begin"); err != nil {

pkg/frontend/pitr.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,7 +1113,7 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
11131113
// restore according the restore level
11141114
switch restoreLevel {
11151115
case tree.RESTORELEVELCLUSTER:
1116-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelCluster)
1116+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelCluster)
11171117
subDbToRestore := make(map[string]*subDbRestoreRecord)
11181118
if err = restoreToCluster(ctx, ses, bh, pitrName, ts, subDbToRestore); err != nil {
11191119
return
@@ -1129,17 +1129,17 @@ func doRestorePitr(ctx context.Context, ses *Session, stmt *tree.RestorePitr) (s
11291129
}
11301130
return
11311131
case tree.RESTORELEVELACCOUNT:
1132-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelAccount)
1132+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelAccount)
11331133
if err = restoreToAccountWithPitr(ctx, ses.GetService(), bh, pitrName, ts, fkTableMap, viewMap, tenantInfo.TenantID); err != nil {
11341134
return
11351135
}
11361136
case tree.RESTORELEVELDATABASE:
1137-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelDatabase)
1137+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelDatabase)
11381138
if err = restoreToDatabaseWithPitr(ctx, ses.GetService(), bh, pitrName, ts, dbName, fkTableMap, viewMap, tenantInfo.TenantID); err != nil {
11391139
return
11401140
}
11411141
case tree.RESTORELEVELTABLE:
1142-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelTable)
1142+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelTable)
11431143
if err = restoreToTableWithPitr(ctx, ses.service, bh, pitrName, ts, dbName, tblName, fkTableMap, viewMap, tenantInfo.TenantID); err != nil {
11441144
return
11451145
}

pkg/frontend/snapshot.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ func doRestoreSnapshot(ctx context.Context, ses *Session, stmt *tree.RestoreSnap
557557

558558
// restore cluster
559559
if stmt.Level == tree.RESTORELEVELCLUSTER {
560-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelCluster)
560+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelCluster)
561561

562562
// restore cluster
563563
subDbToRestore := make(map[string]*subDbRestoreRecord)
@@ -580,7 +580,7 @@ func doRestoreSnapshot(ctx context.Context, ses *Session, stmt *tree.RestoreSnap
580580

581581
// restore account by cluster level snapshot
582582
if snapshot.level == tree.RESTORELEVELCLUSTER.String() && len(srcAccountName) != 0 {
583-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelCluster)
583+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelCluster)
584584

585585
err = restoreToAccountUsingCluster(ctx, ses, bh, stmt, *snapshot)
586586
if err != nil {
@@ -616,7 +616,7 @@ func doRestoreSnapshot(ctx context.Context, ses *Session, stmt *tree.RestoreSnap
616616

617617
switch stmt.Level {
618618
case tree.RESTORELEVELACCOUNT:
619-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelAccount)
619+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelAccount)
620620

621621
if err = restoreToAccount(ctx,
622622
ses.GetService(),
@@ -631,7 +631,7 @@ func doRestoreSnapshot(ctx context.Context, ses *Session, stmt *tree.RestoreSnap
631631
return stats, err
632632
}
633633
case tree.RESTORELEVELDATABASE:
634-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelDatabase)
634+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelDatabase)
635635

636636
if err = restoreToDatabase(ctx,
637637
ses.GetService(),
@@ -648,7 +648,7 @@ func doRestoreSnapshot(ctx context.Context, ses *Session, stmt *tree.RestoreSnap
648648
return stats, err
649649
}
650650
case tree.RESTORELEVELTABLE:
651-
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.CloneLevelTable)
651+
ctx = context.WithValue(ctx, tree.CloneLevelCtxKey{}, tree.RestoreCloneLevelTable)
652652

653653
if err = restoreToTable(ctx,
654654
ses.GetService(),

pkg/sql/parsers/tree/clone.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,19 @@ func init() {
4141
}
4242

4343
type CloneLevelCtxKey struct{}
44-
type CloneLevelType int
44+
type CloneLevelType uint64
4545
type CloneStmtType int
4646

4747
const (
48-
CloneLevelTable CloneLevelType = iota
49-
CloneLevelDatabase
50-
CloneLevelAccount
51-
CloneLevelCluster
48+
NormalCloneLevelTable CloneLevelType = 1 << iota
49+
NormalCloneLevelDatabase
50+
NormalCloneLevelAccount
51+
NormalCloneLevelCluster
52+
53+
RestoreCloneLevelTable
54+
RestoreCloneLevelDatabase
55+
RestoreCloneLevelAccount
56+
RestoreCloneLevelCluster
5257
)
5358

5459
const (
@@ -162,25 +167,25 @@ func DecideCloneStmtType(
162167
}
163168

164169
var (
165-
level = CloneLevelTable
170+
level = NormalCloneLevelTable
166171
)
167172

168173
if val := ctx.Value(CloneLevelCtxKey{}); val != nil {
169174
level = val.(CloneLevelType)
170175
}
171176

172177
switch level {
173-
case CloneLevelCluster:
178+
case NormalCloneLevelCluster, RestoreCloneLevelCluster:
174179
return CloneCluster
175-
case CloneLevelAccount:
180+
case NormalCloneLevelAccount, RestoreCloneLevelAccount:
176181
return CloneAccount
177-
case CloneLevelDatabase:
182+
case NormalCloneLevelDatabase, RestoreCloneLevelDatabase:
178183
if srcAccount == toAccount {
179184
return WithinAccCloneDB
180185
}
181186
return BetweenAccCloneDB
182187

183-
case CloneLevelTable:
188+
case NormalCloneLevelTable, RestoreCloneLevelTable:
184189
if srcAccount == toAccount {
185190
if srcDbName == dstDbName {
186191
return WithinDBCloneTable

pkg/sql/plan/build_table_clone.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
package plan
1616

1717
import (
18+
"context"
19+
"slices"
20+
"strings"
21+
22+
"github.com/matrixorigin/matrixone/pkg/catalog"
1823
"github.com/matrixorigin/matrixone/pkg/common/moerr"
1924
"github.com/matrixorigin/matrixone/pkg/pb/plan"
2025
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
@@ -135,6 +140,12 @@ func buildCloneTable(
135140
dstAccount, srcAccount,
136141
)
137142

143+
if err = checkPrivilege(
144+
ctx.GetContext(), stmt, opAccount, srcAccount, dstAccount, srcTblDef, dstTblDef,
145+
); err != nil {
146+
return nil, err
147+
}
148+
138149
if createTablePlan, err = buildCreateTable(ctx, &stmt.CreateTable, stmt); err != nil {
139150
return nil, err
140151
}
@@ -148,3 +159,56 @@ func buildCloneTable(
148159

149160
return createTablePlan, nil
150161
}
162+
163+
func checkPrivilege(
164+
ctx context.Context,
165+
stmt *tree.CloneTable,
166+
opAccount uint32,
167+
dstAccount uint32,
168+
srcAccount uint32,
169+
srcTblDef *TableDef,
170+
dstTblDef *TableDef,
171+
) (err error) {
172+
173+
// 1. only sys can clone from system databases
174+
// 2. sys and non-sys both cannot clone to system database
175+
// 3. if this is a restore clone stmt, skip this check
176+
177+
if val := ctx.Value(tree.CloneLevelCtxKey{}); val != nil {
178+
switch val.(tree.CloneLevelType) {
179+
case tree.RestoreCloneLevelAccount,
180+
tree.RestoreCloneLevelCluster,
181+
tree.RestoreCloneLevelDatabase,
182+
tree.RestoreCloneLevelTable:
183+
// skip this check
184+
return nil
185+
default:
186+
}
187+
}
188+
189+
var (
190+
typ int
191+
)
192+
193+
if slices.Index(
194+
catalog.SystemDatabases, strings.ToLower(srcTblDef.DbName),
195+
) != -1 {
196+
// clone from system databases
197+
typ = 1
198+
} else if slices.Index(
199+
catalog.SystemDatabases, strings.ToLower(dstTblDef.DbName),
200+
) != -1 {
201+
// clone to a system database
202+
typ = 2
203+
}
204+
205+
if typ == 2 {
206+
return moerr.NewInternalErrorNoCtx("cannot clone data into system database")
207+
} else if typ == 1 {
208+
if opAccount != catalog.System_Account {
209+
return moerr.NewInternalErrorNoCtx("non-sys account cannot clone data from system database")
210+
}
211+
}
212+
213+
return nil
214+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
drop account if exists acc1;
2+
create account acc1 admin_name "root1" identified by "111";
3+
create database db1 clone mo_catalog;
4+
internal error: non-sys account cannot clone data from system database
5+
create database db3 clone system;
6+
internal error: non-sys account cannot clone data from system database
7+
create database db4 clone information_schema;
8+
internal error: non-sys account cannot clone data from system database
9+
create database db6;
10+
create table db6.t1 clone mo_catalog.mo_tables;
11+
internal error: non-sys account cannot clone data from system database
12+
create table db6.t2 clone system.statement_info;
13+
internal error: non-sys account cannot clone data from system database
14+
create database db7;
15+
create table db7.t1 (a int primary key);
16+
create table mo_catalog.t1 clone db7.t1;
17+
internal error: cannot clone data into system database
18+
create table system.t1 clone db7.t1;
19+
internal error: cannot clone data into system database
20+
create database db1 clone mo_catalog;
21+
create database db2 clone system;
22+
create database db3;
23+
create table db3.t1 clone mo_catalog.mo_tables;
24+
create table db3.t2 clone system.statement_info;
25+
create database db4;
26+
create table db4.t1 (a int primary key);
27+
create table mo_catalog.t1 clone db4.t1;
28+
internal error: cannot clone data into system database
29+
create table system.t1 clone db4.t1;
30+
internal error: cannot clone data into system database
31+
drop account if exists acc1;
32+
drop database if exists db1;
33+
drop database if exists db2;
34+
drop database if exists db3;
35+
drop database if exists db4;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
drop account if exists acc1;
2+
create account acc1 admin_name "root1" identified by "111";
3+
4+
-- @session:id=1&user=acc1:root1&password=111
5+
create database db1 clone mo_catalog;
6+
create database db3 clone system;
7+
create database db4 clone information_schema;
8+
9+
create database db6;
10+
create table db6.t1 clone mo_catalog.mo_tables;
11+
create table db6.t2 clone system.statement_info;
12+
13+
create database db7;
14+
create table db7.t1 (a int primary key);
15+
16+
create table mo_catalog.t1 clone db7.t1;
17+
create table system.t1 clone db7.t1;
18+
-- @session
19+
20+
create database db1 clone mo_catalog;
21+
create database db2 clone system;
22+
23+
create database db3;
24+
create table db3.t1 clone mo_catalog.mo_tables;
25+
create table db3.t2 clone system.statement_info;
26+
27+
create database db4;
28+
create table db4.t1 (a int primary key);
29+
create table mo_catalog.t1 clone db4.t1;
30+
create table system.t1 clone db4.t1;
31+
32+
33+
drop account if exists acc1;
34+
drop database if exists db1;
35+
drop database if exists db2;
36+
drop database if exists db3;
37+
drop database if exists db4;

0 commit comments

Comments
 (0)