-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathsync.go
More file actions
182 lines (161 loc) · 4.09 KB
/
sync.go
File metadata and controls
182 lines (161 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package main
import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"html"
	"net/http"
	"time"

	"fiatjaf.com/nostr"
	"fiatjaf.com/nostr/eventstore/wrappers"
	"fiatjaf.com/nostr/nip77"
	"github.com/fiatjaf/pyramid/global"
	"github.com/fiatjaf/pyramid/layout"
)
// streamingSync runs nip77.NegentropySync between the local store
// (global.IL.Main) and the relay at remoteUrl, restricted to events authored by
// loggedUser, and streams progress to w as rows of an HTML table.
//
// When upload is false the local store is not passed as the sync source; when
// download is false it is not passed as the sync target (nil is passed to
// NegentropySync instead; presumably that disables the direction; verify
// against nip77).
//
// Nothing is returned: the outcome ("sync complete", "sync failed: ..." or
// "sync interrupted") is written to the response body. If w does not implement
// http.Flusher a 500 response is sent instead.
func streamingSync(
	ctx context.Context,
	loggedUser nostr.PubKey,
	remoteUrl string,
	upload,
	download bool,
	w http.ResponseWriter,
) {
	// check for streaming support before touching the headers, so that the
	// error response does not carry the streaming headers
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// set up streaming response
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Transfer-Encoding", "chunked")

	writer := bufio.NewWriter(w)

	// flush pushes buffered output all the way to the client
	flush := func() {
		writer.Flush()
		flusher.Flush()
	}

	// writeRow emits one table row; text is HTML-escaped because it may contain
	// the caller-supplied relay URL, remote event data or remote error messages
	writeRow := func(class, text string) {
		fmt.Fprintf(writer, `<tr><td class="%s">%s</td></tr>`, class, html.EscapeString(text))
		flush()
	}

	// send initial HTML: everything the layout renders before its "<header"
	// tag, followed by the opening of the progress table
	// NOTE(review): the result of Render is ignored, as in the original code
	initialHTML := bytes.NewBuffer(nil)
	layout.Layout(loggedUser, "sync").Render(ctx, initialHTML)
	head, _, _ := bytes.Cut(initialHTML.Bytes(), []byte("<header"))
	writer.Write(head)
	writer.Write([]byte("<table class='w-full border border-separate'>"))
	flush()

	// progress carries human-readable status lines, done carries the final result
	progress := make(chan string, 100)
	done := make(chan error, 1)

	// start sync in goroutine
	go func() {
		// send initial message (cannot block: the channel is buffered and empty)
		progress <- "starting sync"

		// create wrappers to adapt IndexingLayer to nostr interfaces
		local := wrappers.StorePublisher{
			Store:    global.IL.Main,
			MaxLimit: 1_000_000, // large limit for comprehensive sync
		}

		var source nostr.Querier = local
		var target nostr.Publisher = local
		if !download {
			target = nil
		}
		if !upload {
			source = nil
		}

		// use the wrappers as source and target for two-way sync
		err := nip77.NegentropySync(
			ctx,
			remoteUrl,
			nostr.Filter{
				Authors: []nostr.PubKey{loggedUser},
			},
			source,
			target,
			func(ctx context.Context, dir nip77.Direction) {
				fromName := "local"
				toName := remoteUrl
				if dir.From != local {
					fromName = remoteUrl
					toName = "local"
				}

				select {
				case progress <- "syncing events from " + fromName + " to " + toName:
				case <-ctx.Done():
					return
				}

				// forward queries the given ids on the origin side and publishes
				// every event found to the destination side; it reports false
				// when ctx was cancelled while reporting progress
				// NOTE(review): the result of Publish is ignored, as in the original code
				forward := func(ids []nostr.ID) bool {
					for evt := range dir.From.QueryEvents(nostr.Filter{IDs: ids}) {
						select {
						case progress <- fmt.Sprintf("publishing %s to %s", evt, toName):
						case <-ctx.Done():
							return false
						}
						dir.To.Publish(ctx, evt)
					}
					return true
				}

				// this is only necessary because relays are too ratelimiting
				batch := make([]nostr.ID, 0, 50)
				seen := make(map[nostr.ID]struct{})
				for item := range dir.Items {
					if _, dup := seen[item]; dup {
						continue
					}

					select {
					case progress <- fmt.Sprintf("event %s found on %s", item.Hex(), fromName):
					case <-ctx.Done():
						return
					}

					seen[item] = struct{}{}
					batch = append(batch, item)
					if len(batch) == 50 {
						if !forward(batch) {
							return
						}
						batch = batch[:0]
					}
				}

				// send whatever is left over from the last incomplete batch
				if len(batch) > 0 {
					forward(batch)
				}
			})

		select {
		case done <- err:
		case <-ctx.Done():
		}
	}()

	// drainProgress writes every progress message that is already buffered,
	// without blocking
	drainProgress := func() {
		for {
			select {
			case msg := <-progress:
				writeRow("px-3 border", msg)
			default:
				return
			}
		}
	}

	// handle progress updates and completion
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case msg := <-progress:
			writeRow("px-3 border", msg)
		case err := <-done:
			// select picks randomly among ready cases, so progress messages may
			// still be buffered when done is received; show them before the
			// final status row instead of dropping them
			drainProgress()

			if err != nil {
				writeRow("px-3 border text-red-500", "sync failed: "+err.Error())
			} else {
				writeRow("px-3 border text-emerald-500", "sync complete")
			}

			// close HTML
			fmt.Fprint(writer, "\n</table>\n</body>\n</html>")
			flush()
			return
		case <-ticker.C:
			// periodic flush to keep connection alive
			flush()
		case <-ctx.Done():
			writeRow("px-3 border text-amber-500", "sync interrupted")
			return
		}
	}
}