Skip to content

Commit 7fe6188

Browse files
wmedvedergdoliveira
authored andcommitted
kogito-serverless-operator-353: Enable the sending of the process definition event when the data-index is present (apache#358)
1 parent 1691e2f commit 7fe6188

9 files changed

+182
-458
lines changed

controllers/platform/k8s.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -63,31 +63,31 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
6363
}
6464

6565
if platform.Spec.Services.DataIndex != nil {
66-
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil {
66+
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexHandler(platform)); err != nil {
6767
return nil, err
6868
}
6969
}
7070

7171
if platform.Spec.Services.JobService != nil {
72-
if err := createServiceComponents(ctx, action.client, platform, services.NewJobService(platform)); err != nil {
72+
if err := createServiceComponents(ctx, action.client, platform, services.NewJobServiceHandler(platform)); err != nil {
7373
return nil, err
7474
}
7575
}
7676

7777
return platform, nil
7878
}
7979

80-
func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
81-
if err := createConfigMap(ctx, client, platform, ps); err != nil {
80+
func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
81+
if err := createConfigMap(ctx, client, platform, psh); err != nil {
8282
return err
8383
}
84-
if err := createDeployment(ctx, client, platform, ps); err != nil {
84+
if err := createDeployment(ctx, client, platform, psh); err != nil {
8585
return err
8686
}
87-
return createService(ctx, client, platform, ps)
87+
return createService(ctx, client, platform, psh)
8888
}
8989

90-
func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
90+
func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
9191
readyProbe := &corev1.Probe{
9292
ProbeHandler: corev1.ProbeHandler{
9393
HTTPGet: &corev1.HTTPGetAction{
@@ -105,9 +105,9 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
105105
liveProbe := readyProbe.DeepCopy()
106106
liveProbe.ProbeHandler.HTTPGet.Path = common.QuarkusHealthPathLive
107107
dataDeployContainer := &corev1.Container{
108-
Image: ps.GetServiceImageName(constants.PersistenceTypeEphemeral),
109-
Env: ps.GetEnvironmentVariables(),
110-
Resources: ps.GetPodResourceRequirements(),
108+
Image: psh.GetServiceImageName(constants.PersistenceTypeEphemeral),
109+
Env: psh.GetEnvironmentVariables(),
110+
Resources: psh.GetPodResourceRequirements(),
111111
ReadinessProbe: readyProbe,
112112
LivenessProbe: liveProbe,
113113
Ports: []corev1.ContainerPort{
@@ -125,17 +125,17 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
125125
},
126126
},
127127
}
128-
dataDeployContainer = ps.ConfigurePersistence(dataDeployContainer)
129-
dataDeployContainer, err := ps.MergeContainerSpec(dataDeployContainer)
128+
dataDeployContainer = psh.ConfigurePersistence(dataDeployContainer)
129+
dataDeployContainer, err := psh.MergeContainerSpec(dataDeployContainer)
130130
if err != nil {
131131
return err
132132
}
133133

134134
// immutable
135-
dataDeployContainer.Name = ps.GetContainerName()
135+
dataDeployContainer.Name = psh.GetContainerName()
136136

137-
replicas := ps.GetReplicaCount()
138-
lbl, selectorLbl := getLabels(platform, ps)
137+
replicas := psh.GetReplicaCount()
138+
lbl, selectorLbl := getLabels(platform, psh)
139139
dataDeploySpec := appsv1.DeploymentSpec{
140140
Selector: &metav1.LabelSelector{
141141
MatchLabels: selectorLbl,
@@ -152,7 +152,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
152152
VolumeSource: corev1.VolumeSource{
153153
ConfigMap: &corev1.ConfigMapVolumeSource{
154154
LocalObjectReference: corev1.LocalObjectReference{
155-
Name: ps.GetServiceCmName(),
155+
Name: psh.GetServiceCmName(),
156156
},
157157
},
158158
},
@@ -162,7 +162,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
162162
},
163163
}
164164

165-
dataDeploySpec.Template.Spec, err = ps.MergePodSpec(dataDeploySpec.Template.Spec)
165+
dataDeploySpec.Template.Spec, err = psh.MergePodSpec(dataDeploySpec.Template.Spec)
166166
if err != nil {
167167
return err
168168
}
@@ -171,7 +171,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
171171
dataDeploy := &appsv1.Deployment{
172172
ObjectMeta: metav1.ObjectMeta{
173173
Namespace: platform.Namespace,
174-
Name: ps.GetServiceName(),
174+
Name: psh.GetServiceName(),
175175
Labels: lbl,
176176
}}
177177
if err := controllerutil.SetControllerReference(platform, dataDeploy, client.Scheme()); err != nil {
@@ -192,8 +192,8 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
192192
return nil
193193
}
194194

195-
func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
196-
lbl, selectorLbl := getLabels(platform, ps)
195+
func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
196+
lbl, selectorLbl := getLabels(platform, psh)
197197
dataSvcSpec := corev1.ServiceSpec{
198198
Ports: []corev1.ServicePort{
199199
{
@@ -208,7 +208,7 @@ func createService(ctx context.Context, client client.Client, platform *operator
208208
dataSvc := &corev1.Service{
209209
ObjectMeta: metav1.ObjectMeta{
210210
Namespace: platform.Namespace,
211-
Name: ps.GetServiceName(),
211+
Name: psh.GetServiceName(),
212212
Labels: lbl,
213213
}}
214214
if err := controllerutil.SetControllerReference(platform, dataSvc, client.Scheme()); err != nil {
@@ -229,26 +229,26 @@ func createService(ctx context.Context, client client.Client, platform *operator
229229
return nil
230230
}
231231

232-
func getLabels(platform *operatorapi.SonataFlowPlatform, ps services.Platform) (map[string]string, map[string]string) {
232+
func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (map[string]string, map[string]string) {
233233
lbl := map[string]string{
234234
workflowproj.LabelApp: platform.Name,
235-
workflowproj.LabelService: ps.GetServiceName(),
235+
workflowproj.LabelService: psh.GetServiceName(),
236236
}
237237
selectorLbl := map[string]string{
238-
workflowproj.LabelService: ps.GetServiceName(),
238+
workflowproj.LabelService: psh.GetServiceName(),
239239
}
240240
return lbl, selectorLbl
241241
}
242242

243-
func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
244-
handler, err := services.NewServiceAppPropertyHandler(ps)
243+
func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
244+
handler, err := services.NewServiceAppPropertyHandler(psh)
245245
if err != nil {
246246
return err
247247
}
248-
lbl, _ := getLabels(platform, ps)
248+
lbl, _ := getLabels(platform, psh)
249249
configMap := &corev1.ConfigMap{
250250
ObjectMeta: metav1.ObjectMeta{
251-
Name: ps.GetServiceCmName(),
251+
Name: psh.GetServiceCmName(),
252252
Namespace: platform.Namespace,
253253
Labels: lbl,
254254
},

controllers/platform/services/properties.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var (
4747

4848
type serviceAppPropertyHandler struct {
4949
userProperties string
50-
platform Platform
50+
serviceHandler PlatformServiceHandler
5151
defaultMutableProperties *properties.Properties
5252
}
5353

@@ -59,9 +59,9 @@ type ServiceAppPropertyHandler interface {
5959
// NewServiceAppPropertyHandler creates the default service configurations property handler
6060
// The set of properties is initialized with the operator provided immutable properties.
6161
// The set of defaultMutableProperties is initialized with the operator provided properties that the user might override.
62-
func NewServiceAppPropertyHandler(ps Platform) (ServiceAppPropertyHandler, error) {
62+
func NewServiceAppPropertyHandler(serviceHandler PlatformServiceHandler) (ServiceAppPropertyHandler, error) {
6363
handler := &serviceAppPropertyHandler{}
64-
props, err := ps.GenerateServiceProperties()
64+
props, err := serviceHandler.GenerateServiceProperties()
6565
if err != nil {
6666
return nil, err
6767
}
@@ -83,7 +83,7 @@ func (a *serviceAppPropertyHandler) Build() string {
8383
props, propErr = properties.LoadString(a.userProperties)
8484
}
8585
if propErr != nil {
86-
klog.V(log.D).InfoS("Can't load user's property", "service", a.platform.GetServiceName(), "properties", a.userProperties)
86+
klog.V(log.D).InfoS("Can't load user's property", "service", a.serviceHandler.GetServiceName(), "properties", a.userProperties)
8787
props = properties.NewProperties()
8888
}
8989
props = utils.NewApplicationPropertiesBuilder().
@@ -158,10 +158,12 @@ func generateReactiveURL(postgresSpec *operatorapi.PersistencePostgreSql, schema
158158
// Never nil.
159159
func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
160160
props := properties.NewProperties()
161-
props.Set(constants.KogitoProcessInstancesEnabled, "false")
161+
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false")
162+
props.Set(constants.KogitoProcessInstancesEventsEnabled, "false")
162163
if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) {
163-
props.Set(constants.KogitoProcessInstancesEnabled, "true")
164-
di := NewDataIndexService(platform)
164+
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
165+
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
166+
di := NewDataIndexHandler(platform)
165167
p, err := di.GenerateWorkflowProperties()
166168
if err != nil {
167169
return nil, err
@@ -182,7 +184,7 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
182184
props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP)
183185
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
184186
if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) {
185-
js := NewJobService(platform)
187+
js := NewJobServiceHandler(platform)
186188
p, err := js.GenerateWorkflowProperties()
187189
if err != nil {
188190
return nil, err

controllers/platform/services/properties_services_test.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
package services
2121

2222
import (
23+
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
2324
. "github.com/onsi/ginkgo/v2"
2425
. "github.com/onsi/gomega"
2526

26-
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
27-
2827
"github.com/magiconair/properties"
2928
)
3029

@@ -33,23 +32,23 @@ var (
3332
disabled = false
3433
)
3534

36-
var _ = Describe("Platform properties", func() {
35+
var _ = Describe("PlatformServiceHandler properties", func() {
3736

3837
var _ = Context("for service properties", func() {
3938

4039
var _ = Context("defining the application properties generated for the deployment of the", func() {
4140

4241
DescribeTable("Job Service",
4342
func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
44-
js := NewJobService(plfm)
43+
js := NewJobServiceHandler(plfm)
4544
handler, err := NewServiceAppPropertyHandler(js)
4645
Expect(err).NotTo(HaveOccurred())
4746
p, err := properties.LoadString(handler.Build())
4847
Expect(err).NotTo(HaveOccurred())
4948
p.Sort()
5049
Expect(p).To(Equal(expectedProperties))
5150
},
52-
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec()),
51+
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")),
5352
generateJobServiceDeploymentDevProperties()),
5453
Entry("with enabled field undefined and with ephemeral persistence",
5554
generatePlatform(setJobServiceEnabledValue(nil), setPlatformName("foo"), setPlatformNamespace("default")),
@@ -78,15 +77,15 @@ var _ = Describe("Platform properties", func() {
7877
)
7978

8079
DescribeTable("Data Index", func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
81-
di := NewDataIndexService(plfm)
80+
di := NewDataIndexHandler(plfm)
8281
handler, err := NewServiceAppPropertyHandler(di)
8382
Expect(err).NotTo(HaveOccurred())
8483
p, err := properties.LoadString(handler.Build())
8584
Expect(err).NotTo(HaveOccurred())
8685
p.Sort()
8786
Expect(p).To(Equal(expectedProperties))
8887
},
89-
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec()), generateDataIndexDeploymentProperties()),
88+
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")), generateDataIndexDeploymentProperties()),
9089
Entry("with postgreSQL persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default"), setJobServiceJDBC("jdbc:postgresql://postgres:5432/sonataflow?currentSchema=myschema")),
9190
generateDataIndexDeploymentProperties()),
9291
)
@@ -98,6 +97,7 @@ var _ = Describe("Platform properties", func() {
9897

9998
func generateJobServiceDeploymentDevProperties() *properties.Properties {
10099
p := properties.NewProperties()
100+
p.Set("kogito.service.url", "http://foo-jobs-service.default")
101101
p.Set("quarkus.devservices.enabled", "false")
102102
p.Set("quarkus.http.host", "0.0.0.0")
103103
p.Set("quarkus.http.port", "8080")
@@ -109,6 +109,7 @@ func generateJobServiceDeploymentDevProperties() *properties.Properties {
109109

110110
func generateDataIndexDeploymentProperties() *properties.Properties {
111111
p := properties.NewProperties()
112+
p.Set("kogito.service.url", "http://foo-data-index-service.default")
112113
p.Set("quarkus.devservices.enabled", "false")
113114
p.Set("quarkus.http.host", "0.0.0.0")
114115
p.Set("quarkus.http.port", "8080")
@@ -120,6 +121,7 @@ func generateDataIndexDeploymentProperties() *properties.Properties {
120121

121122
func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properties {
122123
p := properties.NewProperties()
124+
p.Set("kogito.service.url", "http://foo-jobs-service.default")
123125
p.Set("quarkus.devservices.enabled", "false")
124126
p.Set("quarkus.http.host", "0.0.0.0")
125127
p.Set("quarkus.http.port", "8080")
@@ -132,6 +134,7 @@ func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properti
132134

133135
func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properties.Properties {
134136
p := properties.NewProperties()
137+
p.Set("kogito.service.url", "http://foo-jobs-service.default")
135138
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
136139
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
137140
p.Set("quarkus.devservices.enabled", "false")
@@ -145,6 +148,7 @@ func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properti
145148

146149
func generateJobServiceDeploymentWithDataIndexAndPostgreSQLProperties() *properties.Properties {
147150
p := properties.NewProperties()
151+
p.Set("kogito.service.url", "http://foo-jobs-service.default")
148152
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
149153
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
150154
p.Set("quarkus.devservices.enabled", "false")

0 commit comments

Comments
 (0)