Skip to content

Commit

Permalink
Go: XINFO GROUPS. (#3106)
Browse files Browse the repository at this point in the history
* Go: `XINFO GROUPS`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Feb 19, 2025
1 parent ae6a53c commit 1f56325
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Go: Add ZRangeStore ([3105](https://github.com/valkey-io/valkey-glide/pull/3105))
* Go: Add `ZUNION` ([#3119](https://github.com/valkey-io/valkey-glide/pull/3119))
* Go: Add `ZUNIONSTORE` ([#3136](https://github.com/valkey-io/valkey-glide/pull/3136))
* Go: Add `XINFO GROUPS` ([#3106](https://github.com/valkey-io/valkey-glide/pull/3106))

#### Breaking Changes

Expand Down
22 changes: 22 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5813,6 +5813,28 @@ func (client *baseClient) XInfoStreamFullWithOptions(
return handleStringToAnyMapResponse(result)
}

// Returns the list of all consumer groups and their attributes for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
//
// Return value:
//
// An array of [api.XInfoGroupInfo], where each element represents the
// attributes of a consumer group for the stream at `key`.
//
// [valkey.io]: https://valkey.io/commands/xinfo-groups/
func (client *baseClient) XInfoGroups(key string) ([]XInfoGroupInfo, error) {
response, err := client.executeCommand(C.XInfoGroups, []string{key})
if err != nil {
return nil, err
}
return handleXInfoGroupsResponse(response)
}

// Reads or modifies the array of bits representing the string that is held at key
// based on the specified sub commands.
//
Expand Down
50 changes: 50 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,56 @@ func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendin
return pendingDetails, nil
}

func handleXInfoGroupsResponse(response *C.struct_CommandResponse) ([]XInfoGroupInfo, error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return nil, typeErr
}
arrData, err := parseArray(response)
if err != nil {
return nil, err
}
converted, err := arrayConverter[map[string]interface{}]{
nil,
false,
}.convert(arrData)
if err != nil {
return nil, err
}
arr, ok := converted.([]map[string]interface{})
if !ok {
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type: %T", converted)}
}

result := make([]XInfoGroupInfo, 0, len(arr))

for _, group := range arr {
info := XInfoGroupInfo{
Name: group["name"].(string),
Consumers: group["consumers"].(int64),
Pending: group["pending"].(int64),
LastDeliveredId: group["last-delivered-id"].(string),
}
switch lag := group["lag"].(type) {
case int64:
info.Lag = CreateInt64Result(lag)
default:
info.Lag = CreateNilInt64Result()
}
switch entriesRead := group["entries-read"].(type) {
case int64:
info.EntriesRead = CreateInt64Result(entriesRead)
default:
info.EntriesRead = CreateNilInt64Result()
}
result = append(result, info)
}

return result, nil
}

func handleStringToAnyMapResponse(response *C.struct_CommandResponse) (map[string]interface{}, error) {
defer C.free_command_response(response)

Expand Down
20 changes: 20 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,23 @@ type XPendingDetail struct {
func CreateNilXPendingSummary() XPendingSummary {
return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)}
}

// XInfoGroupInfo represents a group information returned by `XInfoGroups` command.
type XInfoGroupInfo struct {
// The consumer group's name.
Name string
// The number of consumers in the group.
Consumers int64
// The length of the group's Pending Entries List (PEL), which are messages that were delivered but are yet to be
// acknowledged.
Pending int64
// The ID of the last entry delivered to the group's consumers.
LastDeliveredId string
// The logical "read counter" of the last entry delivered to the group's consumers.
// Included in the response only on valkey 7.0.0 and above.
EntriesRead Result[int64]
// The number of entries in the stream that are still waiting to be delivered to the group's consumers, or a `nil` when
// that number can't be determined.
// Included in the response only on valkey 7.0.0 and above.
Lag Result[int64]
}
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ type StreamCommands interface {

XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error)

XInfoGroups(key string) ([]XInfoGroupInfo, error)

XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

XRangeWithOptions(
Expand Down
72 changes: 72 additions & 0 deletions go/api/stream_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,3 +1777,75 @@ func ExampleGlideClusterClient_XInfoStreamFullWithOptions() {
// "recorded-first-entry-id": "12345-1"
// }
}

func ExampleGlideClient_XInfoGroups() {
var client *GlideClient = getExampleGlideClient() // example helper function
key := uuid.NewString()
group := "myGroup"

// create an empty stream with a group
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
// add couple of entries
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
// read them
client.XReadGroup(group, "myConsumer", map[string]string{key: ">"})
// get the info
response, err := client.XInfoGroups(key)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}

fmt.Println(response)
// Expanded:
fmt.Printf("Group name: %s\n", response[0].Name)
fmt.Printf("Consumers count: %d\n", response[0].Consumers)
fmt.Printf("PEL count: %d\n", response[0].Pending)
fmt.Printf("Last delivered message: %s\n", response[0].LastDeliveredId)
fmt.Printf("Entries read: %d\n", response[0].EntriesRead.Value()) // Added in version 7.0.0
fmt.Printf("Lag: %d\n", response[0].Lag.Value()) // Added in version 7.0.0
// Output:
// [{myGroup 1 2 0-2 {2 false} {0 false}}]
// Group name: myGroup
// Consumers count: 1
// PEL count: 2
// Last delivered message: 0-2
// Entries read: 2
// Lag: 0
}

func ExampleGlideClusterClient_XInfoGroups() {
var client *GlideClusterClient = getExampleGlideClusterClient() // example helper function
key := uuid.NewString()
group := "myGroup"

// create an empty stream with a group
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
// add couple of entries
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
// read them
client.XReadGroup(group, "myConsumer", map[string]string{key: ">"})
// get the info
response, err := client.XInfoGroups(key)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}

fmt.Println(response)
// Expanded:
fmt.Printf("Group name: %s\n", response[0].Name)
fmt.Printf("Consumers count: %d\n", response[0].Consumers)
fmt.Printf("PEL count: %d\n", response[0].Pending)
fmt.Printf("Last delivered message: %s\n", response[0].LastDeliveredId)
fmt.Printf("Entries read: %d\n", response[0].EntriesRead.Value()) // Added in version 7.0.0
fmt.Printf("Lag: %d\n", response[0].Lag.Value()) // Added in version 7.0.0
// Output:
// [{myGroup 1 2 0-2 {2 false} {0 false}}]
// Group name: myGroup
// Consumers count: 1
// PEL count: 2
// Last delivered message: 0-2
// Entries read: 2
// Lag: 0
}
170 changes: 170 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7124,6 +7124,176 @@ func (suite *GlideTestSuite) TestXInfoStream() {
})
}

func (suite *GlideTestSuite) TestXInfoGroups() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group := uuid.NewString()
consumer := uuid.NewString()

suite.verifyOK(client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream()))

// one empty group exists
xinfo, err := client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

xadd, err := client.XAddWithOptions(
key,
[][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
*options.NewXAddOptions().SetId("0-1"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-1", xadd.Value())
xadd, err = client.XAddWithOptions(
key,
[][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
*options.NewXAddOptions().SetId("0-2"),
)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-2", xadd.Value())
xadd, err = client.XAddWithOptions(key, [][]string{{"e3_f1", "e3_v1"}}, *options.NewXAddOptions().SetId("0-3"))
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "0-3", xadd.Value())

// same as previous check, bug lag = 3, there are 3 messages unread
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 0,
Pending: 0,
LastDeliveredId: "0-0",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateInt64Result(3),
},
}, xinfo)
}

xReadGroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
expectedResult := map[string]map[string][][]string{
key: {
"0-1": {{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
"0-2": {{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
"0-3": {{"e3_f1", "e3_v1"}},
},
}
assert.Equal(suite.T(), expectedResult, xReadGroup)

// after reading, `lag` is reset, and `pending`, consumer count and last ID are set
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 3,
LastDeliveredId: "0-3",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 3,
LastDeliveredId: "0-3",
EntriesRead: api.CreateInt64Result(3),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

xack, err := client.XAck(key, group, []string{"0-1"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), xack)

// once message ack'ed, pending counter decreased
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
if suite.serverVersion < "7.0.0" {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 2,
LastDeliveredId: "0-3",
EntriesRead: api.CreateNilInt64Result(),
Lag: api.CreateNilInt64Result(),
},
}, xinfo)
} else {
assert.Equal(suite.T(), []api.XInfoGroupInfo{
{
Name: group,
Consumers: 1,
Pending: 2,
LastDeliveredId: "0-3",
EntriesRead: api.CreateInt64Result(3),
Lag: api.CreateInt64Result(0),
},
}, xinfo)
}

// Passing a non-existing key raises an error
key = uuid.NewString()
_, err = client.XInfoGroups(key)
assert.IsType(suite.T(), &errors.RequestError{}, err)

// key exists, but it is not a stream
suite.verifyOK(client.Set(key, key))
_, err = client.XInfoGroups(key)
assert.IsType(suite.T(), &errors.RequestError{}, err)

// create a second stream
key = uuid.NewString()
_, err = client.XAdd(key, [][]string{{"1", "2"}})
assert.NoError(suite.T(), err)
// no group yet exists
xinfo, err = client.XInfoGroups(key)
assert.NoError(suite.T(), err)
assert.Empty(suite.T(), xinfo)
})
}

func (suite *GlideTestSuite) TestSetBit_SetSingleBit() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down

0 comments on commit 1f56325

Please sign in to comment.