Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: XINFO GROUPS. #3106

Merged
merged 5 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Go: Add ZRangeStore ([3105](https://github.com/valkey-io/valkey-glide/pull/3105))
* Go: Add `ZUNION` ([#3119](https://github.com/valkey-io/valkey-glide/pull/3119))
* Go: Add `ZUNIONSTORE` ([#3136](https://github.com/valkey-io/valkey-glide/pull/3136))
* Go: Add `XINFO GROUPS` ([#3106](https://github.com/valkey-io/valkey-glide/pull/3106))

#### Breaking Changes

Expand Down
22 changes: 22 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5813,6 +5813,28 @@ func (client *baseClient) XInfoStreamFullWithOptions(
return handleStringToAnyMapResponse(result)
}

// Returns the list of all consumer groups and their attributes for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
//
// Return value:
//
// An array of [api.XInfoGroupInfo], where each element represents the
// attributes of a consumer group for the stream at `key`.
//
// [valkey.io]: https://valkey.io/commands/xinfo-groups/
func (client *baseClient) XInfoGroups(key string) ([]XInfoGroupInfo, error) {
response, err := client.executeCommand(C.XInfoGroups, []string{key})
if err != nil {
return nil, err
}
return handleXInfoGroupsResponse(response)
}

// Reads or modifies the array of bits representing the string that is held at key
// based on the specified sub commands.
//
Expand Down
50 changes: 50 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,56 @@ func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendin
return pendingDetails, nil
}

func handleXInfoGroupsResponse(response *C.struct_CommandResponse) ([]XInfoGroupInfo, error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return nil, typeErr
}
arrData, err := parseArray(response)
if err != nil {
return nil, err
}
converted, err := arrayConverter[map[string]interface{}]{
nil,
false,
}.convert(arrData)
if err != nil {
return nil, err
}
arr, ok := converted.([]map[string]interface{})
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type: %T", converted)}
}

result := make([]XInfoGroupInfo, 0, len(arr))

for _, group := range arr {
info := XInfoGroupInfo{
Name: group["name"].(string),
Consumers: group["consumers"].(int64),
Pending: group["pending"].(int64),
LastDeliveredId: group["last-delivered-id"].(string),
}
switch lag := group["lag"].(type) {
case int64:
info.Lag = CreateInt64Result(lag)
default:
info.Lag = CreateNilInt64Result()
}
switch entriesRead := group["entries-read"].(type) {
case int64:
info.EntriesRead = CreateInt64Result(entriesRead)
default:
info.EntriesRead = CreateNilInt64Result()
}
result = append(result, info)
}

return result, nil
}

func handleStringToAnyMapResponse(response *C.struct_CommandResponse) (map[string]interface{}, error) {
defer C.free_command_response(response)

Expand Down
20 changes: 20 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,23 @@ type XPendingDetail struct {
func CreateNilXPendingSummary() XPendingSummary {
return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)}
}

// XInfoGroupInfo represents a group information returned by `XInfoGroups` command.
type XInfoGroupInfo struct {
// The consumer group's name.
Name string
// The number of consumers in the group.
Consumers int64
// The length of the group's Pending Entries List (PEL), which are messages that were delivered but are yet to be
// acknowledged.
Pending int64
// The ID of the last entry delivered to the group's consumers.
LastDeliveredId string
// The logical "read counter" of the last entry delivered to the group's consumers.
// Included in the response only on valkey 7.0.0 and above.
EntriesRead Result[int64]
// The number of entries in the stream that are still waiting to be delivered to the group's consumers, or a `nil` when
// that number can't be determined.
// Included in the response only on valkey 7.0.0 and above.
Lag Result[int64]
}
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type StreamCommands interface {

XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error)

XInfoGroups(key string) ([]XInfoGroupInfo, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRangeWithOptions(
Expand Down
72 changes: 72 additions & 0 deletions go/api/stream_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,3 +1777,75 @@ func ExampleGlideClusterClient_XInfoStreamFullWithOptions() {
// "recorded-first-entry-id": "12345-1"
// }
}

func ExampleGlideClient_XInfoGroups() {
var client *GlideClient = getExampleGlideClient() // example helper function
key := uuid.NewString()
group := "myGroup"

// create an empty stream with a group
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
// add couple of entries
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
// read them
client.XReadGroup(group, "myConsumer", map[string]string{key: ">"})
// get the info
response, err := client.XInfoGroups(key)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}

fmt.Println(response)
// Expanded:
fmt.Printf("Group name: %s\n", response[0].Name)
fmt.Printf("Consumers count: %d\n", response[0].Consumers)
fmt.Printf("PEL count: %d\n", response[0].Pending)
fmt.Printf("Last delivered message: %s\n", response[0].LastDeliveredId)
fmt.Printf("Entries read: %d\n", response[0].EntriesRead.Value()) // Added in version 7.0.0
fmt.Printf("Lag: %d\n", response[0].Lag.Value()) // Added in version 7.0.0
// Output:
// [{myGroup 1 2 0-2 {2 false} {0 false}}]
// Group name: myGroup
// Consumers count: 1
// PEL count: 2
// Last delivered message: 0-2
// Entries read: 2
// Lag: 0
}

func ExampleGlideClusterClient_XInfoGroups() {
var client *GlideClusterClient = getExampleGlideClusterClient() // example helper function
key := uuid.NewString()
group := "myGroup"

// create an empty stream with a group
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
// add couple of entries
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
// read them
client.XReadGroup(group, "myConsumer", map[string]string{key: ">"})
// get the info
response, err := client.XInfoGroups(key)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}

fmt.Println(response)
// Expanded:
fmt.Printf("Group name: %s\n", response[0].Name)
fmt.Printf("Consumers count: %d\n", response[0].Consumers)
fmt.Printf("PEL count: %d\n", response[0].Pending)
fmt.Printf("Last delivered message: %s\n", response[0].LastDeliveredId)
fmt.Printf("Entries read: %d\n", response[0].EntriesRead.Value()) // Added in version 7.0.0
fmt.Printf("Lag: %d\n", response[0].Lag.Value()) // Added in version 7.0.0
// Output:
// [{myGroup 1 2 0-2 {2 false} {0 false}}]
// Group name: myGroup
// Consumers count: 1
// PEL count: 2
// Last delivered message: 0-2
// Entries read: 2
// Lag: 0
}
170 changes: 170 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7124,6 +7124,176 @@ func (suite *GlideTestSuite) TestXInfoStream() {
})
}

func (suite *GlideTestSuite) TestXInfoGroups() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group := uuid.NewString()
consumer := uuid.NewString()

suite.verifyOK(client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream()))

// one empty group exists
xinfo, err := client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

xadd, err := client.XAddWithOptions(
key,
[][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
*options.NewXAddOptions().SetId("0-1"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-1", xadd.Value())
xadd, err = client.XAddWithOptions(
key,
[][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
*options.NewXAddOptions().SetId("0-2"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-2", xadd.Value())
xadd, err = client.XAddWithOptions(key, [][]string{{"e3_f1", "e3_v1"}}, *options.NewXAddOptions().SetId("0-3"))
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-3", xadd.Value())

// same as previous check, bug lag = 3, there are 3 messages unread
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateInt64Result(3),
},
}, xinfo)
}

xReadGroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
expectedResult := map[string]map[string][][]string{
key: {
"0-1": {{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
"0-2": {{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
"0-3": {{"e3_f1", "e3_v1"}},
},
}
assert.Equal(suite.T(), expectedResult, xReadGroup)

// after reading, `lag` is reset, and `pending`, consumer count and last ID are set
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 3,
LastDeliveredId: "0-3",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 3,
LastDeliveredId: "0-3",
EntriesRead: api.CreateInt64Result(3),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

xack, err := client.XAck(key, group, []string{"0-1"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), xack)

// once message ack'ed, pending counter decreased
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 2,
LastDeliveredId: "0-3",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 2,
LastDeliveredId: "0-3",
EntriesRead: api.CreateInt64Result(3),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

// Passing a non-existing key raises an error
key = uuid.NewString()
_, err = client.XInfoGroups(key)
assert.IsType(suite.T(), &errors.RequestError{}, err)

// key exists, but it is not a stream
suite.verifyOK(client.Set(key, key))
_, err = client.XInfoGroups(key)
assert.IsType(suite.T(), &errors.RequestError{}, err)

// create a second stream
key = uuid.NewString()
_, err = client.XAdd(key, [][]string{{"1", "2"}})
assert.NoError(suite.T(), err)
// no group yet exists
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
assert.Empty(suite.T(), xinfo)
})
}

func (suite *GlideTestSuite) TestSetBit_SetSingleBit() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down
Loading