-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathprovisioning.go
More file actions
188 lines (144 loc) · 6.68 KB
/
provisioning.go
File metadata and controls
188 lines (144 loc) · 6.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package ppm
import (
"errors"
"fmt"
"time"
"github.com/qonto/postgresql-partition-manager/internal/infra/partition"
"github.com/qonto/postgresql-partition-manager/internal/infra/postgresql"
"github.com/qonto/postgresql-partition-manager/internal/infra/retry"
"github.com/qonto/postgresql-partition-manager/internal/infra/uuid7"
)
// ErrPartitionProvisioningFailed is the sentinel error returned by ProvisioningPartitions
// when provisioning failed for at least one configured partition, and by
// provisionPartitionsFor when creating one of the expected partitions failed.
var ErrPartitionProvisioningFailed = errors.New("partition provisioning failed for one or more partition")
// ProvisioningPartitions provisions every partition set registered in p.partitions,
// using p.workDate as the reference date.
//
// A failure on one entry is logged and does not stop the remaining entries from being
// processed. ErrPartitionProvisioningFailed is returned if at least one entry failed,
// nil otherwise.
func (p PPM) ProvisioningPartitions() error {
	failures := 0

	for name, cfg := range p.partitions {
		p.logger.Info("Provisioning partition", "partition", name)

		err := p.provisionPartitionsFor(cfg, p.workDate)
		if err == nil {
			continue
		}

		// Keep going: the other configured partitions must still be attempted.
		p.logger.Error("Failed to provision partitions", "error", err, "schema", cfg.Schema, "table", cfg.Table)
		failures++
	}

	if failures > 0 {
		return ErrPartitionProvisioningFailed
	}

	p.logger.Info("All partitions are correctly provisioned")

	return nil
}
// provisionPartitionsFor creates the partitions expected for config at the date at
// that are not already covered by the existing partitions of config.Table.
//
// The existing partitions and the expected ones are each reduced to one global range
// (see getGlobalRange). When both ranges are equal nothing is created. Otherwise each
// expected partition is compared with the existing global range:
//   - no overlap: the partition is created as is;
//   - it starts before the existing range and reaches into it: only the part left of the
//     existing range is created, under a name built from the table and the new bounds;
//   - it starts inside the existing range and ends after it: only the part right of the
//     existing range is created, named the same way.
//
// Listing, generation and range evaluation errors are returned wrapped. A creation
// failure is logged and reported as ErrPartitionProvisioningFailed.
func (p PPM) provisionPartitionsFor(config partition.Configuration, at time.Time) error {
	existing, err := p.ListPartitions(config.Schema, config.Table)
	if err != nil {
		return fmt.Errorf("could not list partitions: %w", err)
	}

	wanted, err := getExpectedPartitions(config, at)
	if err != nil {
		return fmt.Errorf("could not generate partition to create: %w", err)
	}

	have, err := p.getGlobalRange(existing)
	if err != nil {
		return fmt.Errorf("could not evaluate existing ranges: %w", err)
	}

	p.logger.Info("Current ", "c_range", have.String())

	want, err := p.getGlobalRange(wanted)
	if err != nil {
		return fmt.Errorf("could not evaluate ranges to create: %w", err)
	}

	p.logger.Info("Expected", "e_range", want)

	// Identical global ranges: everything expected is already there.
	if want.IsEqual(have) {
		return nil
	}

	for _, cand := range wanted {
		p.logger.Info("Candidate", "range", partition.Bounds(cand.LowerBound, cand.UpperBound))

		// The candidate overlaps the existing range only if it ends after the range
		// starts and starts before the range ends.
		endsAfterStart := cand.UpperBound.After(have.LowerBound)
		startsBeforeEnd := cand.LowerBound.Before(have.UpperBound)

		var createErr error

		if !(endsAfterStart && startsBeforeEnd) {
			// Entirely outside of what exists: create the candidate unchanged.
			p.logger.Info("No intersection", "create-range", partition.Bounds(cand.LowerBound, cand.UpperBound))
			createErr = p.CreatePartition(config, cand)
		}

		if createErr == nil && endsAfterStart && cand.LowerBound.Before(have.LowerBound) {
			// Part of the candidate sticking out on the left of the existing range.
			left := cand
			left.UpperBound = have.LowerBound
			left.Name = fmt.Sprintf("%s_%s_%s", config.Table, left.LowerBound.Format("20060102"), left.UpperBound.Format("20060102"))
			p.logger.Info("Left intersection", "create-range", partition.Bounds(left.LowerBound, left.UpperBound))
			createErr = p.CreatePartition(config, left)
		}

		if createErr == nil && startsBeforeEnd && cand.UpperBound.After(have.UpperBound) {
			// Part of the candidate sticking out on the right of the existing range.
			right := cand
			right.LowerBound = have.UpperBound
			right.Name = fmt.Sprintf("%s_%s_%s", config.Table, right.LowerBound.Format("20060102"), right.UpperBound.Format("20060102"))
			p.logger.Info("Right intersection", "create-range", partition.Bounds(right.LowerBound, right.UpperBound))
			createErr = p.CreatePartition(config, right)
		}

		if createErr != nil {
			p.logger.Error("Failed to create partition", "error", createErr)

			return ErrPartitionProvisioningFailed
		}
	}

	return nil
}
// CreatePartition makes sure the table backing the given partition exists and is attached
// to its parent table.
//
// Steps:
//   - read the parent's partition key and the data type of that column;
//   - create the table with CreateTableLikeTable when it does not exist yet;
//   - return nil right away when the table is already attached to the parent;
//   - render the lower and upper bounds according to the key type (date, timestamp
//     with or without time zone, or the value produced by uuid7.FromTime);
//   - attach the table and set its replica identity, with up to three attempts.
//
// partitionConfiguration is not used by this method.
//
// It returns ErrUnsupportedPartitionStrategy when the partition key type is not one of
// the handled types, and otherwise a wrapped error from the database call that failed.
func (p PPM) CreatePartition(partitionConfiguration partition.Configuration, partition partition.Partition) error {
	p.logger.Debug("Creating partition", "schema", partition.Schema, "table", partition.Name)

	_, partitionKey, err := p.db.GetPartitionSettings(partition.Schema, partition.ParentTable)
	if err != nil {
		return fmt.Errorf("failed to get partition settings: %w", err)
	}

	partitionKeyType, err := p.db.GetColumnDataType(partition.Schema, partition.ParentTable, partitionKey)
	if err != nil {
		return fmt.Errorf("failed to get partition key details: %w", err)
	}

	tableExists, err := p.db.IsTableExists(partition.Schema, partition.Name)
	if err != nil {
		return fmt.Errorf("failed to check if table exists: %w", err)
	}

	if !tableExists {
		err := p.db.CreateTableLikeTable(partition.Schema, partition.Name, partition.ParentTable)
		if err != nil {
			return fmt.Errorf("failed to create table: %w", err)
		}

		p.logger.Info("Table created", "schema", partition.Schema, "table", partition.Name)
	} else {
		p.logger.Info("Table already exists, skip", "schema", partition.Schema, "table", partition.Name)
	}

	partitionAttached, err := p.db.IsPartitionAttached(partition.Schema, partition.Name)
	if err != nil {
		return fmt.Errorf("failed to check partition attachment status: %w", err)
	}

	if partitionAttached {
		p.logger.Info("Table is already attached to the parent table, skip", "schema", partition.Schema, "table", partition.Name)

		return nil
	}

	// Bounds are passed to the database as text; their format depends on the key type.
	var lowerBound, upperBound string

	switch partitionKeyType {
	case postgresql.Date:
		lowerBound = partition.LowerBound.Format("2006-01-02")
		upperBound = partition.UpperBound.Format("2006-01-02")
	case postgresql.DateTime, postgresql.DateTimeWithTZ:
		// "00:00:00" is literal text in this layout: the time of day is always midnight.
		lowerBound = partition.LowerBound.Format("2006-01-02 00:00:00")
		upperBound = partition.UpperBound.Format("2006-01-02 00:00:00")
	case postgresql.UUID:
		lowerBound = uuid7.FromTime(partition.LowerBound)
		upperBound = uuid7.FromTime(partition.UpperBound)
	default:
		return ErrUnsupportedPartitionStrategy
	}

	maxRetries := 3

	// attached remembers that AttachPartition succeeded during an earlier attempt. Without
	// it, a failure of SetPartitionReplicaIdentity would make the next attempt attach the
	// table a second time, which is expected to be rejected because the table is already
	// a partition, so the remaining attempts could never succeed.
	attached := false

	err = retry.WithRetry(maxRetries, func(attempt int) error {
		if !attached {
			err := p.db.AttachPartition(partition.Schema, partition.Name, partition.ParentTable, lowerBound, upperBound)
			if err != nil {
				p.logger.Warn("fail to attach partition", "error", err, "schema", partition.Schema, "table", partition.Name, "attempt", attempt, "max_retries", maxRetries)

				return fmt.Errorf("fail to attach partition: %w", err)
			}

			attached = true
		}

		err := p.db.SetPartitionReplicaIdentity(partition.Schema, partition.Name, partition.ParentTable)
		if err != nil {
			p.logger.Warn("failed to set replica identity", "error", err, "schema", partition.Schema, "table", partition.Name, "attempt", attempt, "max_retries", maxRetries)

			return fmt.Errorf("fail to set replica identity: %w", err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to attach partition after retries: %w", err)
	}

	p.logger.Info("Partition attached to parent table", "schema", partition.Schema, "table", partition.Name, "parent_table", partition.ParentTable)

	return nil
}