45 changes: 27 additions & 18 deletions pkg/workload/tpcc/tpcc_multi_db.go
@@ -37,9 +37,10 @@ type tpccMultiDB struct {

// dbListFile contains the list of databases that the tpcc schema will be
// created on and that the workload will be executed on.
- dbListFile string
- dbList []*tree.ObjectNamePrefix
- dbNames map[string]struct{}
+ dbListFile string
+ dbListForDDL []*tree.ObjectNamePrefix
+ dbListForDML []*tree.ObjectNamePrefix
+ dbNames map[string]struct{}

adminUrlStr string
adminUrls []string
@@ -257,13 +258,13 @@ func (t *tpccMultiDB) runBeforeEachTxn(ctx context.Context, tx pgx.Tx) error {
// in a round-robin manner.
nextIdx := t.nextDatabase.Add(1)
targetDb := "tpccmultidb"
- if t.dbList != nil {
- databaseIdx := int(nextIdx % uint64(len(t.dbList)))
- targetDb = t.dbList[databaseIdx].Catalog()
- if _, err := tx.Exec(ctx, "USE $1", t.dbList[databaseIdx].Catalog()); err != nil {
+ if t.dbListForDML != nil {
+ databaseIdx := int(nextIdx % uint64(len(t.dbListForDML)))
+ targetDb = t.dbListForDML[databaseIdx].Catalog()
+ if _, err := tx.Exec(ctx, "USE $1", t.dbListForDML[databaseIdx].Catalog()); err != nil {
return err
}
- if _, err := tx.Exec(ctx, fmt.Sprintf("SET search_path = %s", t.dbList[databaseIdx].Schema())); err != nil {
+ if _, err := tx.Exec(ctx, fmt.Sprintf("SET search_path = %s", t.dbListForDML[databaseIdx].Schema())); err != nil {
return err
}
}
@@ -298,16 +299,16 @@ func (t *tpccMultiDB) Ops(
// Tables implements the Generator interface.
func (t *tpccMultiDB) Tables() []workload.Table {
existingTables := t.tpcc.Tables()
- if len(t.dbList) == 0 {
+ if len(t.dbListForDDL) == 0 {
return existingTables
}
// Take the normal TPCC tables and make a copy for each
// database in the list.
- tablesPerDb := make([]workload.Table, 0, len(existingTables)*len(t.dbList))
+ tablesPerDb := make([]workload.Table, 0, len(existingTables)*len(t.dbListForDDL))
// We are going to order the list such that we are working on different
// databases in a round-robin fashion.
for _, tbl := range existingTables {
- for _, db := range t.dbList {
+ for _, db := range t.dbListForDDL {
tbl.ObjectPrefix = db
tablesPerDb = append(tablesPerDb, tbl)
}
@@ -321,7 +322,8 @@ func (t *tpccMultiDB) runInit() error {
var err error
t.initLogic.Do(func() {
if t.dbListFile != "" {
- file, err := os.ReadFile(t.dbListFile)
+ var file []byte
+ file, err = os.ReadFile(t.dbListFile)
if err != nil {
return
}
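
Reviewer-style note on the `var file []byte` change in this hunk: with the original `file, err := os.ReadFile(...)`, the `:=` inside the closure declares a new `err` that shadows the outer one declared at the top of runInit, so a failed read would never be reported by the surrounding function. The sketch below is a simplified, self-contained illustration of that pitfall and of the fixed pattern; the file name and structure are made up for the example and are not taken from the PR.

package main

import (
	"fmt"
	"os"
)

func main() {
	var err error
	func() {
		// Buggy pattern: ':=' declares a new 'file' AND a new 'err',
		// shadowing the outer err; the outer err stays nil even though
		// the read fails.
		file, err := os.ReadFile("does-not-exist")
		_ = file
		_ = err
	}()
	fmt.Println("outer err after shadowed read:", err) // <nil>

	func() {
		// Fixed pattern: declare 'file' separately so '=' assigns to the
		// outer 'err' and the failure is visible to the caller.
		var file []byte
		file, err = os.ReadFile("does-not-exist")
		_ = file
	}()
	fmt.Println("outer err after fixed read:", err) // non-nil: file not found
}
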
@@ -332,6 +334,7 @@ func (t *tpccMultiDB) runInit() error {
// First, sort the prefixes by database name.
dbToBucket := make(map[string]int)
t.dbNames = make(map[string]struct{})
+ t.dbListForDML = make([]*tree.ObjectNamePrefix, 0, len(strDbList))
var dbNameListBuckets [][]*tree.ObjectNamePrefix
maxBucketLen := 0
for _, dbAndSchema := range strDbList {
@@ -354,17 +357,22 @@ func (t *tpccMultiDB) runInit() error {
bucket := dbToBucket[parts[0]]
dbNameListBuckets[bucket] = append(dbNameListBuckets[bucket], prefix)
maxBucketLen = max(maxBucketLen, len(dbNameListBuckets[bucket]))
+ t.dbListForDML = append(t.dbListForDML, prefix)
}
- // Next, generate the dbList slice by doing a round-robin across the
+ if len(t.dbListForDML) == 0 {
+ err = errors.Newf("db-list-file specified but no databases found")
+ return
+ }
+ // Next, generate the dbListForDDL slice by doing a round-robin across the
// databases in the map. This minimizes deadlocks by ensuring we are
// concurrently working on separate databases.
- t.dbList = make([]*tree.ObjectNamePrefix, 0, len(strDbList))
+ t.dbListForDDL = make([]*tree.ObjectNamePrefix, 0, len(strDbList))
for range maxBucketLen {
for idx := range dbNameListBuckets {
if len(dbNameListBuckets[idx]) == 0 {
continue
}
- t.dbList = append(t.dbList, dbNameListBuckets[idx][0])
+ t.dbListForDDL = append(t.dbListForDDL, dbNameListBuckets[idx][0])
dbNameListBuckets[idx] = dbNameListBuckets[idx][1:]
}
}
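
The split between the two new lists is easier to follow with a concrete input. The sketch below is a standalone illustration rather than the workload code itself (the `db.schema` entry format and the names are assumptions for the example): entries are bucketed by database, the DML list keeps the file order, and the DDL list drains the buckets round-robin so consecutive schema operations target different databases, matching the deadlock-avoidance ordering described in the comment above.

package main

import (
	"fmt"
	"strings"
)

// Illustrative only: each input line is assumed to be "<database>.<schema>".
func main() {
	input := []string{"db1.s1", "db1.s2", "db1.s3", "db2.s1", "db2.s2"}

	// DML list: file order, used to round-robin transactions across schemas.
	dml := append([]string(nil), input...)

	// Bucket entries by database name.
	var order []string               // first-seen database order
	buckets := map[string][]string{} // database -> its entries
	for _, entry := range input {
		db := strings.SplitN(entry, ".", 2)[0]
		if _, ok := buckets[db]; !ok {
			order = append(order, db)
		}
		buckets[db] = append(buckets[db], entry)
	}

	// DDL list: drain the buckets round-robin so consecutive schema changes
	// land on different databases, reducing contention and deadlocks.
	var ddl []string
	for remaining := len(input); remaining > 0; {
		for _, db := range order {
			if len(buckets[db]) == 0 {
				continue
			}
			ddl = append(ddl, buckets[db][0])
			buckets[db] = buckets[db][1:]
			remaining--
		}
	}

	fmt.Println("DML order:", dml) // [db1.s1 db1.s2 db1.s3 db2.s1 db2.s2]
	fmt.Println("DDL order:", ddl) // [db1.s1 db2.s1 db1.s2 db2.s2 db1.s3]
}
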
@@ -379,7 +387,8 @@ func (t *tpccMultiDB) runInit() error {
t.adminUrls = strings.Split(t.adminUrlStr, ",")
}
if t.consoleAPICommandFile != "" {
- file, err := os.ReadFile(t.consoleAPICommandFile)
+ var file []byte
+ file, err = os.ReadFile(t.consoleAPICommandFile)
if err != nil {
return
}
@@ -421,7 +430,7 @@ func (t *tpccMultiDB) Hooks() workload.Hooks {
return err
}
// Create all of the databases that were specified in the list.
- for _, dbName := range t.dbList {
+ for _, dbName := range t.dbListForDDL {
if _, err := tx.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName.Catalog())); err != nil {
return err
}
@@ -460,7 +469,7 @@ func (t *tpccMultiDB) Hooks() workload.Hooks {
hooks.PostLoad = func(ctx context.Context, db *gosql.DB) error {
grp := ctxgroup.WithContext(ctx)
postLoadConcurrency := quotapool.NewIntPool("post-load-pool", 8)
- for _, dbName := range t.dbList {
+ for _, dbName := range t.dbListForDDL {
alloc, err := postLoadConcurrency.Acquire(ctx, 1)
if err != nil {
return err