Skip to content

Commit 22e02ca

Browse files
committed
add Aerospike example
1 parent c001624 commit 22e02ca

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

examples/aerospike/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
aerospike

examples/aerospike/main.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
aero "github.com/aerospike/aerospike-client-go"
9+
"github.com/reugn/go-streams"
10+
ext "github.com/reugn/go-streams/extension"
11+
"github.com/reugn/go-streams/flow"
12+
)
13+
14+
func main() {
15+
properties := &ext.AerospikeProperties{
16+
Policy: nil,
17+
Hostname: "localhost",
18+
Port: 3000,
19+
Namespase: "test",
20+
SetName: "streams",
21+
}
22+
ctx, cancelFunc := context.WithCancel(context.Background())
23+
24+
timer := time.NewTimer(time.Minute)
25+
go func() {
26+
select {
27+
case <-timer.C:
28+
cancelFunc()
29+
}
30+
}()
31+
32+
cnProperties := &ext.ChangeNotificationProperties{PollingInterval: time.Second * 3}
33+
source, err := ext.NewAerospikeSource(ctx, properties, nil, cnProperties)
34+
streams.Check(err)
35+
flow1 := flow.NewMap(transform, 1)
36+
sink, err := ext.NewAerospikeSink(ctx, properties, nil)
37+
streams.Check(err)
38+
39+
source.Via(flow1).To(sink)
40+
}
41+
42+
var transform = func(in interface{}) interface{} {
43+
msg := in.(*aero.Record)
44+
fmt.Println(msg.Bins)
45+
msg.Bins["ts"] = streams.NowNano()
46+
return ext.AerospikeKeyBins{Key: msg.Key, Bins: msg.Bins}
47+
}

0 commit comments

Comments
 (0)