Commit c15e396

Chore: Fix snapshot upload filter for incompatible SQL statements (#1186)

* Fix pgsql database name
* Change extension filter to list of filters and stops
* Revert "Fix pgsql database name" (this reverts commit 30737b1)
* Update comments, function name, and file name for clarity
1 parent 806985b commit c15e396

File tree

5 files changed: +346 −147 lines

cmd/src/snapshot_upload.go

Lines changed: 10 additions & 10 deletions
@@ -152,30 +152,30 @@ func copyDumpToBucket(ctx context.Context, src io.ReadSeeker, stat fs.FileInfo,
 	// To assert against actual file size
 	var totalWritten int64

-	// Do a partial copy that trims out unwanted statements
+	// Do a partial copy that filters out incompatible statements
 	if trimExtensions {
-		written, err := pgdump.PartialCopyWithoutExtensions(object, src, progressFn)
+		written, err := pgdump.FilterInvalidLines(object, src, progressFn)
 		if err != nil {
-			return errors.Wrap(err, "trim extensions and upload")
+			return errors.Wrap(err, "filter out incompatible statements and upload")
 		}
 		totalWritten += written
 	}

-	// io.Copy is the best way to copy from a reader to writer in Go, and storage.Writer
-	// has its own chunking mechanisms internally. io.Reader is stateful, so this copy
-	// will just continue from where we left off if we use copyAndTrimExtensions.
+	// io.Copy is the best way to copy from a reader to a writer in Go,
+	// and storage.Writer has its own chunking mechanisms internally.
+	// io.Reader is stateful, so this copy will just continue from where FilterInvalidLines left off, if it was used.
 	written, err := io.Copy(object, src)
 	if err != nil {
 		return errors.Wrap(err, "upload")
 	}
 	totalWritten += written

-	// Progress is not called on completion of io.Copy, so we call it manually after to
-	// update our pretty progress bars.
+	// Progress is not called on completion of io.Copy,
+	// so we call it manually afterwards to update our pretty progress bars.
 	progressFn(written)

-	// Validate we have sent all data. copyAndTrimExtensions may add some bytes, so the
-	// check is not a strict equality.
+	// Validate we have sent all data.
+	// FilterInvalidLines may add some bytes, so the check is not a strict equality.
 	size := stat.Size()
 	if totalWritten < size {
 		return errors.Newf("expected to write %d bytes, but actually wrote %d bytes (diff: %d bytes)",
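
The comment in the hunk above leans on io.Reader being stateful: bufio.Reader may buffer more of src than the filter logically consumed, so the filter has to Seek src back to its consumed offset before io.Copy takes over. A minimal standalone sketch of that handoff (not part of the commit; the strings and names are illustrative):

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	// strings.Reader implements io.ReadSeeker, standing in for the dump file.
	src := strings.NewReader("line one\nline two\nline three\n")

	// bufio.Reader may read ahead, so src's own offset can be further
	// along than the bytes we have logically consumed.
	reader := bufio.NewReader(src)
	line, err := reader.ReadBytes('\n')
	if err != nil {
		panic(err)
	}
	consumed := int64(len(line))

	// Rewind src to the consumed position, then hand the remainder to io.Copy.
	if _, err := src.Seek(consumed, io.SeekStart); err != nil {
		panic(err)
	}
	var rest strings.Builder
	if _, err := io.Copy(&rest, src); err != nil {
		panic(err)
	}
	fmt.Printf("filtered: %q\nremainder via io.Copy: %q\n", line, rest.String())
	// filtered: "line one\n"
	// remainder via io.Copy: "line two\nline three\n"
}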

internal/pgdump/extensions.go

Lines changed: 0 additions & 67 deletions
This file was deleted.

internal/pgdump/extensions_test.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

internal/pgdump (new file; renamed from extensions.go, name not shown in this view)

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
package pgdump

import (
	"bufio"
	"bytes"
	"io"

	"github.com/sourcegraph/sourcegraph/lib/errors"
)

// FilterInvalidLines copies the initial lines of the pg_dump-created .sql file
// from src to dst (the GCS bucket) until it hits a line prefixed with a
// filterEndMarker, commenting out the linesToFilter that cause
// `gcloud sql import` to error out. It then resets src to the position of the
// last contents written to dst.
//
// Filtering requires reading entire lines into memory, which can be a very
// expensive operation, so when filtering is complete, the more efficient
// io.Copy performs the remainder of the copy in the calling function.
//
// pg_dump writes these .sql files based on its own version, not the Postgres
// version of either the source or destination database, so self-hosted
// customers' diverse database environments have inserted a variety of
// statements into the .sql files that cause the import to fail.
// For details, see https://cloud.google.com/sql/docs/postgres/import-export/import-export-dmp
func FilterInvalidLines(dst io.Writer, src io.ReadSeeker, progressFn func(int64)) (int64, error) {
	var (
		reader = bufio.NewReader(src)

		// Position we have consumed up to.
		// Tracked separately because bufio.Reader may have read ahead on src;
		// this allows us to reset src later.
		consumed int64

		// Number of bytes we have actually written to dst.
		// It should always be returned.
		written int64

		// Set to true when we start to hit lines which indicate that we may
		// be finished filtering.
		noMoreLinesToFilter bool

		filterEndMarkers = []string{
			"CREATE TABLE",
			"INSERT INTO",
		}

		linesToFilter = []string{
			"DROP DATABASE",
			"CREATE DATABASE",
			"COMMENT ON DATABASE",

			"DROP SCHEMA",
			"CREATE SCHEMA",
			"COMMENT ON SCHEMA",

			"DROP EXTENSION",
			"CREATE EXTENSION",
			"COMMENT ON EXTENSION",

			"SET transaction_timeout", // pg_dump v17, importing to Postgres 16

			"\\connect",

			// Cloud instances' databases have been upgraded to Postgres v16.10,
			// which should include support for \restrict and \unrestrict,
			// but they are left in the list in case we need to re-add them.
			// "\\restrict",
			// To handle the \unrestrict command,
			// we'd have to add a search from the end of the file.
			// "\\unrestrict",
			// Remove these comments after databases are upgraded to Postgres >= 17.
		}
	)

	for !noMoreLinesToFilter {
		// Read up to a line, keeping track of our position in src.
		line, err := reader.ReadBytes('\n')
		consumed += int64(len(line))

		// If this function has read through the whole file without hitting a
		// filterEndMarker, then handle the last line correctly.
		if err == io.EOF {
			noMoreLinesToFilter = true

			// If the reader has found a different error,
			// then return what we've processed so far.
		} else if err != nil {
			return written, err
		}

		// Once we start seeing these lines, we are probably done with the
		// invalid statements, so we can hand off the rest to the more
		// efficient io.Copy implementation.
		for _, filterEndMarker := range filterEndMarkers {
			if bytes.HasPrefix(line, []byte(filterEndMarker)) {
				noMoreLinesToFilter = true
				break
			}
		}

		if !noMoreLinesToFilter {
			for _, lineToFilter := range linesToFilter {
				if bytes.HasPrefix(line, []byte(lineToFilter)) {
					line = append([]byte("-- "), line...)
					break
				}
			}
		}

		// Write this line and update our progress before returning on error.
		lineWritten, err := dst.Write(line)
		written += int64(lineWritten)
		progressFn(written)
		if err != nil {
			return written, err
		}
	}

	// No more lines to filter.
	// Reset src to the last actually consumed position.
	_, err := src.Seek(consumed, io.SeekStart)
	if err != nil {
		return written, errors.Wrap(err, "reset src position")
	}
	return written, nil
}
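
A sketch of how FilterInvalidLines behaves on a small pg_dump prelude. This test is not part of the commit: it assumes placement alongside the new file in package pgdump, the database and table names are made up, and the expected output follows from a reading of the function above — filtered statements are written commented out, the first filterEndMarker line is copied through unchanged and ends the line-by-line pass, and src is left positioned for io.Copy to resume.

package pgdump

import (
	"bytes"
	"io"
	"strings"
	"testing"
)

func TestFilterInvalidLinesSketch(t *testing.T) {
	dump := strings.Join([]string{
		"SET statement_timeout = 0;",
		"CREATE EXTENSION IF NOT EXISTS pg_trgm;",
		"\\connect sourcegraph",
		"CREATE TABLE repos (id bigint);",
		"INSERT INTO repos VALUES (1);",
		"",
	}, "\n")

	src := strings.NewReader(dump)
	var dst bytes.Buffer
	written, err := FilterInvalidLines(&dst, src, func(int64) {})
	if err != nil {
		t.Fatal(err)
	}

	// Filtered statements are commented out; the CREATE TABLE end marker
	// itself is written as-is and stops the filtering loop.
	want := strings.Join([]string{
		"SET statement_timeout = 0;",
		"-- CREATE EXTENSION IF NOT EXISTS pg_trgm;",
		"-- \\connect sourcegraph",
		"CREATE TABLE repos (id bigint);",
		"",
	}, "\n")
	if got := dst.String(); got != want {
		t.Fatalf("got:\n%s\nwant:\n%s", got, want)
	}
	if written != int64(len(want)) {
		t.Fatalf("written = %d, want %d", written, len(want))
	}

	// src is rewound to just after the end marker, ready for io.Copy.
	rest, _ := io.ReadAll(src)
	if string(rest) != "INSERT INTO repos VALUES (1);\n" {
		t.Fatalf("unexpected remainder: %q", rest)
	}
}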
