Skip to content

Commit

Permalink
chore: use nats kv store for tracking message
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Feb 23, 2025
1 parent abd7cdd commit ba02edd
Show file tree
Hide file tree
Showing 53 changed files with 2,340 additions and 2,393 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ui-test: ui-build
./hack/test-ui.sh

.PHONY: image
image: clean ui-build dist/$(BINARY_NAME)-linux-$(HOST_ARCH)
image: clean dist/$(BINARY_NAME)-linux-$(HOST_ARCH)
ifdef GITHUB_ACTIONS
# The binary will be built in a separate Github Actions job
cp -pv numaflow-rs-linux-amd64 dist/numaflow-rs-linux-amd64
Expand Down
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (
func NewISBSvcCreateCommand() *cobra.Command {

var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStore string
)

command := &cobra.Command{
Expand Down Expand Up @@ -89,7 +89,7 @@ func NewISBSvcCreateCommand() *cobra.Command {
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
}

if err = isbsClient.CreateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams, opts...); err != nil {
if err = isbsClient.CreateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStore, opts...); err != nil {
logger.Errorw("Failed to create buffers, buckets and side inputs store.", zap.Error(err))
return err
}
Expand All @@ -102,6 +102,6 @@ func NewISBSvcCreateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to create") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to create") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to create") // --serving-source-streams=a,b, --serving-source-streams=c
command.Flags().StringVar(&servingSourceStore, "serving-source-store", "", "Serving source streams to create") // --serving-source-store=a
return command
}
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (

func NewISBSvcDeleteCommand() *cobra.Command {
var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStore string
)

command := &cobra.Command{
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewISBSvcDeleteCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
return fmt.Errorf("unsupported isb service type %q", isbSvcType)
}
if err = isbsClient.DeleteBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams); err != nil {
if err = isbsClient.DeleteBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStore); err != nil {
logger.Errorw("Failed on buffers, buckets and side inputs store deletion.", zap.Error(err))
return err
}
Expand All @@ -86,6 +86,6 @@ func NewISBSvcDeleteCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to delete") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to delete") // --buckets=xxa,xxb --buckets=xxc return command
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to delete") // --serving-source-streams=a,b, --serving-source-streams=c
command.Flags().StringVar(&servingSourceStore, "serving-source-store", "", "Serving source store to delete") // --serving-source-store=a
return command
}
14 changes: 7 additions & 7 deletions cmd/commands/isbsvc_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (
func NewISBSvcValidateCommand() *cobra.Command {

var (
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStreams []string
isbSvcType string
buffers []string
buckets []string
sideInputsStore string
servingSourceStore string
)

command := &cobra.Command{
Expand Down Expand Up @@ -77,7 +77,7 @@ func NewISBSvcValidateCommand() *cobra.Command {
return fmt.Errorf("unsupported isb service type")
}
_ = wait.ExponentialBackoffWithContext(ctx, sharedutil.DefaultRetryBackoff, func(_ context.Context) (bool, error) {
if err = isbsClient.ValidateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStreams); err != nil {
if err = isbsClient.ValidateBuffersAndBuckets(ctx, buffers, buckets, sideInputsStore, servingSourceStore); err != nil {
logger.Infow("Buffers, buckets and side inputs store might have not been created yet, will retry if the limit is not reached", zap.Error(err))
return false, nil
}
Expand All @@ -95,7 +95,7 @@ func NewISBSvcValidateCommand() *cobra.Command {
command.Flags().StringSliceVar(&buffers, "buffers", []string{}, "Buffers to validate") // --buffers=a,b, --buffers=c
command.Flags().StringSliceVar(&buckets, "buckets", []string{}, "Buckets to validate") // --buckets=xxa,xxb --buckets=xxc
command.Flags().StringVar(&sideInputsStore, "side-inputs-store", "", "Name of the side inputs store")
command.Flags().StringSliceVar(&servingSourceStreams, "serving-source-streams", []string{}, "Serving source streams to validate") // --serving-source-streams=a,b, --serving-source-streams=c
command.Flags().StringVar(&servingSourceStore, "serving-source-store", "", "Serving source store to validate") // --serving-source-store=a

return command
}
5 changes: 3 additions & 2 deletions examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ spec:
scale:
min: 1
udf:
builtin:
name: cat # A built-in UDF which simply cats the message
container:
image: quay.io/numaio/numaflow-go/map-flatmap-stream:stable
imagePullPolicy: Never
- name: out
scale:
min: 1
Expand Down
34 changes: 21 additions & 13 deletions examples/15-serving-source-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,42 @@ kind: Pipeline
metadata:
name: simple-pipeline
spec:
templates:
vertex:
metadata:
annotations:
numaflow.numaproj.io/callback: "true"
vertices:
- name: in
- name: serving-in
scale:
min: 1
source:
serving:
service: true
msgIDHeaderKey: "X-Request-ID"
msgIDHeaderKey: "X-Numaflow-Id"
store:
url: "redis://redis:6379"

- name: cat
scale:
min: 1
udf:
builtin:
name: cat # A built-in UDF which simply cats the message
- name: out
container:
image: quay.io/numaio/numaflow-go/map-forward-message:stable
env:
- name: RUST_BACKTRACE
value: "1"

- name: serve-sink
scale:
min: 1
sink:
log: {}
udsink:
container:
image: docker.intuit.com/quay-rmt/numaio/servesink:v1.5.0-alpha1
env:
- name: NUMAFLOW_CALLBACK_URL_KEY
value: "X-Numaflow-Callback-Url"
- name: NUMAFLOW_MSG_ID_HEADER_KEY
value: "X-Numaflow-Id"

edges:
- from: in
- from: serving-in
to: cat
- from: cat
to: out
to: serve-sink
10 changes: 2 additions & 8 deletions pkg/apis/numaflow/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,8 @@ func (p Pipeline) GetSideInputsStoreName() string {
return fmt.Sprintf("%s-%s", p.Namespace, p.Name)
}

func (p Pipeline) GetServingSourceStreamNames() []string {
var servingSourceNames []string
for _, srcVertex := range p.Spec.Vertices {
if srcVertex.IsASource() && srcVertex.Source.Serving != nil {
servingSourceNames = append(servingSourceNames, fmt.Sprintf("%s-%s-serving-source", p.Name, srcVertex.Name))
}
}
return servingSourceNames
func (p Pipeline) GetServingSourceStoreName() string {
return fmt.Sprintf("%s-%s", p.Namespace, p.Name)
}

func (p Pipeline) GetSideInputsManagerDeployments(req GetSideInputDeploymentReq) ([]*appv1.Deployment, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func TestGetServingSourceStreamNames(t *testing.T) {
},
}
var expected []string
assert.Equal(t, expected, p.GetServingSourceStreamNames())
assert.Equal(t, expected, p.GetServingSourceStoreName())
})

t.Run("with serving sources", func(t *testing.T) {
Expand All @@ -545,7 +545,7 @@ func TestGetServingSourceStreamNames(t *testing.T) {
},
}
expected := []string{"test-pipeline-v1-serving-source", "test-pipeline-v2-serving-source"}
assert.Equal(t, expected, p.GetServingSourceStreamNames())
assert.Equal(t, expected, p.GetServingSourceStoreName())
})
}

Expand Down
62 changes: 62 additions & 0 deletions pkg/apis/proto/serving/v1/store.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";

option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/serving/v1";
option java_package = "io.numaproj.numaflow.serving.v1";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package map.v1;

service ServingStore {
rpc Put(PutRequest) returns (PutResponse);

rpc Get(GetRequest) returns (GetResponse);

rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

message Payload {
string id = 1;
bytes value = 2;
}


message PutRequest {
repeated Payload payload = 1;
}

message PutResponse {
bool success = 1;
}

message GetRequest {
string id = 1;
}

message GetResponse {
repeated Payload payload = 1;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
6 changes: 3 additions & 3 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func (ms *mockIsbSvcClient) GetBufferInfo(ctx context.Context, buffer string) (*
}, nil
}

func (ms *mockIsbSvcClient) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...isbsvc.CreateOption) error {
func (ms *mockIsbSvcClient) CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string, opts ...isbsvc.CreateOption) error {
return nil
}

func (ms *mockIsbSvcClient) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
func (ms *mockIsbSvcClient) DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string) error {
return nil
}

func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error {
func (ms *mockIsbSvcClient) ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
// ISBService is an interface used to do the operations on ISBSvc
type ISBService interface {
// CreateBuffersAndBuckets creates buffers and buckets
CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string, opts ...CreateOption) error
CreateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string, opts ...CreateOption) error
// DeleteBuffersAndBuckets deletes buffers and buckets
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStreams []string) error
DeleteBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string) error
// ValidateBuffersAndBuckets validates buffers and buckets
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceSTreams []string) error
ValidateBuffersAndBuckets(ctx context.Context, buffers, buckets []string, sideInputsStore string, servingSourceStore string) error
// GetBufferInfo returns buffer info for the given buffer
GetBufferInfo(ctx context.Context, buffer string) (*BufferInfo, error)
// CreateWatermarkStores creates watermark stores
Expand Down
Loading

0 comments on commit ba02edd

Please sign in to comment.