Skip to content

Commit d088b39

Browse files
committed
release: v0.7.2
1 parent 1b3c32f commit d088b39

4 files changed

Lines changed: 241 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
# Changelog
22

3-
## 0.7.2 - Unreleased
3+
## 0.7.2 - 2026-05-11
44

55
### Changes
66

7+
### Fixes
8+
9+
- Kept Git snapshot imports incremental when metadata, attachment, mention, or event tables changed alongside the message tail, avoiding full archive replays for routine updates.
10+
711
## 0.7.1 - 2026-05-11
812

913
### Changes

internal/share/share.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func ImportIncremental(ctx context.Context, s *store.Store, opts Options, previo
320320
}
321321
return manifest, false, nil
322322
}
323-
opts.reportProgress(ImportProgress{Phase: "start", TotalRows: manifestRowCount(manifest)})
323+
opts.reportProgress(ImportProgress{Phase: "start", TotalRows: importPlanRowCount(plan)})
324324
restorePragmas, err := applyImportPragmas(ctx, s.DB())
325325
if err != nil {
326326
return Manifest{}, false, err
@@ -370,14 +370,27 @@ func ImportIncremental(ctx context.Context, s *store.Store, opts Options, previo
370370
}); err != nil {
371371
return Manifest{}, false, err
372372
}
373+
rebuildMessageFTS, rebuildMemberFTS := importPlanSearchRebuilds(plan)
374+
if rebuildMessageFTS {
375+
opts.reportProgress(ImportProgress{Phase: "rebuild_fts"})
376+
if err := s.RebuildMessageSearchIndex(ctx); err != nil {
377+
return Manifest{}, false, err
378+
}
379+
}
380+
if rebuildMemberFTS {
381+
opts.reportProgress(ImportProgress{Phase: "rebuild_member_fts"})
382+
if err := s.RebuildMemberSearchIndex(ctx); err != nil {
383+
return Manifest{}, false, err
384+
}
385+
}
373386
if err := MarkImported(ctx, s, manifest); err != nil {
374387
return Manifest{}, false, err
375388
}
376389
if err := restorePragmas(ctx); err != nil {
377390
return Manifest{}, false, err
378391
}
379392
pragmasRestored = true
380-
opts.reportProgress(ImportProgress{Phase: "done", TotalRows: manifestRowCount(manifest)})
393+
opts.reportProgress(ImportProgress{Phase: "done", TotalRows: importPlanRowCount(plan)})
381394
return manifest, true, nil
382395
}
383396

@@ -398,6 +411,41 @@ func manifestRowCount(manifest Manifest) int {
398411
return total
399412
}
400413

