Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

care partner alerts #715

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
5396aec
adds List and Get methods to alerts client
ewollesen May 6, 2024
feba9d8
lift Repeat out of the base alert config
ewollesen May 6, 2024
8dc7421
adds activity tracking to alert configurations
ewollesen May 6, 2024
4a92a30
adds auth endpoint to retrieve a user's device tokens
ewollesen May 7, 2024
0ce49c1
adds the ability to retrieve device tokens to the auth client
ewollesen May 7, 2024
8915bf3
remove unused device tokens repo from data
ewollesen May 7, 2024
b411501
adds a pusher client for sending APNs push notifications
ewollesen May 7, 2024
ac0a3a9
adapts sarama.Logger to implement log.Logger
ewollesen Jun 26, 2024
3207677
adapts go-common's asyncevents.SaramaEventsConsumer for alerts
ewollesen May 8, 2024
062e365
allow invites to set an upload id
ewollesen Jul 2, 2024
ef03a86
integrates an APNs pusher into data service
ewollesen Jul 8, 2024
2e8ee0e
adds Evaluate methods to alerts.Config
ewollesen Jul 8, 2024
7b512db
adds the alerts events consumer to the data service
ewollesen Jul 8, 2024
3582438
remove some debugging logs
ewollesen Jul 11, 2024
bbe380e
small fixes from code review
ewollesen Jul 12, 2024
fb6208c
rename Note => Notification
ewollesen Jul 12, 2024
c48da7e
one mock of DeviceTokenRepository is enough
ewollesen Jul 19, 2024
8b22dd4
add a topic cascading retry mechanism for care partner alerts
ewollesen Jul 24, 2024
68d30eb
modifies DelayingConsumer to use a message header instead of a delay
ewollesen Sep 16, 2024
d6e0e2c
just a little more explanation of cascading consumer
ewollesen Sep 20, 2024
4f8bfbf
don't read topic and consumer group id from runtime configuration
ewollesen Oct 7, 2024
2b0b8f5
there's no longer a need to inject server session tokens
ewollesen Dec 10, 2024
f1653b0
removes out-of-date tests
ewollesen Dec 10, 2024
bec3b5c
improve test coverage
ewollesen Dec 11, 2024
d5955b7
add data set id to alerts Evaluation, improve test coverage
ewollesen Dec 11, 2024
099a7ef
implement no communication alerts
ewollesen Oct 24, 2024
3f2bf06
evaluate not looping conditions part 1
ewollesen Dec 12, 2024
e105d0b
re-working to handle alert resolution and sent tracking
ewollesen Jan 21, 2025
6323462
reduce kafka topics for care partner alerts outside of production
ewollesen Feb 4, 2025
80d09a9
bump go-common to get kafka CDC updates for CPA
ewollesen Feb 4, 2025
ca1dd3f
rename nontypesglucose -> dataBloodGlucose
ewollesen Feb 10, 2025
a802f02
renames Recorder & UsersWithoutCommunications
ewollesen Feb 10, 2025
1ebe778
pass a log.Logger to EvaluateNoCommunication
ewollesen Feb 10, 2025
10883b4
remove un-needed comment
ewollesen Feb 10, 2025
5494b1d
move care partner task definition to alerts package
ewollesen Feb 10, 2025
215ffeb
make GetRunnerDeadline() use a multiple of GetRunnerDurationMaximum()
ewollesen Feb 10, 2025
597869a
replace magic number with named constant
ewollesen Feb 11, 2025
58745b7
adds environment-based config for alerts retry delays
ewollesen Feb 12, 2025
503eeea
modify task service to allow tasks to repeat ASAP
ewollesen Feb 12, 2025
e8daea8
the alerts task's GetRunnerTimeout should be a multiple of its maximu…
ewollesen Feb 19, 2025
83cc656
rename API endpoint /v1/users/overdue_communications => /v1/overdue_c…
ewollesen Feb 19, 2025
fab9ceb
remove unused config struct
ewollesen Feb 19, 2025
de0015b
validate overdue communication minimum value
ewollesen Feb 19, 2025
e74e810
assume no alerts retry topics by default
ewollesen Feb 19, 2025
271772c
fix a receiver name to match others defined on the struct
ewollesen Feb 19, 2025
18d97b8
merge alertsDataRepo into DataRepository
ewollesen Feb 19, 2025
687c453
rename method receiver
ewollesen Feb 21, 2025
93a978a
consolidate care partner alerts pusher configs
ewollesen Feb 21, 2025
44fecf7
mark skipped activity updates
ewollesen Mar 12, 2025
ea8740a
prefix kafka consumer group ids
ewollesen Mar 14, 2025
452934e
add logging when an alerts event can't be consumed
ewollesen Mar 18, 2025
b33531a
deserialize CPA kafka deviceData messages into a types.Blood
ewollesen Mar 18, 2025
4aabbaa
add a context logger
ewollesen Mar 21, 2025
e1394ad
improved logging of marked messages
ewollesen Mar 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ci-test-watch: ginkgo
go-test:
. ./env.test.sh && $(TIMING_CMD) go test $(GOTEST_FLAGS) $(GOTEST_PKGS)

