Skip to content

Commit 9d45ba7

Browse files
committed
feat: provide support for custom table name
- support mysql - support pgsql Signed-off-by: happy-game <[email protected]>
1 parent 04d126f commit 9d45ba7

File tree

2 files changed

+84
-64
lines changed

2 files changed

+84
-64
lines changed

pkg/drivers/mysql/mysql.go

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ const (
2424
defaultHostDSN = "root@tcp(127.0.0.1)/"
2525
)
2626

27-
var (
28-
schema = []string{
29-
`CREATE TABLE IF NOT EXISTS kine
27+
func getSchema(tableName string) []string {
28+
return []string{
29+
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s
3030
(
3131
id BIGINT UNSIGNED AUTO_INCREMENT,
3232
name VARCHAR(630) CHARACTER SET ascii,
@@ -38,21 +38,25 @@ var (
3838
value MEDIUMBLOB,
3939
old_value MEDIUMBLOB,
4040
PRIMARY KEY (id)
41-
);`,
42-
`CREATE INDEX kine_name_index ON kine (name)`,
43-
`CREATE INDEX kine_name_id_index ON kine (name,id)`,
44-
`CREATE INDEX kine_id_deleted_index ON kine (id,deleted)`,
45-
`CREATE INDEX kine_prev_revision_index ON kine (prev_revision)`,
46-
`CREATE UNIQUE INDEX kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
47-
}
48-
schemaMigrations = []string{
49-
`ALTER TABLE kine MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`,
41+
);`, tableName),
42+
fmt.Sprintf(`CREATE INDEX %s_name_index ON %s (name)`, tableName, tableName),
43+
fmt.Sprintf(`CREATE INDEX %s_name_id_index ON %s (name,id)`, tableName, tableName),
44+
fmt.Sprintf(`CREATE INDEX %s_id_deleted_index ON %s (id,deleted)`, tableName, tableName),
45+
fmt.Sprintf(`CREATE INDEX %s_prev_revision_index ON %s (prev_revision)`, tableName, tableName),
46+
fmt.Sprintf(`CREATE UNIQUE INDEX %s_name_prev_revision_uindex ON %s (name, prev_revision)`, tableName, tableName),
47+
}
48+
}
49+
50+
func getSchemaMigrations(tableName string) []string {
51+
return []string{
52+
fmt.Sprintf(`ALTER TABLE %s MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`, tableName),
5053
// Creating an empty migration to ensure that postgresql and mysql migrations match up
5154
// with each other for a give value of KINE_SCHEMA_MIGRATION env var
5255
``,
5356
}
54-
createDB = "CREATE DATABASE IF NOT EXISTS `%s`;"
55-
)
57+
}
58+
59+
var createDB = "CREATE DATABASE IF NOT EXISTS `%s`;"
5660

5761
func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) {
5862
tlsConfig, err := cfg.BackendTLSConfig.ClientConfig()
@@ -78,28 +82,33 @@ func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error)
7882
return false, nil, err
7983
}
8084

85+
tableName := cfg.TableName
86+
if tableName == "" {
87+
tableName = "kine"
88+
}
89+
8190
dialect.LastInsertID = true
82-
dialect.GetSizeSQL = `
91+
dialect.GetSizeSQL = fmt.Sprintf(`
8392
SELECT SUM(data_length + index_length)
8493
FROM information_schema.TABLES
85-
WHERE table_schema = DATABASE() AND table_name = 'kine'`
86-
dialect.CompactSQL = `
87-
DELETE kv FROM kine AS kv
94+
WHERE table_schema = DATABASE() AND table_name = '%s'`, tableName)
95+
dialect.CompactSQL = fmt.Sprintf(`
96+
DELETE kv FROM %s AS kv
8897
INNER JOIN (
8998
SELECT kp.prev_revision AS id
90-
FROM kine AS kp
99+
FROM %s AS kp
91100
WHERE
92101
kp.name != 'compact_rev_key' AND
93102
kp.prev_revision != 0 AND
94103
kp.id <= ?
95104
UNION
96105
SELECT kd.id AS id
97-
FROM kine AS kd
106+
FROM %s AS kd
98107
WHERE
99108
kd.deleted != 0 AND
100109
kd.id <= ?
101110
) AS ks
102-
ON kv.id = ks.id`
111+
ON kv.id = ks.id`, tableName, tableName, tableName)
103112
dialect.TranslateErr = func(err error) error {
104113
if err, ok := err.(*mysql.MySQLError); ok && err.Number == 1062 {
105114
return server.ErrKeyExists
@@ -115,24 +124,24 @@ func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error)
115124
}
116125
return err.Error()
117126
}
118-
if err := setup(dialect.DB); err != nil {
127+
if err := setup(dialect.DB, tableName); err != nil {
119128
return false, nil, err
120129
}
121130

122131
dialect.Migrate(context.Background())
123132
return true, logstructured.New(sqllog.New(dialect)), nil
124133
}
125134