414+
func importPlanRowCount(plan snapshot.ImportPlan) int {
415+
if plan.Full {
416+
return 0
417+
}
418+
total := 0
419+
for _, tablePlan := range plan.Tables {
420+
switch tablePlan.Mode {
421+
case snapshot.TableImportReplace:
422+
total += tablePlan.Table.Rows
423+
case snapshot.TableImportFiles:
424+
for _, file := range tablePlan.Files {
425+
total += file.Rows
426+
}
427+
}
428+
}
429+
return total
430+
}
431+
432+
func importPlanSearchRebuilds(plan snapshot.ImportPlan) (bool, bool) {
433+
rebuildMessageFTS := false
434+
rebuildMemberFTS := false
435+
for _, tablePlan := range plan.Tables {
436+
if tablePlan.Mode == snapshot.TableImportSkip {
437+
continue
438+
}
439+
switch tablePlan.Table.Name {
440+
case "channels":
441+
rebuildMessageFTS = true
442+
case "members":
443+
rebuildMemberFTS = true
444+
}
445+
}
446+
return rebuildMessageFTS, rebuildMemberFTS
447+
}
448+
401449
func ImportEmbeddings(ctx context.Context, s *store.Store, opts Options, manifest Manifest) error {
402450
tx, err := s.DB().BeginTx(ctx, nil)
403451
if err != nil {
@@ -590,19 +638,21 @@ func shareIncrementalPlan(plan snapshot.ImportPlan) (snapshot.ImportPlan, bool)
590638
switch tablePlan.Table.Name {
591639
case "messages":
592640
out.Tables = append(out.Tables, tablePlan)
593-
case "sync_state":
641+
case "guilds", "channels", "members", "message_events", "message_attachments", "mention_events", "sync_state":
594642
tablePlan.Mode = snapshot.TableImportReplace
595643
tablePlan.Files = nil
596-
tablePlan.Reason = "replace sync_state to avoid stale cursors"
644+
tablePlan.Reason = "replace changed " + tablePlan.Table.Name + " snapshot table"
597645
out.Tables = append(out.Tables, tablePlan)
598646
default:
599647
return plan, false
600648
}
601649
case snapshot.TableImportReplace:
602-
if tablePlan.Table.Name != "sync_state" {
650+
switch tablePlan.Table.Name {
651+
case "guilds", "channels", "members", "message_events", "message_attachments", "mention_events", "sync_state":
652+
out.Tables = append(out.Tables, tablePlan)
653+
default:
603654
return plan, false
604655
}
605-
out.Tables = append(out.Tables, tablePlan)
606656
default:
607657
return plan, false
608658
}

internal/share/share_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,121 @@ func TestImportIfChangedUsesIncrementalTailImport(t *testing.T) {
133133
require.Contains(t, state, `"file_manifests"`)
134134
}
135135

136+
func TestImportIfChangedUsesMixedIncrementalPlanForMetadataChanges(t *testing.T) {
137+
ctx := context.Background()
138+
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
139+
defer func() { _ = src.Close() }()
140+
141+
repo := filepath.Join(t.TempDir(), "share")
142+
manifest, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"})
143+
require.NoError(t, err)
144+
145+
dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db"))
146+
require.NoError(t, err)
147+
defer func() { _ = dst.Close() }()
148+
_, changed, err := ImportIfChanged(ctx, dst, Options{RepoPath: repo, Branch: "main"})
149+
require.NoError(t, err)
150+
require.True(t, changed)
151+
152+
now := time.Now().UTC().Format(time.RFC3339Nano)
153+
require.NoError(t, src.UpsertChannel(ctx, store.ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "launch", RawJSON: `{}`}))
154+
require.NoError(t, src.UpsertMember(ctx, store.MemberRecord{
155+
GuildID: "g1",
156+
UserID: "u1",
157+
Username: "peter",
158+
DisplayName: "Launch Peter",
159+
RoleIDsJSON: `[]`,
160+
RawJSON: `{"bio":"delta member"}`,
161+
}))
162+
require.NoError(t, src.UpsertMessages(ctx, []store.MessageMutation{{
163+
Record: store.MessageRecord{
164+
ID: "m2",
165+
GuildID: "g1",
166+
ChannelID: "c1",
167+
ChannelName: "launch",
168+
AuthorID: "u1",
169+
AuthorName: "Peter",
170+
MessageType: 0,
171+
CreatedAt: now,
172+
Content: "mixed delta landed",
173+
NormalizedContent: "mixed delta landed",
174+
RawJSON: `{"author":{"username":"Peter"}}`,
175+
},
176+
EventType: "upsert",
177+
PayloadJSON: `{"id":"m2"}`,
178+
Options: store.WriteOptions{AppendEvent: true},
179+
Attachments: []store.AttachmentRecord{{
180+
AttachmentID: "a2",
181+
MessageID: "m2",
182+
GuildID: "g1",
183+
ChannelID: "c1",
184+
AuthorID: "u1",
185+
Filename: "delta.txt",
186+
TextContent: "attached delta",
187+
}},
188+
Mentions: []store.MentionEventRecord{{
189+
MessageID: "m2",
190+
GuildID: "g1",
191+
ChannelID: "c1",
192+
AuthorID: "u1",
193+
TargetType: "role",
194+
TargetID: "r2",
195+
TargetName: "Launch",
196+
EventAt: now,
197+
}},
198+
}}))
199+
updated, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"})
200+
require.NoError(t, err)
201+
require.NotEqual(t, manifest.GeneratedAt, updated.GeneratedAt)
202+
203+
previous, ok := PreviousImportedManifest(ctx, dst, Options{RepoPath: repo, Branch: "main"})
204+
require.True(t, ok)
205+
planned, supported := shareIncrementalPlan(snapshot.PlanIncrementalImport(snapshotManifest(previous), snapshotManifest(updated)))
206+
require.True(t, supported, "%+v", planned)
207+
require.Equal(t, snapshot.TableImportReplace, importPlanTable(t, planned, "channels").Mode)
208+
require.Equal(t, snapshot.TableImportReplace, importPlanTable(t, planned, "members").Mode)
209+
require.Equal(t, snapshot.TableImportFiles, importPlanTable(t, planned, "messages").Mode)
210+
require.Equal(t, snapshot.TableImportReplace, importPlanTable(t, planned, "message_events").Mode)
211+
require.Equal(t, snapshot.TableImportReplace, importPlanTable(t, planned, "message_attachments").Mode)
212+
require.Equal(t, snapshot.TableImportReplace, importPlanTable(t, planned, "mention_events").Mode)
213+
214+
var progress []ImportProgress
215+
imported, changed, err := ImportIfChanged(ctx, dst, Options{
216+
RepoPath: repo,
217+
Branch: "main",
218+
Progress: func(p ImportProgress) { progress = append(progress, p) },
219+
})
220+
require.NoError(t, err)
221+
require.True(t, changed)
222+
require.Equal(t, updated.GeneratedAt, imported.GeneratedAt)
223+
require.Contains(t, progressPhases(progress), "rebuild_fts")
224+
require.Contains(t, progressPhases(progress), "rebuild_member_fts")
225+
require.Equal(t, importPlanRowCount(planned), progressTotalRows(t, progress, "start"))
226+
require.Positive(t, progressTotalRows(t, progress, "start"))
227+
228+
results, err := dst.SearchMessages(ctx, store.SearchOptions{Query: "checklist", Channel: "launch", Limit: 10})
229+
require.NoError(t, err)
230+
require.Len(t, results, 1)
231+
require.Equal(t, "m1", results[0].MessageID)
232+
233+
results, err = dst.SearchMessages(ctx, store.SearchOptions{Query: "mixed delta", Limit: 10})
234+
require.NoError(t, err)
235+
require.Len(t, results, 1)
236+
require.Equal(t, "m2", results[0].MessageID)
237+
_, rows, err := dst.ReadOnlyQuery(ctx, "select name from channels where id = 'c1'")
238+
require.NoError(t, err)
239+
require.Equal(t, "launch", rows[0][0])
240+
_, rows, err = dst.ReadOnlyQuery(ctx, "select count(*) from mention_events")
241+
require.NoError(t, err)
242+
require.Equal(t, "2", rows[0][0])
243+
_, rows, err = dst.ReadOnlyQuery(ctx, "select count(*) from message_events")
244+
require.NoError(t, err)
245+
require.Equal(t, "2", rows[0][0])
246+
_, rows, err = dst.ReadOnlyQuery(ctx, "select count(*) from member_fts where member_fts match 'delta'")
247+
require.NoError(t, err)
248+
require.Equal(t, "1", rows[0][0])
249+
}
250+
136251
func TestImportIfChangedInfersLegacyManifestFilesFromGit(t *testing.T) {
137252
ctx := context.Background()
138253
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
@@ -1261,6 +1376,17 @@ func tableEntry(t *testing.T, manifest Manifest, name string) TableManifest {
12611376
return TableManifest{}
12621377
}
12631378

1379+
func importPlanTable(t *testing.T, plan snapshot.ImportPlan, name string) snapshot.TableImportPlan {
1380+
t.Helper()
1381+
for _, table := range plan.Tables {
1382+
if table.Table.Name == name {
1383+
return table
1384+
}
1385+
}
1386+
t.Fatalf("plan table %s not found", name)
1387+
return snapshot.TableImportPlan{}
1388+
}
1389+
12641390
func tableNames(manifest Manifest) []string {
12651391
names := make([]string, 0, len(manifest.Tables))
12661392
for _, table := range manifest.Tables {
@@ -1276,3 +1402,14 @@ func progressPhases(progress []ImportProgress) []string {
12761402
}
12771403
return phases
12781404
}
1405+
1406+
func progressTotalRows(t *testing.T, progress []ImportProgress, phase string) int {
1407+
t.Helper()
1408+
for _, item := range progress {
1409+
if item.Phase == phase {
1410+
return item.TotalRows
1411+
}
1412+
}
1413+
t.Fatalf("progress phase %s not found", phase)
1414+
return 0
1415+
}

internal/store/store.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,15 +206,56 @@ func (s *Store) RebuildSearchIndexes(ctx context.Context) error {
206206
if err := s.rebuildMemberFTS(ctx); err != nil {
207207
return err
208208
}
209+
return s.stampSearchIndexVersions(ctx, true, true)
210+
}
211+
212+
func (s *Store) RebuildMessageSearchIndex(ctx context.Context) error {
213+
if err := s.rebuildFTS(ctx); err != nil {
214+
return err
215+
}
216+
return s.stampSearchIndexVersions(ctx, true, false)
217+
}
218+
219+
func (s *Store) RebuildMemberSearchIndex(ctx context.Context) error {
220+
if err := s.rebuildMemberFTS(ctx); err != nil {
221+
return err
222+
}
223+
return s.stampSearchIndexVersions(ctx, false, true)
224+
}
225+
226+
func (s *Store) stampSearchIndexVersions(ctx context.Context, message, member bool) error {
209227
now := time.Now().UTC().Format(timeLayout)
210-
if _, err := s.db.ExecContext(ctx, `
228+
switch {
229+
case message && member:
230+
if _, err := s.db.ExecContext(ctx, `
211231
insert into sync_state(scope, cursor, updated_at)
212232
values(?, ?, ?), (?, ?, ?)
213233
on conflict(scope) do update set
214234
cursor=excluded.cursor,
215235
updated_at=excluded.updated_at
216236
`, "schema:message_fts_rowid_version", messageFTSVersion, now, "schema:member_fts_rowid_version", memberFTSVersion, now); err != nil {
217-
return fmt.Errorf("stamp search index versions: %w", err)
237+
return fmt.Errorf("stamp search index versions: %w", err)
238+
}
239+
case message:
240+
if _, err := s.db.ExecContext(ctx, `
241+
insert into sync_state(scope, cursor, updated_at)
242+
values(?, ?, ?)
243+
on conflict(scope) do update set
244+
cursor=excluded.cursor,
245+
updated_at=excluded.updated_at
246+
`, "schema:message_fts_rowid_version", messageFTSVersion, now); err != nil {
247+
return fmt.Errorf("stamp message search index version: %w", err)
248+
}
249+
case member:
250+
if _, err := s.db.ExecContext(ctx, `
251+
insert into sync_state(scope, cursor, updated_at)
252+
values(?, ?, ?)
253+
on conflict(scope) do update set
254+
cursor=excluded.cursor,
255+
updated_at=excluded.updated_at
256+
`, "schema:member_fts_rowid_version", memberFTSVersion, now); err != nil {
257+
return fmt.Errorf("stamp member search index version: %w", err)
258+
}
218259
}
219260
return nil
220261
}

0 commit comments

Comments
 (0)