Skip to content

Commit b8df119

Browse files
author
Hugo Rojas
committed
Add Segmentio implementation
1 parent 85ad499 commit b8df119

File tree

7 files changed

+169
-3
lines changed

7 files changed

+169
-3
lines changed

cmd/consumer/main.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/hrojas2021/go-kafka-mongodb/pkg/iface"
99
"github.com/hrojas2021/go-kafka-mongodb/pkg/kafka/confluentic"
1010
"github.com/hrojas2021/go-kafka-mongodb/pkg/kafka/sarama"
11+
"github.com/hrojas2021/go-kafka-mongodb/pkg/kafka/segmentio"
1112
)
1213

1314
func main() {
@@ -16,7 +17,7 @@ func main() {
1617
if err != nil {
1718
log.Fatal("unable to connecto to mongoDB ", err)
1819
}
19-
// handler, err := sarama.NewConsumerHandler(cf, db)
20+
2021
handler, err := getBroker(cf, db)
2122
if err != nil {
2223
log.Fatal("unable to create a kafka consumer handler ", err)
@@ -45,8 +46,10 @@ func getBroker(cf *config.Configuration, db *database.DB) (iface.ConsumerHandler
4546
switch cf.BROKER {
4647
case config.Sarama:
4748
broker, err = sarama.NewConsumerHandler(cf, db)
48-
default:
49+
case config.Confluentic:
4950
broker, err = confluentic.NewConsumerHandler(cf, db)
51+
default:
52+
broker, err = segmentio.NewConsumerHandler(cf, db)
5053
}
5154

5255
return broker, err

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/Shopify/sarama v1.38.1
77
github.com/confluentinc/confluent-kafka-go v1.9.2
88
github.com/gorilla/mux v1.8.0
9+
github.com/segmentio/kafka-go v0.4.39
910
github.com/spf13/viper v1.15.0
1011
go.mongodb.org/mongo-driver v1.11.2
1112
)

go.sum

+9
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ github.com/juju/qthttptest v0.1.1/go.mod h1:aTlAv8TYaflIiTDIQYzxnl1QdPjAg8Q8qJME
204204
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
205205
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
206206
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
207+
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
207208
github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc=
208209
github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
209210
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
@@ -232,6 +233,7 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ
232233
github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
233234
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
234235
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
236+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
235237
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
236238
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
237239
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -250,6 +252,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
250252
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
251253
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
252254
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
255+
github.com/segmentio/kafka-go v0.4.39 h1:75smaomhvkYRwtuOwqLsdhgCG30B82NsbdkdDfFbvrw=
256+
github.com/segmentio/kafka-go v0.4.39/go.mod h1:T0MLgygYvmqmBvC+s8aCcbVNfJN4znVne5j0Pzowp/Q=
253257
github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
254258
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
255259
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
@@ -286,6 +290,10 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k
286290
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
287291
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
288292
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
293+
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
294+
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
295+
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
296+
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
289297
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
290298
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
291299
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -381,6 +389,7 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v
381389
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
382390
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
383391
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
392+
golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
384393
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
385394
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
386395
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=

pkg/kafka/confluentic/producer.go

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func (k *kafkaP) saveJobToKafka(job model.Job) error {
6666
Value: []byte(word),
6767
}, nil)
6868
}
69+
70+
// k.Close()
6971
log.Println("The job event has been created successfully")
7072
return nil
7173
}

pkg/kafka/sarama/producer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (k *kafkaP) saveJobToKafka(job model.Job) error {
7070
if err != nil {
7171
return err
7272
}
73-
73+
// k.Close()
7474
log.Printf("The job event has been created in partition %d and offset %d \n", partition, offset)
7575
return nil
7676
}

