From 99954b2c1be71efc73e591cc2e5010b10447a4e4 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 01:46:19 +0800 Subject: [PATCH 01/20] add github workflow --- .github/workflows/codeql.yaml | 53 ++++++++++++++++++++++++ .github/workflows/go.yml | 70 ++++++++++++++++++++++++++++++++ .github/workflows/goreleaser.yml | 34 ++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 .github/workflows/codeql.yaml create mode 100644 .github/workflows/go.yml create mode 100644 .github/workflows/goreleaser.yml diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml new file mode 100644 index 0000000..beb8101 --- /dev/null +++ b/.github/workflows/codeql.yaml @@ -0,0 +1,53 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +name: "CodeQL" + +on: + push: + branches: [ main ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ main ] + schedule: + - cron: '30 1 * * 0' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + + permissions: + # required for all workflows + security-events: write + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + # Override automatic language detection by changing the below list + # Supported options are ['csharp', 'cpp', 'go', 'java', 'javascript', 'python'] + # TODO: Enable for javascript later + language: [ 'go'] + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..bb339bc --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,70 @@ +name: Run Testing +on: push + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Setup go + uses: actions/setup-go@v3 + with: + go-version: '^1' + - name: Checkout repository + uses: actions/checkout@v3 + - name: Setup golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: latest + args: --verbose + + # Label of the container job + test: + strategy: + matrix: + os: [ubuntu-latest] + go: [1.17, 1.18, 1.19, '1.20'] + include: + - os: ubuntu-latest + go-build: ~/.cache/go-build + name: ${{ matrix.os }} @ Go ${{ matrix.go }} + runs-on: ${{ matrix.os }} + env: + GO111MODULE: on + GOPROXY: https://proxy.golang.org + + steps: + - name: Set up Go ${{ matrix.go }} + uses: actions/setup-go@v3 + with: + go-version: ${{ matrix.go }} + + - name: Checkout Code + uses: actions/checkout@v3 + with: + ref: ${{ github.ref }} + + - uses: actions/cache@v3 + with: + path: | + ${{ matrix.go-build }} + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + # - uses: mer-team/rabbitmq-mng-action@v1.2 + # with: + # RABBITMQ_USER: 'guest' + # RABBITMQ_PASS: 'guest' + # RABBITMQ_PORT: 5672 + # RABBITMQ_MNG_PORT: 15672 + # RABBITMQ_TAG: '3-management-alpine' + + - name: Run Tests + run: | + go test -v -covermode=atomic -coverprofile=coverage.out + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + flags: ${{ matrix.os }},go-${{ matrix.go }} diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml new file mode 100644 index 0000000..3af3a45 --- /dev/null +++ b/.github/workflows/goreleaser.yml @@ -0,0 +1,34 @@ +name: Goreleaser + +on: + push: + tags: + - '*' + +permissions: + contents: write + +jobs: + goreleaser: + runs-on: ubuntu-latest + steps: + - + name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - + name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: 1.17 + - + name: Run GoReleaser + uses: goreleaser/goreleaser-action@v4 + with: + # either 'goreleaser' (default) or 'goreleaser-pro' + distribution: goreleaser + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 287293a5bbb5eb5a208bf046e71b2846c5c87fe4 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 02:08:42 +0800 Subject: [PATCH 02/20] create project --- .github/dependabot.yml | 10 ++++++++++ go.mod | 19 +++++++++++++++++++ options.go | 0 options_test.go | 7 +++++++ 4 files changed, 36 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 go.mod create mode 100644 options.go create mode 100644 options_test.go diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ec73510 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: github-actions + directory: / + schedule: + interval: weekly + - package-ecosystem: gomod + directory: / + schedule: + interval: weekly \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e1e7244 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/golang-queue/kafka + +// go 1.22.0 + +go 1.18 + +require ( + github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 + github.com/segmentio/kafka-go v0.4.47 + github.com/stretchr/testify v1.8.1 + go.uber.org/goleak v1.2.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/goccy/go-json v0.10.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/options.go b/options.go new file mode 100644 index 0000000..e69de29 diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..798d585 --- /dev/null +++ b/options_test.go @@ -0,0 +1,7 @@ +package kafka + +// Option for queue system +type Option func(*options) + +type options struct { +} From 24b9bd273be3a0906240987251aeffaa9ff21c20 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 02:23:09 +0800 Subject: [PATCH 03/20] no message --- kafka.go | 1 + kafka_test.go | 1 + options.go | 7 +++++++ options_test.go | 6 ------ 4 files changed, 9 insertions(+), 6 deletions(-) create mode 100644 kafka.go create mode 100644 kafka_test.go diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..82b3441 --- /dev/null +++ b/kafka.go @@ -0,0 +1 @@ +package kafka diff --git a/kafka_test.go b/kafka_test.go new file mode 100644 index 0000000..82b3441 --- /dev/null +++ b/kafka_test.go @@ -0,0 +1 @@ +package kafka diff --git a/options.go b/options.go index e69de29..798d585 100644 --- a/options.go +++ b/options.go @@ -0,0 +1,7 @@ +package kafka + +// Option for queue system +type Option func(*options) + +type options struct { +} diff --git a/options_test.go b/options_test.go index 798d585..82b3441 100644 --- a/options_test.go +++ b/options_test.go @@ -1,7 +1 @@ package kafka - -// Option for queue system -type Option func(*options) - -type options struct { -} From 4eafe2e0aa609bf7c5c64b2231a9c56315b815ea Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 02:41:29 +0800 Subject: [PATCH 04/20] update readme --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c38b226..a70c5a0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # Kafka -[Kafka](https://kafka.apache.org/) as backend for [Queue package](https://github.com/golang-queue/queue) +[Kafka](https://kafka.apache.org/) as backend for [Queue package](https://github.com/golang-queue/queue). See the [Go kafka Client Library]( github.com/segmentio/kafka-go). + + From 5be97cabd105134675617c875a922b3d859bc5d0 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 19:09:19 +0800 Subject: [PATCH 05/20] add options, kafka basic code --- LICENSE | 2 +- go.mod | 3 +++ go.sum | 64 +++++++++++++++++++++++++++++++++++++++++++++++++ kafka.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++ options.go | 36 ++++++++++++++++++++++++++++ options_test.go | 38 +++++++++++++++++++++++++++++ 6 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 go.sum diff --git a/LICENSE b/LICENSE index b645a5c..716cb64 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 golang-queue +Copyright (c) 2022 golang-queue/kafka Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/go.mod b/go.mod index e1e7244..07c0caa 100644 --- a/go.mod +++ b/go.mod @@ -17,3 +17,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + + +replace github.com/golang-queue/kafka => ../../ \ No newline at end of file diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1fa0c67 --- /dev/null +++ b/go.sum @@ -0,0 +1,64 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg= +github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go index 82b3441..6bef479 100644 --- a/kafka.go +++ b/kafka.go @@ -1 +1,62 @@ package kafka + +import ( + "context" + "sync" + + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" +) + +var _ core.Worker = (*Worker)(nil) + +type Worker struct { + stop chan struct{} + stopFlag int32 + stopOnce sync.Once + startOnce sync.Once + opts options +} + +func NewWorker(opts ...Option) *Worker { + //var err error + w := &Worker{ + opts: newOptions(opts...), + } + return w +} + +func (w *Worker) startConsumer() (err error) { + + // err := nil + + // if err != nil { + // // + // } + return err +} + +// Run start the worker +func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error { + return w.opts.runFunc(ctx, task) +} + +// Shutdown worker +func (w *Worker) Shutdown() (err error) { + + return err +} + +// Queue send notification to queue +func (w *Worker) Queue(job core.QueuedMessage) (err error) { + //err := nil + + return err + +} + +func (w *Worker) Request() (core.QueuedMessage, error) { + + return nil, queue.ErrNoTaskInQueue + +} diff --git a/options.go b/options.go index 798d585..2cf378a 100644 --- a/options.go +++ b/options.go @@ -1,7 +1,43 @@ package kafka +import ( + "context" + + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" +) + // Option for queue system type Option func(*options) type options struct { + runFunc func(context.Context, core.QueuedMessage) error + logger queue.Logger + queue string +} + +// WithQueue setup the queue name +func WithQueue(val string) Option { + return func(w *options) { + w.queue = val + } +} + +// WithRunFunc setup the run func of queue +func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option { + return func(w *options) { + w.runFunc = fn + } +} + +// WithLogger set custom logger +func WithLogger(l queue.Logger) Option { + return func(w *options) { + w.logger = l + } +} + +func newOptions(opts ...Option) options { + defaultOpts := options{} + return defaultOpts } diff --git a/options_test.go b/options_test.go index 82b3441..06e66fe 100644 --- a/options_test.go +++ b/options_test.go @@ -1 +1,39 @@ package kafka + +import ( + "testing" + "time" + + "github.com/golang-queue/queue" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +type mockMessage struct { + Message string +} + +func (m mockMessage) Bytes() []byte { + return []byte(m.Message) +} + +func TestShutdownWorkFlow(t *testing.T) { + w := NewWorker( + WithQueue("test"), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) + q.Start() + time.Sleep(1 * time.Second) + q.Shutdown() + // check shutdown once + q.Shutdown() + q.Wait() +} From 39fff07e033ccd1665681c000627cdf50a976591 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 23:42:02 +0800 Subject: [PATCH 06/20] update necessary options and solve module name conflict --- go.mod | 11 ++++++----- go.sum | 21 ++++++++++++++++++--- kafka.go | 4 ++++ options.go | 30 +++++++++++++++++++++++++++--- 4 files changed, 55 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 07c0caa..8dc1760 100644 --- a/go.mod +++ b/go.mod @@ -6,17 +6,18 @@ go 1.18 require ( github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 - github.com/segmentio/kafka-go v0.4.47 - github.com/stretchr/testify v1.8.1 - go.uber.org/goleak v1.2.0 + github.com/segmentio/kafka-go v0.4.47 + github.com/stretchr/testify v1.9.0 + go.uber.org/goleak v1.3.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/goccy/go-json v0.10.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/golang-queue/kafka => ../../ \ No newline at end of file +replace github.com/golang-queue/kafka => ../../ diff --git a/go.sum b/go.sum index 1fa0c67..7c9bf92 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,36 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg= github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= @@ -30,6 +41,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -53,6 +65,7 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -60,5 +73,7 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go index 6bef479..71636db 100644 --- a/kafka.go +++ b/kafka.go @@ -6,16 +6,20 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + kafkaAPI "github.com/segmentio/kafka-go" ) var _ core.Worker = (*Worker)(nil) type Worker struct { + // + //shutdown func() // stop chan struct{} stopFlag int32 stopOnce sync.Once startOnce sync.Once opts options + conn kafkaAPI.Dialer } func NewWorker(opts ...Option) *Worker { diff --git a/options.go b/options.go index 2cf378a..4c4c88c 100644 --- a/options.go +++ b/options.go @@ -11,9 +11,33 @@ import ( type Option func(*options) type options struct { - runFunc func(context.Context, core.QueuedMessage) error - logger queue.Logger - queue string + runFunc func(context.Context, core.QueuedMessage) error + logger queue.Logger + addr string + queue string + topic string + partition int //kafka's partition +} + +// WithAddr setup the URI +func WithAddr(addr string) Option { + return func(w *options) { + w.addr = addr + } +} + +// WithTopic setup the Topic +func WithTopic(topic string) Option { + return func(w *options) { + w.topic = topic + } +} + +// WithPartition setup the partition +func WithPartition(partition int) Option { + return func(w *options) { + w.partition = partition + } } // WithQueue setup the queue name From 43b836a5a38e0c08306e62d9f37f22f33b5e01d3 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Tue, 9 Apr 2024 23:55:03 +0800 Subject: [PATCH 07/20] update go.mod version --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 8dc1760..b2b7155 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ go 1.18 require ( github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 github.com/segmentio/kafka-go v0.4.47 - github.com/stretchr/testify v1.9.0 - go.uber.org/goleak v1.3.0 + github.com/stretchr/testify v1.8.4 + go.uber.org/goleak v1.2.1 ) require ( diff --git a/go.sum b/go.sum index 7c9bf92..83ddbf6 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -29,8 +29,8 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= From 824829f44e1bb90ad9fd71018b6884d088e51181 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Wed, 10 Apr 2024 00:01:46 +0800 Subject: [PATCH 08/20] avoid lint error --- kafka.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/kafka.go b/kafka.go index 71636db..0e41727 100644 --- a/kafka.go +++ b/kafka.go @@ -2,11 +2,9 @@ package kafka import ( "context" - "sync" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" - kafkaAPI "github.com/segmentio/kafka-go" ) var _ core.Worker = (*Worker)(nil) @@ -14,12 +12,12 @@ var _ core.Worker = (*Worker)(nil) type Worker struct { // //shutdown func() // - stop chan struct{} - stopFlag int32 - stopOnce sync.Once - startOnce sync.Once - opts options - conn kafkaAPI.Dialer + // stop chan struct{} + // stopFlag int32 + // stopOnce sync.Once + // startOnce sync.Once + opts options + // conn kafkaAPI.Dialer } func NewWorker(opts ...Option) *Worker { @@ -60,7 +58,7 @@ func (w *Worker) Queue(job core.QueuedMessage) (err error) { } func (w *Worker) Request() (core.QueuedMessage, error) { - + _ = w.startConsumer() return nil, queue.ErrNoTaskInQueue } From c470982eb02287436d3b63dec78f01c8eccdc322 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Wed, 10 Apr 2024 00:05:55 +0800 Subject: [PATCH 09/20] avoid lint error which is caused by unused "mockmessage" --- options_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/options_test.go b/options_test.go index 06e66fe..5b3d250 100644 --- a/options_test.go +++ b/options_test.go @@ -13,13 +13,13 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } -type mockMessage struct { - Message string -} +// type mockMessage struct { +// Message string +// } -func (m mockMessage) Bytes() []byte { - return []byte(m.Message) -} +// func (m mockMessage) Bytes() []byte { +// return []byte(m.Message) +// } func TestShutdownWorkFlow(t *testing.T) { w := NewWorker( From fe9556d91ae6d628b4d33f7ed2a4cc1f5c972014 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Wed, 10 Apr 2024 00:24:13 +0800 Subject: [PATCH 10/20] avoid options_test.go's lint error --- options_test.go | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/options_test.go b/options_test.go index 5b3d250..55cb0a3 100644 --- a/options_test.go +++ b/options_test.go @@ -2,10 +2,7 @@ package kafka import ( "testing" - "time" - "github.com/golang-queue/queue" - "github.com/stretchr/testify/assert" "go.uber.org/goleak" ) @@ -21,19 +18,19 @@ func TestMain(m *testing.M) { // return []byte(m.Message) // } -func TestShutdownWorkFlow(t *testing.T) { - w := NewWorker( - WithQueue("test"), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - time.Sleep(1 * time.Second) - q.Shutdown() - // check shutdown once - q.Shutdown() - q.Wait() -} +// func TestShutdownWorkFlow(t *testing.T) { +// w := NewWorker( +// WithQueue("test"), +// ) +// q, err := queue.NewQueue( +// queue.WithWorker(w), +// queue.WithWorkerCount(2), +// ) +// assert.NoError(t, err) +// q.Start() +// time.Sleep(1 * time.Second) +// q.Shutdown() +// // check shutdown once +// q.Shutdown() +// q.Wait() +// } From 8e5134f597eb86cdb36bb6e1803a49552883f672 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Wed, 10 Apr 2024 01:02:30 +0800 Subject: [PATCH 11/20] until 4/9 --- kafka.go | 23 +++++++++++++++++++---- options.go | 7 +++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/kafka.go b/kafka.go index 0e41727..bdb82f0 100644 --- a/kafka.go +++ b/kafka.go @@ -2,9 +2,12 @@ package kafka import ( "context" + "net" + "sync/atomic" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + kafkaAPI "github.com/segmentio/kafka-go" ) var _ core.Worker = (*Worker)(nil) @@ -13,18 +16,28 @@ type Worker struct { // //shutdown func() // // stop chan struct{} - // stopFlag int32 + stopFlag int32 // stopOnce sync.Once // startOnce sync.Once opts options - // conn kafkaAPI.Dialer + conn *kafkaAPI.Conn } func NewWorker(opts ...Option) *Worker { - //var err error + var err error w := &Worker{ opts: newOptions(opts...), } + w.conn, err = + //conn, err := + (&kafkaAPI.Dialer{ + Resolver: &net.Resolver{}, + }).DialLeader(context.Background(), w.opts.network, + w.opts.addr, w.opts.topic, 0) + if err != nil { + w.opts.logger.Fatal("can't connect kafka: ", err) + } + return w } @@ -52,7 +65,9 @@ func (w *Worker) Shutdown() (err error) { // Queue send notification to queue func (w *Worker) Queue(job core.QueuedMessage) (err error) { //err := nil - + if atomic.LoadInt32(&w.stopFlag) == 1 { + return queue.ErrQueueShutdown + } return err } diff --git a/options.go b/options.go index 4c4c88c..b7773f7 100644 --- a/options.go +++ b/options.go @@ -14,6 +14,7 @@ type options struct { runFunc func(context.Context, core.QueuedMessage) error logger queue.Logger addr string + network string queue string topic string partition int //kafka's partition @@ -26,6 +27,12 @@ func WithAddr(addr string) Option { } } +func WithNetwork(network string) Option { + return func(w *options) { + w.network = network + } +} + // WithTopic setup the Topic func WithTopic(topic string) Option { return func(w *options) { From b2b9a561431d6eaf37e409eb434d326e3168273f Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Thu, 11 Apr 2024 23:44:46 +0800 Subject: [PATCH 12/20] queue complete --- kafka.go | 17 ++++++++++++++++- options.go | 23 ++++++++++++++++------- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/kafka.go b/kafka.go index bdb82f0..f14ddc5 100644 --- a/kafka.go +++ b/kafka.go @@ -4,6 +4,7 @@ import ( "context" "net" "sync/atomic" + "time" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" @@ -68,8 +69,22 @@ func (w *Worker) Queue(job core.QueuedMessage) (err error) { if atomic.LoadInt32(&w.stopFlag) == 1 { return queue.ErrQueueShutdown } - return err + // send message + base := time.Now() + msg := kafkaAPI.Message{ + Time: base.Truncate(time.Millisecond), + Value: job.Bytes(), + } + if w.opts.compression == nil { + _, err = w.conn.WriteMessages(msg) + } else { + _, err = w.conn.WriteCompressedMessages(w.opts.compression, msg) + } + // if err != nil { + // t.Fatal(err) + // } + return err } func (w *Worker) Request() (core.QueuedMessage, error) { diff --git a/options.go b/options.go index b7773f7..06dbd3b 100644 --- a/options.go +++ b/options.go @@ -5,19 +5,21 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + "github.com/segmentio/kafka-go/compress" ) // Option for queue system type Option func(*options) type options struct { - runFunc func(context.Context, core.QueuedMessage) error - logger queue.Logger - addr string - network string - queue string - topic string - partition int //kafka's partition + runFunc func(context.Context, core.QueuedMessage) error + logger queue.Logger + addr string + network string + queue string + topic string + partition int //kafka's partition + compression compress.Codec } // WithAddr setup the URI @@ -54,6 +56,13 @@ func WithQueue(val string) Option { } } +// WithAddr setup the URI +func WithCompress(compress compress.Codec) Option { + return func(w *options) { + w.compression = compress + } +} + // WithRunFunc setup the run func of queue func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option { return func(w *options) { From d97575042246b79a8dcdf749050916f3031d4b97 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Sat, 13 Apr 2024 12:43:31 +0800 Subject: [PATCH 13/20] adjust kafka due to the difference between kafka and rabbitmq --- kafka.go | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++ kafka_test.go | 38 +++++++++++++ options.go | 6 ++ 3 files changed, 192 insertions(+) diff --git a/kafka.go b/kafka.go index f14ddc5..2c50957 100644 --- a/kafka.go +++ b/kafka.go @@ -2,17 +2,165 @@ package kafka import ( "context" + "fmt" "net" + "sync" "sync/atomic" "time" "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" kafkaAPI "github.com/segmentio/kafka-go" + ktesting "github.com/segmentio/kafka-go/testing" ) var _ core.Worker = (*Worker)(nil) +// one consumer connect to kafka broker +var kafkaConsumer *KafkaConsumer + +type KafkaConsumer struct { + //stopFlag int32 + opts options + //conn *kafkaAPI.Conn + client *kafkaAPI.Client + shutdown func() +} +type ConnWaitGroup struct { + DialFunc func(context.Context, string, string) (net.Conn, error) + sync.WaitGroup +} + +// start consumer, get message from kafka +func InitConsumer(opts ...Option) { + //var err error + kafkaConsumer = &KafkaConsumer{ + opts: newOptions(opts...), + } + // // connect to broker + // // 需要解决自动重连的问题 + // conn, err := + // (&kafkaAPI.Dialer{ + // Resolver: &net.Resolver{}, + // }).DialLeader(context.Background(), kafkaConsumer.opts.network, + // kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, 0) + // kafkaConsumer.conn = conn + + // 创建client,创建topic,创建shutdown + client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.topic) + kafkaConsumer.client = client + kafkaConsumer.shutdown = shutdown + //return err +} + +func newLocalClient(address string) (*kafkaAPI.Client, func()) { + return newClient(kafkaAPI.TCP(address)) +} + +func newClient(addr net.Addr) (*kafkaAPI.Client, func()) { + conns := &ktesting.ConnWaitGroup{ + DialFunc: (&net.Dialer{}).DialContext, + } + + transport := &kafkaAPI.Transport{ + Dial: conns.Dial, + Resolver: kafkaAPI.NewBrokerResolver(nil), + } + + client := &kafkaAPI.Client{ + Addr: addr, + Timeout: 5 * time.Second, + Transport: transport, + } + + return client, func() { transport.CloseIdleConnections(); conns.Wait() } +} + +func newLocalClientAndTopic(topic string) (*kafkaAPI.Client, func()) { + //topic := makeTopic() + client, shutdown := newLocalClientWithTopic(topic, 1) + return client, shutdown +} + +func newLocalClientWithTopic(topic string, partitions int) (*kafkaAPI.Client, func()) { + client, shutdown := newLocalClient(topic) + if err := clientCreateTopic(client, topic, partitions); err != nil { + shutdown() + panic(err) + } + return client, func() { + client.DeleteTopics(context.Background(), &kafkaAPI.DeleteTopicsRequest{ + Topics: []string{topic}, + }) + shutdown() + } +} + +func clientCreateTopic(client *kafkaAPI.Client, topic string, partitions int) error { + _, err := client.CreateTopics(context.Background(), &kafkaAPI.CreateTopicsRequest{ + Topics: []kafkaAPI.TopicConfig{{ + Topic: topic, + NumPartitions: partitions, + ReplicationFactor: 1, + }}, + }) + if err != nil { + return err + } + + // Topic creation seems to be asynchronous. Metadata for the topic partition + // layout in the cluster is available in the controller before being synced + // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" + // when sending requests to the partition leaders. + // + // This loop will wait up to 2 seconds polling the cluster until no errors + // are returned. + for i := 0; i < 20; i++ { + r, err := client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ + Topic: topic, + Partition: 0, + Offset: 0, + }) + if err == nil && r.Error == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + return nil +} + +// 获取消息发送到队列中去 +func GetData() { + for { + // select { + // case <-time.After(leftTime): + // return //context.DeadlineExceeded + // // case err := <-done: // job finish + // // return err + // // case p := <-panicChan: + // // panic(p) + // default: + // 接收消息 + fmt.Printf("start fetch data") + res, err := kafkaConsumer.client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ + Topic: kafkaConsumer.opts.topic, + Partition: 0, + Offset: 0, + MinBytes: 1, + MaxBytes: 64 * 1024, + MaxWait: 100 * time.Millisecond, + }) + if err != nil { + //t.Fatal(err) + fmt.Printf("%v", err) + } + // 打印出消息,后续放入队列中去 + fmt.Printf("%v", res) + // } + } +} + type Worker struct { // //shutdown func() // diff --git a/kafka_test.go b/kafka_test.go index 82b3441..527f988 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -1 +1,39 @@ package kafka + +import ( + "fmt" + "testing" +) + +// import ( +// "testing" +// "time" + +// "github.com/golang-queue/queue" +// "github.com/stretchr/testify/assert" +// ) + +func TestFetchData(t *testing.T) { + // m := mockMessage{ + // Message: "foo", + // } + // w := NewWorker() + // q, err := queue.NewQueue( + // queue.WithWorker(w), + // queue.WithWorkerCount(2), + // ) + // assert.NoError(t, err) + // q.Start() + // time.Sleep(50 * time.Millisecond) + // q.Shutdown() + // // can't queue task after shutdown + // err = q.Queue(m) + // assert.Error(t, err) + // assert.Equal(t, queue.ErrQueueShutdown, err) + // q.Wait() + fmt.Printf("start") + InitConsumer(WithAddr("localhost"), + WithPartition(1), + WithTopic("hello")) + fmt.Printf("end") +} diff --git a/options.go b/options.go index 06dbd3b..90cc8be 100644 --- a/options.go +++ b/options.go @@ -79,5 +79,11 @@ func WithLogger(l queue.Logger) Option { func newOptions(opts ...Option) options { defaultOpts := options{} + + for _, opt := range opts { + // Call the option giving the instantiated + opt(&defaultOpts) + } + return defaultOpts } From 5911f100d1f2dd82acf9bcb7a0d066bac6334d14 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Sat, 13 Apr 2024 16:53:48 +0800 Subject: [PATCH 14/20] prepare to change from fetch (nonblock) to reader (block) --- kafka.go | 18 ++++++++++++------ kafka_test.go | 4 ++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/kafka.go b/kafka.go index 2c50957..890a3ab 100644 --- a/kafka.go +++ b/kafka.go @@ -47,9 +47,15 @@ func InitConsumer(opts ...Option) { // kafkaConsumer.conn = conn // 创建client,创建topic,创建shutdown - client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.topic) + client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, + kafkaConsumer.opts.partition) kafkaConsumer.client = client kafkaConsumer.shutdown = shutdown + + fmt.Printf("get data.\n") + GetData() + shutdown() + fmt.Printf("shutdown now!!!\n") //return err } @@ -76,14 +82,14 @@ func newClient(addr net.Addr) (*kafkaAPI.Client, func()) { return client, func() { transport.CloseIdleConnections(); conns.Wait() } } -func newLocalClientAndTopic(topic string) (*kafkaAPI.Client, func()) { +func newLocalClientAndTopic(address string, topic string, partition int) (*kafkaAPI.Client, func()) { //topic := makeTopic() - client, shutdown := newLocalClientWithTopic(topic, 1) + client, shutdown := newLocalClientWithTopic(address, topic, partition) return client, shutdown } -func newLocalClientWithTopic(topic string, partitions int) (*kafkaAPI.Client, func()) { - client, shutdown := newLocalClient(topic) +func newLocalClientWithTopic(address string, topic string, partitions int) (*kafkaAPI.Client, func()) { + client, shutdown := newLocalClient(address) if err := clientCreateTopic(client, topic, partitions); err != nil { shutdown() panic(err) @@ -145,7 +151,7 @@ func GetData() { fmt.Printf("start fetch data") res, err := kafkaConsumer.client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ Topic: kafkaConsumer.opts.topic, - Partition: 0, + Partition: kafkaConsumer.opts.partition, Offset: 0, MinBytes: 1, MaxBytes: 64 * 1024, diff --git a/kafka_test.go b/kafka_test.go index 527f988..0b31f8c 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -31,9 +31,9 @@ func TestFetchData(t *testing.T) { // assert.Error(t, err) // assert.Equal(t, queue.ErrQueueShutdown, err) // q.Wait() - fmt.Printf("start") + fmt.Printf("start\n") InitConsumer(WithAddr("localhost"), WithPartition(1), WithTopic("hello")) - fmt.Printf("end") + fmt.Printf("end\n") } From 3826a85e4acc4c5a8e365c1a73f4b46f0a11f13e Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Sat, 13 Apr 2024 20:09:39 +0800 Subject: [PATCH 15/20] convert client to reader --- kafka.go | 202 +++++++++++++++++++++++++++---------------------------- 1 file changed, 98 insertions(+), 104 deletions(-) diff --git a/kafka.go b/kafka.go index 890a3ab..1f82c87 100644 --- a/kafka.go +++ b/kafka.go @@ -11,7 +11,6 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" kafkaAPI "github.com/segmentio/kafka-go" - ktesting "github.com/segmentio/kafka-go/testing" ) var _ core.Worker = (*Worker)(nil) @@ -23,8 +22,9 @@ type KafkaConsumer struct { //stopFlag int32 opts options //conn *kafkaAPI.Conn - client *kafkaAPI.Client - shutdown func() + //client *kafkaAPI.Client + reader *kafkaAPI.Reader + //shutdown func() } type ConnWaitGroup struct { DialFunc func(context.Context, string, string) (net.Conn, error) @@ -37,104 +37,105 @@ func InitConsumer(opts ...Option) { kafkaConsumer = &KafkaConsumer{ opts: newOptions(opts...), } - // // connect to broker - // // 需要解决自动重连的问题 - // conn, err := - // (&kafkaAPI.Dialer{ - // Resolver: &net.Resolver{}, - // }).DialLeader(context.Background(), kafkaConsumer.opts.network, - // kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, 0) - // kafkaConsumer.conn = conn - // 创建client,创建topic,创建shutdown - client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, - kafkaConsumer.opts.partition) - kafkaConsumer.client = client - kafkaConsumer.shutdown = shutdown + _, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // 创建client,创建topic,创建shutdown + // client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, + // kafkaConsumer.opts.partition) + reader := kafkaAPI.NewReader(kafkaAPI.ReaderConfig{ + Brokers: []string{fmt.Sprintf("%s:9092", kafkaConsumer.opts.addr)}, + Topic: kafkaConsumer.opts.topic, + MinBytes: 1, + MaxBytes: 10e6, + MaxWait: 100 * time.Millisecond, + //Logger: newTestKafkaLogger(t, ""), + }) + kafkaConsumer.reader = reader + //kafkaConsumer.shutdown = shutdown fmt.Printf("get data.\n") GetData() - shutdown() fmt.Printf("shutdown now!!!\n") - //return err -} - -func newLocalClient(address string) (*kafkaAPI.Client, func()) { - return newClient(kafkaAPI.TCP(address)) -} - -func newClient(addr net.Addr) (*kafkaAPI.Client, func()) { - conns := &ktesting.ConnWaitGroup{ - DialFunc: (&net.Dialer{}).DialContext, - } - - transport := &kafkaAPI.Transport{ - Dial: conns.Dial, - Resolver: kafkaAPI.NewBrokerResolver(nil), - } - - client := &kafkaAPI.Client{ - Addr: addr, - Timeout: 5 * time.Second, - Transport: transport, - } - - return client, func() { transport.CloseIdleConnections(); conns.Wait() } -} - -func newLocalClientAndTopic(address string, topic string, partition int) (*kafkaAPI.Client, func()) { - //topic := makeTopic() - client, shutdown := newLocalClientWithTopic(address, topic, partition) - return client, shutdown -} - -func newLocalClientWithTopic(address string, topic string, partitions int) (*kafkaAPI.Client, func()) { - client, shutdown := newLocalClient(address) - if err := clientCreateTopic(client, topic, partitions); err != nil { - shutdown() - panic(err) - } - return client, func() { - client.DeleteTopics(context.Background(), &kafkaAPI.DeleteTopicsRequest{ - Topics: []string{topic}, - }) - shutdown() - } -} - -func clientCreateTopic(client *kafkaAPI.Client, topic string, partitions int) error { - _, err := client.CreateTopics(context.Background(), &kafkaAPI.CreateTopicsRequest{ - Topics: []kafkaAPI.TopicConfig{{ - Topic: topic, - NumPartitions: partitions, - ReplicationFactor: 1, - }}, - }) - if err != nil { - return err - } - - // Topic creation seems to be asynchronous. Metadata for the topic partition - // layout in the cluster is available in the controller before being synced - // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" - // when sending requests to the partition leaders. - // - // This loop will wait up to 2 seconds polling the cluster until no errors - // are returned. - for i := 0; i < 20; i++ { - r, err := client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ - Topic: topic, - Partition: 0, - Offset: 0, - }) - if err == nil && r.Error == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - - return nil -} + defer reader.Close() +} + +// func newLocalClient(address string) (*kafkaAPI.Client, func()) { +// return newClient(kafkaAPI.TCP(address)) +// } + +// func newClient(addr net.Addr) (*kafkaAPI.Client, func()) { +// conns := &ktesting.ConnWaitGroup{ +// DialFunc: (&net.Dialer{}).DialContext, +// } + +// transport := &kafkaAPI.Transport{ +// Dial: conns.Dial, +// Resolver: kafkaAPI.NewBrokerResolver(nil), +// } + +// client := &kafkaAPI.Client{ +// Addr: addr, +// Timeout: 5 * time.Second, +// Transport: transport, +// } + +// return client, func() { transport.CloseIdleConnections(); conns.Wait() } +// } + +// func newLocalClientAndTopic(address string, topic string, partition int) (*kafkaAPI.Client, func()) { +// //topic := makeTopic() +// client, shutdown := newLocalClientWithTopic(address, topic, partition) +// return client, shutdown +// } + +// func newLocalClientWithTopic(address string, topic string, partitions int) (*kafkaAPI.Client, func()) { +// client, shutdown := newLocalClient(address) +// if err := clientCreateTopic(client, topic, partitions); err != nil { +// shutdown() +// panic(err) +// } +// return client, func() { +// client.DeleteTopics(context.Background(), &kafkaAPI.DeleteTopicsRequest{ +// Topics: []string{topic}, +// }) +// shutdown() +// } +// } + +// func clientCreateTopic(client *kafkaAPI.Client, topic string, partitions int) error { +// _, err := client.CreateTopics(context.Background(), &kafkaAPI.CreateTopicsRequest{ +// Topics: []kafkaAPI.TopicConfig{{ +// Topic: topic, +// NumPartitions: partitions, +// ReplicationFactor: 1, +// }}, +// }) +// if err != nil { +// return err +// } + +// // Topic creation seems to be asynchronous. Metadata for the topic partition +// // layout in the cluster is available in the controller before being synced +// // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition" +// // when sending requests to the partition leaders. +// // +// // This loop will wait up to 2 seconds polling the cluster until no errors +// // are returned. +// for i := 0; i < 20; i++ { +// r, err := client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ +// Topic: topic, +// Partition: 0, +// Offset: 0, +// }) +// if err == nil && r.Error == nil { +// break +// } +// time.Sleep(100 * time.Millisecond) +// } + +// return nil +// } // 获取消息发送到队列中去 func GetData() { @@ -149,14 +150,7 @@ func GetData() { // default: // 接收消息 fmt.Printf("start fetch data") - res, err := kafkaConsumer.client.Fetch(context.Background(), &kafkaAPI.FetchRequest{ - Topic: kafkaConsumer.opts.topic, - Partition: kafkaConsumer.opts.partition, - Offset: 0, - MinBytes: 1, - MaxBytes: 64 * 1024, - MaxWait: 100 * time.Millisecond, - }) + res, err := kafkaConsumer.reader.ReadMessage(context.Background()) if err != nil { //t.Fatal(err) fmt.Printf("%v", err) From 9c65671526059074ede4f28665ac7df798c7d94b Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Sat, 13 Apr 2024 23:18:31 +0800 Subject: [PATCH 16/20] test d2lang function --- images/flow01.d2 | 5 +++ images/flow01.svg | 97 +++++++++++++++++++++++++++++++++++++++++++++++ kafka.go | 5 +-- 3 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 images/flow01.d2 create mode 100644 images/flow01.svg diff --git a/images/flow01.d2 b/images/flow01.d2 new file mode 100644 index 0000000..47c991e --- /dev/null +++ b/images/flow01.d2 @@ -0,0 +1,5 @@ +donut: { shape: circle } +database.shape: cylinder +you: { + shape: person +} \ No newline at end of file diff --git a/images/flow01.svg b/images/flow01.svg new file mode 100644 index 0000000..0943ffb --- /dev/null +++ b/images/flow01.svg @@ -0,0 +1,97 @@ +donutdatabaseyou + + + + + diff --git a/kafka.go b/kafka.go index 1f82c87..89eb1c2 100644 --- a/kafka.go +++ b/kafka.go @@ -20,11 +20,8 @@ var kafkaConsumer *KafkaConsumer type KafkaConsumer struct { //stopFlag int32 - opts options - //conn *kafkaAPI.Conn - //client *kafkaAPI.Client + opts options reader *kafkaAPI.Reader - //shutdown func() } type ConnWaitGroup struct { DialFunc func(context.Context, string, string) (net.Conn, error) From 32c77af6aca970e4c256dfc2063988ee7af0abb6 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Mon, 15 Apr 2024 10:28:48 +0800 Subject: [PATCH 17/20] adjust queue's dependent version, old version doesn't exist ring file --- go.mod | 16 ++++++++++------ go.sum | 13 +++++-------- kafka.go | 1 + 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index b2b7155..5bca5fc 100644 --- a/go.mod +++ b/go.mod @@ -4,20 +4,24 @@ module github.com/golang-queue/kafka go 1.18 +// require ( +// github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 +// github.com/segmentio/kafka-go v0.4.47 +// github.com/stretchr/testify v1.8.4 +// go.uber.org/goleak v1.2.1 +// ) + +// 0c677f44188bc2c3e6a require ( - github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 + github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b github.com/segmentio/kafka-go v0.4.47 - github.com/stretchr/testify v1.8.4 go.uber.org/goleak v1.2.1 ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/goccy/go-json v0.10.0 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/golang-queue/kafka => ../../ diff --git a/go.sum b/go.sum index 83ddbf6..5efea86 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,14 @@ +github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= -github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg= -github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU= +github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b h1:EfOci2gtTtCMgxv2Coh+i0iEARmvnCrxcY0Mm08KzMw= +github.com/golang-queue/queue v0.1.4-0.20240218073423-0c677f44188b/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -21,7 +20,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -73,7 +71,6 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go index 89eb1c2..f35eec3 100644 --- a/kafka.go +++ b/kafka.go @@ -22,6 +22,7 @@ type KafkaConsumer struct { //stopFlag int32 opts options reader *kafkaAPI.Reader + ring queueAPI.Ring } type ConnWaitGroup struct { DialFunc func(context.Context, string, string) (net.Conn, error) From 65c815951aa0aa2326266831d6fc2af4d511afc9 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Mon, 15 Apr 2024 17:44:07 +0800 Subject: [PATCH 18/20] =?UTF-8?q?until=202024/04/15=2017=EF=BC=9A43,=20bef?= =?UTF-8?q?ore=20ring=20request=20modification?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/kafka.go b/kafka.go index f35eec3..f0cc083 100644 --- a/kafka.go +++ b/kafka.go @@ -10,6 +10,7 @@ import ( "github.com/golang-queue/queue" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" kafkaAPI "github.com/segmentio/kafka-go" ) @@ -22,7 +23,7 @@ type KafkaConsumer struct { //stopFlag int32 opts options reader *kafkaAPI.Reader - ring queueAPI.Ring + ring *queue.Ring } type ConnWaitGroup struct { DialFunc func(context.Context, string, string) (net.Conn, error) @@ -53,7 +54,7 @@ func InitConsumer(opts ...Option) { kafkaConsumer.reader = reader //kafkaConsumer.shutdown = shutdown fmt.Printf("get data.\n") - GetData() + //GetData() fmt.Printf("shutdown now!!!\n") defer reader.Close() } @@ -136,7 +137,7 @@ func InitConsumer(opts ...Option) { // } // 获取消息发送到队列中去 -func GetData() { +func (kafkaConsumer *KafkaConsumer) GetData() { for { // select { // case <-time.After(leftTime): @@ -154,7 +155,12 @@ func GetData() { fmt.Printf("%v", err) } // 打印出消息,后续放入队列中去 - fmt.Printf("%v", res) + m := &job.Message{ + Timeout: 100 * time.Millisecond, + Payload: res.Value, + } + kafkaConsumer.ring.Queue(m) + fmt.Printf("%v", m) // } } } @@ -168,6 +174,7 @@ type Worker struct { // startOnce sync.Once opts options conn *kafkaAPI.Conn + //ring *queue.Ring } func NewWorker(opts ...Option) *Worker { From e29365aa1d1e2ca484d137d77f1340a0eed08081 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Fri, 19 Apr 2024 00:33:34 +0800 Subject: [PATCH 19/20] complete, ready to write test code --- kafka.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/kafka.go b/kafka.go index f0cc083..9c8fed0 100644 --- a/kafka.go +++ b/kafka.go @@ -17,7 +17,6 @@ import ( var _ core.Worker = (*Worker)(nil) // one consumer connect to kafka broker -var kafkaConsumer *KafkaConsumer type KafkaConsumer struct { //stopFlag int32 @@ -31,9 +30,9 @@ type ConnWaitGroup struct { } // start consumer, get message from kafka -func InitConsumer(opts ...Option) { +func InitConsumer(opts ...Option) *KafkaConsumer { //var err error - kafkaConsumer = &KafkaConsumer{ + tKafkaConsumer := &KafkaConsumer{ opts: newOptions(opts...), } @@ -44,19 +43,20 @@ func InitConsumer(opts ...Option) { // client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, // kafkaConsumer.opts.partition) reader := kafkaAPI.NewReader(kafkaAPI.ReaderConfig{ - Brokers: []string{fmt.Sprintf("%s:9092", kafkaConsumer.opts.addr)}, - Topic: kafkaConsumer.opts.topic, + Brokers: []string{fmt.Sprintf("%s:9092", tKafkaConsumer.opts.addr)}, + Topic: tKafkaConsumer.opts.topic, MinBytes: 1, MaxBytes: 10e6, MaxWait: 100 * time.Millisecond, //Logger: newTestKafkaLogger(t, ""), }) - kafkaConsumer.reader = reader + tKafkaConsumer.reader = reader //kafkaConsumer.shutdown = shutdown fmt.Printf("get data.\n") //GetData() fmt.Printf("shutdown now!!!\n") - defer reader.Close() + // defer reader.Close() + return tKafkaConsumer } // func newLocalClient(address string) (*kafkaAPI.Client, func()) { @@ -165,6 +165,7 @@ func (kafkaConsumer *KafkaConsumer) GetData() { } } +// 这里包含了回调函数,没有线程 type Worker struct { // //shutdown func() // @@ -175,6 +176,7 @@ type Worker struct { opts options conn *kafkaAPI.Conn //ring *queue.Ring + kafkaConsumer *KafkaConsumer } func NewWorker(opts ...Option) *Worker { @@ -191,20 +193,13 @@ func NewWorker(opts ...Option) *Worker { if err != nil { w.opts.logger.Fatal("can't connect kafka: ", err) } - + // 启动kakfaConsumer + w.kafkaConsumer = InitConsumer(opts...) + // 开始启动协程,获取数据 + go w.kafkaConsumer.GetData() return w } -func (w *Worker) startConsumer() (err error) { - - // err := nil - - // if err != nil { - // // - // } - return err -} - // Run start the worker func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error { return w.opts.runFunc(ctx, task) @@ -212,7 +207,9 @@ func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error { // Shutdown worker func (w *Worker) Shutdown() (err error) { + // 关闭fakfa的连接 + w.kafkaConsumer.ring.Shutdown() return err } @@ -240,8 +237,11 @@ func (w *Worker) Queue(job core.QueuedMessage) (err error) { return err } +// get data from ring +// 这个函数是倍queue.go中的coroutine调用的回调函数 func (w *Worker) Request() (core.QueuedMessage, error) { - _ = w.startConsumer() - return nil, queue.ErrNoTaskInQueue - + //_ = w.startConsumer() + //从ring中获取数据 + return w.kafkaConsumer.ring.Request() + //return nil, queue.ErrNoTaskInQueue } From 512366fbf949621b6f634f4ba6a0bead6f5bd7a3 Mon Sep 17 00:00:00 2001 From: "1916310286@qq.com" <1916310286@qq.com> Date: Fri, 19 Apr 2024 15:07:11 +0800 Subject: [PATCH 20/20] First test can fetch, but it still have problem --- go.mod | 5 ++++ go.sum | 4 +++ kafka.go | 2 ++ kafka_test.go | 74 +++++++++++++++++++++++++++++++-------------------- 4 files changed, 56 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 5bca5fc..d914e90 100644 --- a/go.mod +++ b/go.mod @@ -18,10 +18,15 @@ require ( go.uber.org/goleak v1.2.1 ) +require github.com/stretchr/testify v1.8.4 + require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/golang-queue/kafka => ../../ diff --git a/go.sum b/go.sum index 5efea86..37a446f 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -20,6 +22,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -71,6 +74,7 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go index 9c8fed0..cf9f8ef 100644 --- a/kafka.go +++ b/kafka.go @@ -39,6 +39,8 @@ func InitConsumer(opts ...Option) *KafkaConsumer { _, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + //初始化ring + tKafkaConsumer.ring = queue.NewRing() // 创建client,创建topic,创建shutdown // client, shutdown := newLocalClientAndTopic(kafkaConsumer.opts.addr, kafkaConsumer.opts.topic, // kafkaConsumer.opts.partition) diff --git a/kafka_test.go b/kafka_test.go index 0b31f8c..1453207 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -1,39 +1,55 @@ package kafka import ( - "fmt" "testing" -) -// import ( -// "testing" -// "time" + "github.com/golang-queue/queue" + "github.com/stretchr/testify/assert" + //"github.com/stretchr/testify/assert" +) -// "github.com/golang-queue/queue" -// "github.com/stretchr/testify/assert" -// ) +// func TestFetchData(t *testing.T) { +// // m := mockMessage{ +// // Message: "foo", +// // } +// // w := NewWorker() +// // q, err := queue.NewQueue( +// // queue.WithWorker(w), +// // queue.WithWorkerCount(2), +// // ) +// // assert.NoError(t, err) +// // q.Start() +// // time.Sleep(50 * time.Millisecond) +// // q.Shutdown() +// // // can't queue task after shutdown +// // err = q.Queue(m) +// // assert.Error(t, err) +// // assert.Equal(t, queue.ErrQueueShutdown, err) +// // q.Wait() +// fmt.Printf("start\n") +// InitConsumer(WithAddr("localhost"), +// WithPartition(1), +// WithTopic("hello")) +// fmt.Printf("end\n") +// } -func TestFetchData(t *testing.T) { - // m := mockMessage{ - // Message: "foo", - // } - // w := NewWorker() - // q, err := queue.NewQueue( - // queue.WithWorker(w), - // queue.WithWorkerCount(2), - // ) - // assert.NoError(t, err) - // q.Start() - // time.Sleep(50 * time.Millisecond) +func TestNewWork(t *testing.T) { + w := + NewWorker( + WithAddr("localhost"), + WithNetwork("tcp"), + WithPartition(1), + WithTopic("hello"), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(1), + ) + assert.NoError(t, err) + q.Start() + // time.Sleep(100 * time.Millisecond) + // //assert.Equal(t, 1, int(q.metric.BusyWorkers())) + // time.Sleep(600 * time.Millisecond) // q.Shutdown() - // // can't queue task after shutdown - // err = q.Queue(m) - // assert.Error(t, err) - // assert.Equal(t, queue.ErrQueueShutdown, err) // q.Wait() - fmt.Printf("start\n") - InitConsumer(WithAddr("localhost"), - WithPartition(1), - WithTopic("hello")) - fmt.Printf("end\n") }