Skip to content

Commit dd6cbaf

Browse files
committed
WIP fix handling of dosing decisions
1 parent ea8740a commit dd6cbaf

File tree

4 files changed

+31
-18
lines changed

4 files changed

+31
-18
lines changed

Diff for: data/events/alerts.go

+24-14
Original file line numberDiff line numberDiff line change
@@ -112,36 +112,46 @@ func isActivityAndActivityOnly(updatedFields []string) bool {
112112
return hasActivity
113113
}
114114

115+
// deviceDataIdentifiers are common to DosingDecision and Glucose.
116+
//
117+
// They facilitate queries of the latest data for alerts evaluation.
118+
type deviceDataIdentifiers struct {
119+
UploadID string `json:"uploadId,omitempty" bson:"uploadId,omitempty"`
120+
UserID string `json:"-" bson:"_userId,omitempty"`
121+
}
122+
115123
func (c *Consumer) consumeDeviceData(ctx context.Context,
116124
session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
117125

118-
datum := &Glucose{}
119-
if _, err := unmarshalMessageValue(msg.Value, datum); err != nil {
120-
return err
121-
}
122126
lgr := c.logger(ctx)
123-
lgr.WithField("data", datum).Info("consuming a device data message")
124127

125-
if datum.UserID == nil {
126-
return errors.New("Unable to retrieve alerts configs: userID is nil")
128+
id := &deviceDataIdentifiers{}
129+
if _, err := unmarshalMessageValue(msg.Value, id); err != nil {
130+
return errors.Wrap(err, "Unable to unmarshal device data message")
131+
}
132+
133+
lgr.WithField("identifiers", id).Info("consuming a device data message")
134+
135+
if id.UserID == "" {
136+
return errors.New("Unable to retrieve alerts configs: userID is empty")
127137
}
128-
if datum.UploadID == nil {
129-
return errors.New("Unable to retrieve alerts configs: DataSetID is nil")
138+
if id.UploadID == "" {
139+
return errors.New("Unable to retrieve alerts configs: DataSetID is empty")
130140
}
131-
ctx = log.NewContextWithLogger(ctx, lgr.WithField("followedUserID", *datum.UserID))
141+
ctx = log.NewContextWithLogger(ctx, lgr.WithField("followedUserID", id.UserID))
132142
lastComm := alerts.LastCommunication{
133-
UserID: *datum.UserID,
143+
UserID: id.UserID,
134144
LastReceivedDeviceData: time.Now(),
135-
DataSetID: *datum.UploadID,
145+
DataSetID: id.UploadID,
136146
}
137147
err := c.LastCommunications.RecordReceivedDeviceData(ctx, lastComm)
138148
if err != nil {
139149
lgr.WithError(err).Info("Unable to record device data received")
140150
}
141-
notes, err := c.Evaluator.EvaluateData(ctx, *datum.UserID, *datum.UploadID)
151+
notes, err := c.Evaluator.EvaluateData(ctx, id.UserID, id.UploadID)
142152
if err != nil {
143153
format := "Unable to evalaute device data triggered event for user %s"
144-
return errors.Wrapf(err, format, *datum.UserID)
154+
return errors.Wrapf(err, format, id.UserID)
145155
}
146156
for idx, note := range notes {
147157
lgr.WithField("idx", idx).WithField("note", note).Debug("notes")

Diff for: data/events/alerts_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -84,27 +84,27 @@ var _ = Describe("Consumer", func() {
8484
Expect(deps.Session.MarkCalls).To(Equal(1))
8585
})
8686

87-
It("errors out when the datum's UserID is nil", func() {
87+
It("errors out when the datum's UserID is empty", func() {
8888
blood := newTestStaticDatumMmolL(7.2)
8989
blood.UserID = nil
9090
kafkaMsg := newAlertsMockConsumerMessage(".data.deviceData.alerts", blood)
9191
docs := []interface{}{bson.M{}}
9292
c, deps := newConsumerTestDeps(docs)
9393

9494
Expect(c.Consume(deps.Context, deps.Session, kafkaMsg)).
95-
To(MatchError(ContainSubstring("userID is nil")))
95+
To(MatchError(ContainSubstring("userID is empty")))
9696
Expect(deps.Session.MarkCalls).To(Equal(0))
9797
})
9898

99-
It("errors out when the datum's UploadID is nil", func() {
99+
It("errors out when the datum's UploadID is empty", func() {
100100
blood := newTestStaticDatumMmolL(7.2)
101101
blood.UploadID = nil
102102
kafkaMsg := newAlertsMockConsumerMessage(".data.deviceData.alerts", blood)
103103
docs := []interface{}{bson.M{}}
104104
c, deps := newConsumerTestDeps(docs)
105105

106106
Expect(c.Consume(deps.Context, deps.Session, kafkaMsg)).
107-
To(MatchError(ContainSubstring("DataSetID is nil")))
107+
To(MatchError(ContainSubstring("DataSetID is empty")))
108108
Expect(deps.Session.MarkCalls).To(Equal(0))
109109
})
110110

Diff for: data/events/events.go

+2
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,14 @@ func CappedExponentialBinaryDelay(cap time.Duration) func(int) time.Duration {
171171

172172
type AlertsEventsConsumer struct {
173173
Consumer asyncevents.SaramaMessageConsumer
174+
Logger log.Logger
174175
}
175176

176177
func (c *AlertsEventsConsumer) Consume(ctx context.Context,
177178
session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) error {
178179
err := c.Consumer.Consume(ctx, session, message)
179180
if err != nil {
181+
c.Logger.WithError(err).Info("Unable to consumer alerts event")
180182
session.MarkMessage(message, fmt.Sprintf("I have given up after error: %s", err))
181183
return err
182184
}

Diff for: data/service/service/standard.go

+1
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,7 @@ func (s *Standard) initializeAlertsEventsHandler() error {
507507
Topics: prefixedTopics,
508508
Sarama: commonConfig.SaramaConfig,
509509
MessageConsumer: &dataEvents.AlertsEventsConsumer{
510+
Logger: s.Logger(),
510511
Consumer: ec,
511512
},
512513
}

0 commit comments

Comments
 (0)