pkg/kafka/segmentio/consumer.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package segmentio
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/hrojas2021/go-kafka-mongodb/pkg/config"
9+
"github.com/hrojas2021/go-kafka-mongodb/pkg/database"
10+
"github.com/hrojas2021/go-kafka-mongodb/pkg/iface"
11+
"github.com/hrojas2021/go-kafka-mongodb/pkg/model"
12+
"github.com/segmentio/kafka-go"
13+
)
14+
15+
type kafkaC struct {
16+
*kafka.Reader
17+
}
18+
19+
type consumerHandler struct {
20+
config *config.Configuration
21+
db *database.DB
22+
kafkaC kafkaC
23+
}
24+
25+
func NewConsumerHandler(cf *config.Configuration, db *database.DB) (iface.ConsumerHandler, error) {
26+
r := kafka.NewReader(kafka.ReaderConfig{
27+
Brokers: []string{cf.KAFKAURL},
28+
Topic: cf.KAFKATOPIC,
29+
})
30+
31+
return &consumerHandler{
32+
db: db,
33+
config: cf,
34+
kafkaC: kafkaC{r},
35+
}, nil
36+
}
37+
38+
func (h *consumerHandler) ReadMessagesFromKafka() error {
39+
return h.kafkaC.readMessagesFromKafka(h)
40+
}
41+
42+
func (h *consumerHandler) Close() error {
43+
return h.kafkaC.Close()
44+
}
45+
46+
func (h *consumerHandler) Subscribe() error {
47+
return nil
48+
}
49+
50+
func (k *kafkaC) readMessagesFromKafka(h *consumerHandler) error {
51+
fmt.Println("Start reading Kafka messages in SEGMENTIO")
52+
var err error
53+
for {
54+
msg, err := k.ReadMessage(context.Background())
55+
if err != nil {
56+
break
57+
}
58+
59+
fmt.Printf("\nReceived msg from Kafka partition with key %d: %s\n", msg.Partition, string(msg.Key))
60+
var job model.Job
61+
err = json.Unmarshal(msg.Value, &job)
62+
if err != nil {
63+
break
64+
}
65+
err = h.db.SaveJob(&job)
66+
if err != nil {
67+
break
68+
}
69+
}
70+
return err
71+
}

pkg/kafka/segmentio/producer.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package segmentio
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io/ioutil"
8+
"log"
9+
"net/http"
10+
11+
"github.com/hrojas2021/go-kafka-mongodb/pkg/config"
12+
"github.com/hrojas2021/go-kafka-mongodb/pkg/model"
13+
"github.com/segmentio/kafka-go"
14+
)
15+
16+
type producerHandler struct {
17+
kafkaP kafkaP
18+
}
19+
20+
type kafkaP struct {
21+
*kafka.Writer
22+
*config.Configuration
23+
counter int
24+
}
25+
26+
func NewProducerHandler(cf *config.Configuration) (*producerHandler, error) {
27+
w := kafka.NewWriter(kafka.WriterConfig{
28+
Brokers: []string{cf.KAFKAURL},
29+
Topic: cf.KAFKATOPIC,
30+
})
31+
32+
return &producerHandler{
33+
kafkaP: kafkaP{w, cf, 1},
34+
}, nil
35+
}
36+
37+
// TODO: move this to handlers folder and call with interface saveJobToKafka
38+
func (h *producerHandler) JobsPostHandler(w http.ResponseWriter, r *http.Request) {
39+
body, err := ioutil.ReadAll(r.Body)
40+
if err != nil {
41+
log.Fatal("unable to read the payload ", err)
42+
}
43+
defer r.Body.Close()
44+
45+
var job model.Job
46+
err = json.Unmarshal(body, &job)
47+
if err != nil {
48+
log.Fatal("unable to unmarshall the body ", err)
49+
}
50+
51+
err = h.kafkaP.saveJobToKafka(job)
52+
if err != nil {
53+
log.Fatal("unable to produce the event ", err)
54+
}
55+
56+
w.Header().Set("content-type", "application/json")
57+
w.WriteHeader(http.StatusOK)
58+
w.Write(body)
59+
}
60+
61+
func (k *kafkaP) saveJobToKafka(job model.Job) error {
62+
d, err := json.Marshal(job)
63+
if err != nil {
64+
return err
65+
}
66+
67+
msg := kafka.Message{
68+
Key: []byte(fmt.Sprintf("key-%d", k.counter)),
69+
Value: d,
70+
}
71+
72+
err = k.WriteMessages(context.Background(), msg)
73+
if err != nil {
74+
return err
75+
}
76+
77+
// k.Close()
78+
79+
return nil
80+
}

0 commit comments

Comments
 (0)