go-ci-test: GOTEST_FLAGS += -count=1 -race -shuffle=on -cover
go-ci-test: override GOTEST_FLAGS += -count=1 -race -shuffle=on -cover
go-ci-test: GOTEST_PKGS = ./...
go-ci-test: go-test

Expand Down
80 changes: 57 additions & 23 deletions alerts/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package alerts
import (
"context"
"net/http"
"time"

"github.com/kelseyhightower/envconfig"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/client"
"github.com/tidepool-org/platform/errors"
platformlog "github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/platform"
Expand All @@ -16,22 +17,20 @@ import (

// Client for managing alerts configs.
type Client struct {
client PlatformClient
logger platformlog.Logger
tokenProvider auth.ServerSessionTokenProvider
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Darin upstreamed some changes that remove the necessity of a separate token provider.

client PlatformClient
logger platformlog.Logger
}

// NewClient builds a client for interacting with alerts API endpoints.
//
// If no logger is provided, a null logger is used.
func NewClient(client PlatformClient, tokenProvider auth.ServerSessionTokenProvider, logger platformlog.Logger) *Client {
func NewClient(client PlatformClient, logger platformlog.Logger) *Client {
if logger == nil {
logger = null.NewLogger()
}
return &Client{
client: client,
logger: logger,
tokenProvider: tokenProvider,
client: client,
logger: logger,
}
}

Expand All @@ -44,34 +43,69 @@ type PlatformClient interface {

// request performs common operations before passing a request off to the
// underlying platform.Client.
func (c *Client) request(ctx context.Context, method, url string, body any) error {
func (c *Client) request(ctx context.Context, method, url string, reqBody, resBody any) error {
// Platform's client.Client expects a logger to exist in the request's
// context. If it doesn't exist, request processing will panic.
loggingCtx := platformlog.NewContextWithLogger(ctx, c.logger)
// Make sure the auth token is injected into the request's headers.
return c.requestWithAuth(loggingCtx, method, url, body)
}

// requestWithAuth injects an auth token before calling platform.Client.RequestData.
//
// At time of writing, this is the only way to inject credentials into
// platform.Client. It might be nice to be able to use a mutator, but the auth
// is specifically handled by the platform.Client via the context field, and
// if left blank, platform.Client errors.
func (c *Client) requestWithAuth(ctx context.Context, method, url string, body any) error {
return c.client.RequestData(auth.NewContextWithServerSessionTokenProvider(ctx, c.tokenProvider), method, url, nil, body, nil)
return c.client.RequestData(loggingCtx, method, url, nil, reqBody, resBody)
}

// Upsert updates cfg if it exists or creates it if it doesn't.
func (c *Client) Upsert(ctx context.Context, cfg *Config) error {
url := c.client.ConstructURL("v1", "users", cfg.FollowedUserID, "followers", cfg.UserID, "alerts")
return c.request(ctx, http.MethodPost, url, cfg)
return c.request(ctx, http.MethodPost, url, cfg, nil)
}

// Delete the alerts config.
func (c *Client) Delete(ctx context.Context, cfg *Config) error {
url := c.client.ConstructURL("v1", "users", cfg.FollowedUserID, "followers", cfg.UserID, "alerts")
return c.request(ctx, http.MethodDelete, url, nil)
return c.request(ctx, http.MethodDelete, url, nil, nil)
}

// Get a user's alerts configuration for the followed user.
func (c *Client) Get(ctx context.Context, followedUserID, userID string) (*Config, error) {
url := c.client.ConstructURL("v1", "users", followedUserID, "followers", userID, "alerts")
config := &Config{}
err := c.request(ctx, http.MethodGet, url, nil, config)
if err != nil {
return nil, errors.Wrap(err, "Unable to request alerts config")
}
return config, nil
}

// List the alerts configurations that follow the given user.
//
// This method should only be called via an authenticated service session.
func (c *Client) List(ctx context.Context, followedUserID string) ([]*Config, error) {
url := c.client.ConstructURL("v1", "users", followedUserID, "followers", "alerts")
configs := []*Config{}
err := c.request(ctx, http.MethodGet, url, nil, &configs)
if err != nil {
c.logger.Debugf("unable to request alerts configs list: %+v %T", err, err)
return nil, errors.Wrap(err, "Unable to request alerts configs list")
}
return configs, nil
}

// OverdueCommunications are those that haven't communicated in some time.
//
// This method should only be called via an authenticated service session.
func (c *Client) OverdueCommunications(ctx context.Context) ([]LastCommunication, error) {
url := c.client.ConstructURL("v1", "overdue_communications")
lastComms := []LastCommunication{}
err := c.request(ctx, http.MethodGet, url, nil, &lastComms)
if err != nil {
c.logger.Debugf("getting users overdue to communicate: \"%+v\" %T", err, err)
return nil, errors.Wrap(err, "Unable to list overdue communications")
}
return lastComms, nil
}

// LastCommunication records the last time data was received from a user.
type LastCommunication struct {
UserID string `bson:"userId" json:"userId"`
DataSetID string `bson:"dataSetId" json:"dataSetId"`
LastReceivedDeviceData time.Time `bson:"lastReceivedDeviceData" json:"lastReceivedDeviceData"`
}

// ConfigLoader abstracts the method by which config values are loaded.
Expand Down
123 changes: 78 additions & 45 deletions alerts/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/client"
"github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/log/null"
"github.com/tidepool-org/platform/platform"
)

const testToken = "auth-me"
const testUserID = "test-user-id"
const testFollowedUserID = "test-followed-user-id"
const testDataSetID = "upid_000000000000"

var _ = Describe("Client", func() {
var test404Server, test200Server *httptest.Server
var testAuthServer func(*string) *httptest.Server
var test404Server *httptest.Server
var test200Server func(string) *httptest.Server

BeforeEach(func() {
t := GinkgoT()
Expand All @@ -28,87 +30,118 @@ var _ = Describe("Client", func() {
test404Server = testServer(t, func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
})
test200Server = testServer(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
testAuthServer = func(token *string) *httptest.Server {
test200Server = func(resp string) *httptest.Server {
return testServer(t, func(w http.ResponseWriter, r *http.Request) {
*token = r.Header.Get(auth.TidepoolSessionTokenHeaderKey)
w.WriteHeader(http.StatusOK)
w.Write([]byte(resp))
})
}
})

Context("Delete", func() {
It("returns an error on non-200 responses", func() {
ItReturnsAnErrorOnNon200Responses := func(f func(context.Context, *Client) error) {
GinkgoHelper()
It("returns an error on non-200 respnoses", func() {
client, ctx := newAlertsClientTest(test404Server)
err := client.Delete(ctx, &Config{})
err := f(ctx, client)
Expect(err).Should(HaveOccurred())
Expect(err).To(MatchError(ContainSubstring("resource not found")))
})
}

It("returns nil on success", func() {
client, ctx := newAlertsClientTest(test200Server)
err := client.Delete(ctx, &Config{})
Expect(err).ShouldNot(HaveOccurred())
ItReturnsANilErrorOnSuccess := func(resp string, f func(context.Context, *Client) error) {
GinkgoHelper()
It("returns a nil error on success", func() {
client, ctx := newAlertsClientTest(test200Server(resp))
err := f(ctx, client)
Expect(err).To(Succeed())
})
}

Context("Delete", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
return client.Delete(ctx, &Config{})
})

It("injects an auth token", func() {
token := ""
client, ctx := newAlertsClientTest(testAuthServer(&token))
_ = client.Delete(ctx, &Config{})
Expect(token).To(Equal(testToken))
ItReturnsANilErrorOnSuccess("", func(ctx context.Context, client *Client) error {
return client.Delete(ctx, &Config{})
})
})

Context("Upsert", func() {
It("returns an error on non-200 responses", func() {
client, ctx := newAlertsClientTest(test404Server)
err := client.Upsert(ctx, &Config{})
Expect(err).Should(HaveOccurred())
Expect(err).To(MatchError(ContainSubstring("resource not found")))
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
return client.Upsert(ctx, &Config{})
})

ItReturnsANilErrorOnSuccess("", func(ctx context.Context, client *Client) error {
return client.Upsert(ctx, &Config{})
})
})

Context("Get", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.Get(ctx, testFollowedUserID, testUserID)
return err
})

It("returns nil on success", func() {
client, ctx := newAlertsClientTest(test200Server)
err := client.Upsert(ctx, &Config{})
Expect(err).ShouldNot(HaveOccurred())
ret := `{
"userId": "14ee703f-ca9b-4a6b-9ce3-41d886514e7f",
"followedUserId": "ce5863bc-cc0b-4177-97d7-e8de0c558820",
"uploadId": "upid_00000000000000000000000000000000"
}`
ItReturnsANilErrorOnSuccess(ret, func(ctx context.Context, client *Client) error {
_, err := client.Get(ctx, testFollowedUserID, testUserID)
return err
})
})

It("injects an auth token", func() {
token := ""
client, ctx := newAlertsClientTest(testAuthServer(&token))
_ = client.Upsert(ctx, &Config{})
Expect(token).To(Equal(testToken))
Context("List", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.List(ctx, "")
return err
})

ItReturnsANilErrorOnSuccess("[]", func(ctx context.Context, client *Client) error {
_, err := client.List(ctx, "")
return err
})
})

Context("OverdueCommunications", func() {
ItReturnsAnErrorOnNon200Responses(func(ctx context.Context, client *Client) error {
_, err := client.OverdueCommunications(ctx)
return err
})

ItReturnsANilErrorOnSuccess("[]", func(ctx context.Context, client *Client) error {
_, err := client.OverdueCommunications(ctx)
return err
})
})
})

func buildTestClient(s *httptest.Server) *Client {
pCfg := &platform.Config{
Config: &client.Config{
Address: s.URL,
},
Config: &client.Config{Address: s.URL},
ServiceSecret: "auth-me",
}
token := mockTokenProvider(testToken)
pc, err := platform.NewClient(pCfg, platform.AuthorizeAsService)
Expect(err).ToNot(HaveOccurred())
client := NewClient(pc, token, null.NewLogger())
client := NewClient(pc, null.NewLogger())
return client
}

func newAlertsClientTest(server *httptest.Server) (*Client, context.Context) {
return buildTestClient(server), contextWithNullLogger()
}

func contextWithNullLogger() context.Context {
return log.NewContextWithLogger(context.Background(), null.NewLogger())
func contextWithNullLoggerDeluxe() (context.Context, log.Logger) {
lgr := null.NewLogger()
return log.NewContextWithLogger(context.Background(), lgr), lgr
}

type mockTokenProvider string

func (p mockTokenProvider) ServerSessionToken() (string, error) {
return string(p), nil
func contextWithNullLogger() context.Context {
ctx, _ := contextWithNullLoggerDeluxe()
return ctx
}

func testServer(t GinkgoTInterface, handler http.HandlerFunc) *httptest.Server {
Expand Down
Loading