Skip to content

Commit 725d85e

Browse files
committed
fix(sip): implicitly paginate sip trunk queries to skirt twirp limits
1 parent e0e8548 commit 725d85e

File tree

3 files changed

+148
-4
lines changed

3 files changed

+148
-4
lines changed

cmd/lk/proto.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"google.golang.org/protobuf/proto"
2727

2828
"github.com/livekit/livekit-cli/v2/pkg/util"
29+
"github.com/livekit/protocol/livekit"
2930
)
3031

3132
const flagRequest = "request"
@@ -44,6 +45,16 @@ type protoTypeValidator[T any] interface {
4445
Validate() error
4546
}
4647

48+
type paginatedType[T any] interface {
49+
protoType[T]
50+
GetPage() *livekit.Pagination
51+
}
52+
53+
type paginatedResponseType[T any, D any] interface {
54+
protoType[T]
55+
GetItems() []*D
56+
}
57+
4758
func ReadRequest[T any, P protoType[T]](cmd *cli.Command) (*T, error) {
4859
return ReadRequestFileOrLiteral[T, P](cmd.String(flagRequest))
4960
}
@@ -99,6 +110,35 @@ func RequestDesc[T any, _ protoType[T]]() string {
99110
return typ + " as JSON file"
100111
}
101112

113+
// Repeatedly calls a paginated list function until all items are retrieved,
114+
// requiring the caller to update the request's pagination parameters as needed.
115+
func ExhaustivePaginatedList[
116+
ReqT any, Req paginatedType[ReqT],
117+
ResT any, ResD any, Res paginatedResponseType[ResT, ResD],
118+
](
119+
ctx context.Context,
120+
req Req,
121+
list func(context.Context, Req) (Res, error),
122+
iter func(items []*ResD),
123+
page *livekit.Pagination,
124+
) error {
125+
for {
126+
res, err := list(ctx, req)
127+
if err != nil {
128+
return err
129+
}
130+
resultItems := res.GetItems()
131+
if len(resultItems) > 0 {
132+
iter(resultItems)
133+
}
134+
// List exhausted
135+
if len(resultItems) < int(page.Limit) {
136+
break
137+
}
138+
}
139+
return nil
140+
}
141+
102142
func createAndPrintFile[T any, P protoTypeValidator[T], R any](
103143
ctx context.Context,
104144
cmd *cli.Command, file string,
@@ -206,7 +246,7 @@ func listAndPrint[
206246
](
207247
ctx context.Context,
208248
cmd *cli.Command,
209-
getList func(ctx context.Context, req Req) (Resp, error), req Req,
249+
getList func(context.Context, Req) (Resp, error), req Req,
210250
header []string, tableRow func(item *T) []string,
211251
) error {
212252
res, err := getList(ctx, req)

cmd/lk/sip.go

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -790,9 +790,33 @@ func listSipTrunk(ctx context.Context, cmd *cli.Command) error {
790790
func listSipInboundTrunk(ctx context.Context, cmd *cli.Command) error {
791791
cli, err := createSIPClient(ctx, cmd)
792792
if err != nil {
793-
return err
793+
return fmt.Errorf("could not create SIP client: %w", err)
794+
}
795+
796+
// NOTE: twirp has a maximum payload size of 4MB, which some customer data may exceed.
797+
// We implement pagination here behind the scenes to split requests into manageable chunks
798+
// unlikely to exceed the limit. This should be used on all listing commands that may
799+
// return a large number of items.
800+
page := &livekit.Pagination{Limit: 500}
801+
req := &livekit.ListSIPInboundTrunkRequest{Page: page}
802+
list := func(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) (*livekit.ListSIPInboundTrunkResponse, error) {
803+
res := &livekit.ListSIPInboundTrunkResponse{}
804+
if err := ExhaustivePaginatedList(
805+
ctx,
806+
req,
807+
cli.ListSIPInboundTrunk,
808+
func(items []*livekit.SIPInboundTrunkInfo) {
809+
res.Items = append(res.Items, items...)
810+
page.AfterId = items[len(items)-1].SipTrunkId
811+
},
812+
page,
813+
); err != nil {
814+
return nil, fmt.Errorf("could not list SIP inbound trunks: %w", err)
815+
}
816+
return res, nil
794817
}
795-
return listAndPrint(ctx, cmd, cli.ListSIPInboundTrunk, &livekit.ListSIPInboundTrunkRequest{}, []string{
818+
819+
return listAndPrint(ctx, cmd, list, req, []string{
796820
"SipTrunkID", "Name", "Numbers",
797821
"AllowedAddresses", "AllowedNumbers",
798822
"Authentication",
@@ -816,7 +840,31 @@ func listSipOutboundTrunk(ctx context.Context, cmd *cli.Command) error {
816840
if err != nil {
817841
return err
818842
}
819-
return listAndPrint(ctx, cmd, cli.ListSIPOutboundTrunk, &livekit.ListSIPOutboundTrunkRequest{}, []string{
843+
844+
// NOTE: twirp has a maximum payload size of 4MB, which some customer data may exceed.
845+
// We implement pagination here behind the scenes to split requests into manageable chunks
846+
// unlikely to exceed the limit. This should be used on all listing commands that may
847+
// return a large number of items.
848+
page := &livekit.Pagination{Limit: 500}
849+
req := &livekit.ListSIPOutboundTrunkRequest{Page: page}
850+
list := func(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) {
851+
res := &livekit.ListSIPOutboundTrunkResponse{}
852+
if err := ExhaustivePaginatedList(
853+
ctx,
854+
req,
855+
cli.ListSIPOutboundTrunk,
856+
func(items []*livekit.SIPOutboundTrunkInfo) {
857+
res.Items = append(res.Items, items...)
858+
page.AfterId = items[len(items)-1].SipTrunkId
859+
},
860+
page,
861+
); err != nil {
862+
return nil, fmt.Errorf("could not list SIP outbound trunks: %w", err)
863+
}
864+
return res, nil
865+
}
866+
867+
return listAndPrint(ctx, cmd, list, req, []string{
820868
"SipTrunkID", "Name",
821869
"Address", "Transport",
822870
"Numbers",

pkg/util/pagination.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2023-2024 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package util
16+
17+
import (
18+
// "github.com/livekit/protocol/livekit"
19+
"google.golang.org/protobuf/proto"
20+
)
21+
22+
type ProtoType[T any] interface {
23+
*T
24+
proto.Message
25+
}
26+
27+
type Res[T any] interface {
28+
proto.Message
29+
GetItems() []*T
30+
}
31+
32+
// func FetchExhaustiveWithPagination[ReqT any, Req ProtoType[ReqT]](fetch func(pagination *livekit.Pagination) error, initialPage *livekit.Pagination) error {
33+
// page := initialPage
34+
//
35+
// for {
36+
// if err := fetch(page); err != nil {
37+
// return err
38+
// }
39+
// if
40+
// }
41+
//
42+
// var response = livekit.ListSIPInboundTrunkResponse{}
43+
// for {
44+
// subResponse, err := s.sipClient.ListSIPInboundTrunk(ctx, in)
45+
// if err != nil {
46+
// return nil, err
47+
// }
48+
// response.Items = append(response.Items, subResponse.Items...)
49+
// if len(subResponse.Items) < int(in.Page.Limit) {
50+
// break
51+
// }
52+
// in.Page.AfterId = subResponse.Items[len(subResponse.Items)-1].SipTrunkId
53+
// }
54+
// return &response, nil
55+
//
56+
// }

0 commit comments

Comments
 (0)