126-
func setup(db *sql.DB) error {
135+
func setup(db *sql.DB, tableName string) error {
127136
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
128137
var exists bool
129-
err := db.QueryRow("SELECT 1 FROM information_schema.TABLES WHERE table_schema = DATABASE() AND table_name = ?", "kine").Scan(&exists)
138+
err := db.QueryRow("SELECT 1 FROM information_schema.TABLES WHERE table_schema = DATABASE() AND table_name = ?", tableName).Scan(&exists)
130139
if err != nil && err != sql.ErrNoRows {
131-
logrus.Warnf("Failed to check existence of database table %s, going to attempt create: %v", "kine", err)
140+
logrus.Warnf("Failed to check existence of database table %s, going to attempt create: %v", tableName, err)
132141
}
133142

134143
if !exists {
135-
for _, stmt := range schema {
144+
for _, stmt := range getSchema(tableName) {
136145
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
137146
if _, err := db.Exec(stmt); err != nil {
138147
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
@@ -146,7 +155,7 @@ func setup(db *sql.DB) error {
146155
// Note that the schema created by the `schema` var is always the latest revision;
147156
// migrations should handle deltas between prior schema versions.
148157
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
149-
for i, stmt := range schemaMigrations {
158+
for i, stmt := range getSchemaMigrations(tableName) {
150159
if i >= int(schemaVersion) {
151160
break
152161
}

pkg/drivers/pgsql/pgsql.go

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ const (
2828
defaultDSN = "postgres://postgres:postgres@localhost/"
2929
)
3030

31-
var (
32-
schema = []string{
33-
`CREATE TABLE IF NOT EXISTS kine
31+
func getSchema(tableName string) []string {
32+
return []string{
33+
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s
3434
(
3535
id BIGSERIAL PRIMARY KEY,
3636
name text COLLATE "C",
@@ -41,23 +41,27 @@ var (
4141
lease INTEGER,
4242
value bytea,
4343
old_value bytea
44-
);`,
44+
);`, tableName),
4545

46-
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
47-
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
48-
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
49-
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
50-
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
51-
`CREATE INDEX IF NOT EXISTS kine_list_query_index on kine(name, id DESC, deleted)`,
46+
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_index ON %s (name)`, tableName, tableName),
47+
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_name_id_index ON %s (name,id)`, tableName, tableName),
48+
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_id_deleted_index ON %s (id,deleted)`, tableName, tableName),
49+
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_prev_revision_index ON %s (prev_revision)`, tableName, tableName),
50+
fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s_name_prev_revision_uindex ON %s (name, prev_revision)`, tableName, tableName),
51+
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_list_query_index on %s(name, id DESC, deleted)`, tableName, tableName),
5252
}
53-
schemaMigrations = []string{
54-
`ALTER TABLE kine ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE kine_id_seq AS BIGINT`,
53+
}
54+
55+
func getSchemaMigrations(tableName string) []string {
56+
return []string{
57+
fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE %s_id_seq AS BIGINT`, tableName, tableName),
5558
// It is important to set the collation to "C" to ensure that LIKE and COMPARISON
5659
// queries use the index.
57-
`ALTER TABLE kine ALTER COLUMN name SET DATA TYPE TEXT COLLATE "C" USING name::TEXT COLLATE "C"`,
60+
fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN name SET DATA TYPE TEXT COLLATE "C" USING name::TEXT COLLATE "C"`, tableName),
5861
}
59-
createDB = `CREATE DATABASE "%s";`
60-
)
62+
}
63+
64+
var createDB = `CREATE DATABASE "%s";`
6165

6266
func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) {
6367
parsedDSN, err := prepareDSN(cfg.DataSourceName, cfg.BackendTLSConfig)
@@ -73,67 +77,74 @@ func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error)
7377
if err != nil {
7478
return false, nil, err
7579
}
76-
listSQL := `
80+
81+
tableName := cfg.TableName
82+
if tableName == "" {
83+
tableName = "kine"
84+
}
85+
86+
listSQL := fmt.Sprintf(`
7787
SELECT
78-
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
79-
(SELECT MAX(crkv.prev_revision) AS prev_revision FROM kine AS crkv WHERE crkv.name = 'compact_rev_key'),
88+
(SELECT MAX(rkv.id) AS id FROM %s AS rkv),
89+
(SELECT MAX(crkv.prev_revision) AS prev_revision FROM %s AS crkv WHERE crkv.name = 'compact_rev_key'),
8090
maxkv.*
8191
FROM (
8292
SELECT DISTINCT ON (name)
8393
kv.id AS theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value
8494
FROM
85-
kine AS kv
95+
%s AS kv
8696
WHERE
8797
kv.name LIKE ?
88-
%s
98+
%%s
8999
ORDER BY kv.name, theid DESC
90100
) AS maxkv
91101
WHERE
92102
maxkv.deleted = 0 OR ?
93103
ORDER BY maxkv.name, maxkv.theid DESC
94-
`
104+
`, tableName, tableName, tableName)
95105

96-
countSQL := `
106+
countSQL := fmt.Sprintf(`
97107
SELECT
98-
(SELECT MAX(rkv.id) AS id FROM kine AS rkv),
108+
(SELECT MAX(rkv.id) AS id FROM %s AS rkv),
99109
COUNT(c.theid)
100110
FROM (
101111
SELECT DISTINCT ON (name)
102112
kv.id AS theid, kv.deleted
103-
FROM kine AS kv
113+
FROM %s AS kv
104114
WHERE
105115
kv.name LIKE ?
106-
%s
116+
%%s
107117
ORDER BY kv.name, theid DESC
108118
) AS c
109119
WHERE c.deleted = 0 OR ?
110-
`
111-
dialect.GetSizeSQL = `SELECT pg_total_relation_size('kine')`
112-
dialect.CompactSQL = `
113-
DELETE FROM kine AS kv
120+
`, tableName, tableName)
121+
122+
dialect.GetSizeSQL = fmt.Sprintf(`SELECT pg_total_relation_size('%s')`, tableName)
123+
dialect.CompactSQL = fmt.Sprintf(`
124+
DELETE FROM %s AS kv
114125
USING (
115126
SELECT kp.prev_revision AS id
116-
FROM kine AS kp
127+
FROM %s AS kp
117128
WHERE
118129
kp.name != 'compact_rev_key' AND
119130
kp.prev_revision != 0 AND
120131
kp.id <= $1
121132
UNION
122133
SELECT kd.id AS id
123-
FROM kine AS kd
134+
FROM %s AS kd
124135
WHERE
125136
kd.deleted != 0 AND
126137
kd.id <= $2
127138
) AS ks
128-
WHERE kv.id = ks.id`
139+
WHERE kv.id = ks.id`, tableName, tableName, tableName)
129140
dialect.GetCurrentSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ?"))
130141
dialect.ListRevisionStartSQL = q(fmt.Sprintf(listSQL, "AND kv.id <= ?"))
131142
dialect.GetRevisionAfterSQL = q(fmt.Sprintf(listSQL, "AND kv.name > ? AND kv.id <= ?"))
132143
dialect.CountCurrentSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ?"))
133144
dialect.CountRevisionSQL = q(fmt.Sprintf(countSQL, "AND kv.name > ? AND kv.id <= ?"))
134145
dialect.FillRetryDuration = time.Millisecond + 5
135146
dialect.InsertRetry = func(err error) bool {
136-
if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == "kine_pkey" {
147+
if err, ok := err.(*pgconn.PgError); ok && err.Code == pgerrcode.UniqueViolation && err.ConstraintName == fmt.Sprintf("%s_pkey", cfg.TableName) {
137148
return true
138149
}
139150
return false
@@ -154,15 +165,15 @@ func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error)
154165
return err.Error()
155166
}
156167

157-
if err := setup(dialect.DB); err != nil {
168+
if err := setup(dialect.DB, tableName); err != nil {
158169
return false, nil, err
159170
}
160171

161172
dialect.Migrate(context.Background())
162173
return true, logstructured.New(sqllog.New(dialect)), nil
163174
}
164175

165-
func setup(db *sql.DB) error {
176+
func setup(db *sql.DB, tableName string) error {
166177
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
167178
var version string
168179
collationSupported := true
@@ -173,7 +184,7 @@ func setup(db *sql.DB) error {
173184
collationSupported = false
174185
}
175186

176-
for _, stmt := range schema {
187+
for _, stmt := range getSchema(tableName) {
177188
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
178189
if !collationSupported {
179190
stmt = strings.ReplaceAll(stmt, ` COLLATE "C"`, "")
@@ -187,7 +198,7 @@ func setup(db *sql.DB) error {
187198
// Note that the schema created by the `schema` var is always the latest revision;
188199
// migrations should handle deltas between prior schema versions.
189200
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
190-
for i, stmt := range schemaMigrations {
201+
for i, stmt := range getSchemaMigrations(tableName) {
191202
if i >= int(schemaVersion) {
192203
break
193204
}

0 commit comments

Comments
 (0)