Skip to content

Commit 6db5378

Browse files
committed
chore(dataobj): Use common base config for indexobj and logsobj builders
Use the same config struct for both the indexobj builder and the logsobj builder, since they are configured identically, only with different default values.
1 parent f09e7ca commit 6db5378

File tree

19 files changed

+165
-182
lines changed

19 files changed

+165
-182
lines changed

docs/sources/shared/configuration.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,29 +1353,35 @@ dataobj:
13531353
[topic: <string> | default = ""]
13541354

13551355
index:
1356-
# The size of the target page to use for the index object builder.
1356+
# The target maximum amount of uncompressed data to hold in data pages (for
1357+
# columnar sections). Uncompressed size is used for consistent I/O and
1358+
# planning.
13571359
# CLI flag: -dataobj-index-builder.target-page-size
13581360
[target_page_size: <int> | default = 128KiB]
13591361

1360-
# The maximum row count for pages to use for the index builder. A value of 0
1361-
# means no limit.
1362+
# The maximum row count for pages to use for the data object builder. A
1363+
# value of 0 means no limit.
13621364
# CLI flag: -dataobj-index-builder.max-page-rows
13631365
[max_page_rows: <int> | default = 0]
13641366

1365-
# The size of the target object to use for the index object builder.
1366-
# CLI flag: -dataobj-index-builder.target-object-size
1367+
# The target maximum size of the encoded object and all of its encoded
1368+
# sections (after compression), to limit memory usage of a builder.
1369+
# CLI flag: -dataobj-index-builder.target-builder-memory-limit
13671370
[target_object_size: <int> | default = 64MiB]
13681371

1369-
# Configures a maximum size for sections, for sections that support it.
1372+
# The target maximum amount of uncompressed data to hold in sections, for
1373+
# sections that support being limited by size. Uncompressed size is used for
1374+
# consistent I/O and planning.
13701375
# CLI flag: -dataobj-index-builder.target-section-size
13711376
[target_section_size: <int> | default = 16MiB]
13721377

1373-
# The size of the buffer to use for sorting logs.
1378+
# The size of logs to buffer in memory before adding into columnar builders,
1379+
# used to reduce CPU load of sorting.
13741380
# CLI flag: -dataobj-index-builder.buffer-size
13751381
[buffer_size: <int> | default = 2MiB]
13761382

1377-
# The maximum number of stripes to merge into a section at once. Must be
1378-
# greater than 1.
1383+
# The maximum number of log section stripes to merge into a section at once.
1384+
# Must be greater than 1.
13791385
# CLI flag: -dataobj-index-builder.section-stripe-merge-limit
13801386
[section_stripe_merge_limit: <int> | default = 2]
13811387

pkg/dataobj/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func (cfg *Config) Validate() error {
4343
if err := cfg.Consumer.Validate(); err != nil {
4444
return err
4545
}
46+
if err := cfg.Index.Validate(); err != nil {
47+
return err
48+
}
4649
if err := cfg.Metastore.Validate(); err != nil {
4750
return err
4851
}

pkg/dataobj/consumer/logsobj/builder.go

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ const (
3838
sortTimestampDESC = "timestamp-desc"
3939
)
4040

41-
// BuilderConfig configures a [Builder].
42-
type BuilderConfig struct {
41+
// BuilderBaseConfig configures a data object builder.
42+
type BuilderBaseConfig struct {
4343
// TargetPageSize configures a target size for encoded pages within the data
4444
// object. TargetPageSize accounts for encoding, but not for compression.
4545
TargetPageSize flagext.Bytes `yaml:"target_page_size"`
@@ -74,30 +74,20 @@ type BuilderConfig struct {
7474
// values of MergeSize trade off lower memory overhead for higher time spent
7575
// merging.
7676
SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"`
77-
78-
// DataobjSortOrder defines the order in which the rows of the logs sections are sorted.
79-
// They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC].
80-
DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"`
8177
}
8278

8379
// RegisterFlagsWithPrefix registers flags with the given prefix.
84-
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
85-
_ = cfg.TargetPageSize.Set("2MB")
86-
_ = cfg.TargetObjectSize.Set("1GB")
87-
_ = cfg.BufferSize.Set("16MB")
88-
_ = cfg.TargetSectionSize.Set("128MB")
89-
80+
func (cfg *BuilderBaseConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
9081
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The target maximum amount of uncompressed data to hold in data pages (for columnar sections). Uncompressed size is used for consistent I/O and planning.")
9182
f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the data object builder. A value of 0 means no limit.")
9283
f.Var(&cfg.TargetObjectSize, prefix+"target-builder-memory-limit", "The target maximum size of the encoded object and all of its encoded sections (after compression), to limit memory usage of a builder.")
9384
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.")
9485
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.")
9586
f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.")
96-
f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).")
9787
}
9888

9989
// Validate validates the BuilderConfig.
100-
func (cfg *BuilderConfig) Validate() error {
90+
func (cfg *BuilderBaseConfig) Validate() error {
10191
var errs []error
10292

10393
if cfg.TargetPageSize <= 0 {
@@ -122,9 +112,42 @@ func (cfg *BuilderConfig) Validate() error {
122112
errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1"))
123113
}
124114

115+
return errors.Join(errs...)
116+
}
117+
118+
// BuilderConfig configures a [Builder].
119+
type BuilderConfig struct {
120+
BuilderBaseConfig `yaml:",inline"`
121+
122+
// DataobjSortOrder defines the order in which the rows of the logs sections are sorted.
123+
// They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC].
124+
DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"`
125+
}
126+
127+
// RegisterFlagsWithPrefix registers flags with the given prefix.
128+
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
129+
// Set defaults for base builder configuration
130+
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB")
131+
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB")
132+
_ = cfg.BuilderBaseConfig.BufferSize.Set("16MB")
133+
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB")
134+
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)
135+
136+
f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).")
137+
}
138+
139+
// Validate validates the BuilderConfig.
140+
func (cfg *BuilderConfig) Validate() error {
141+
var errs []error
142+
143+
if err := cfg.BuilderBaseConfig.Validate(); err != nil {
144+
errs = append(errs, err)
145+
}
146+
125147
if cfg.DataobjSortOrder == "" {
126148
cfg.DataobjSortOrder = sortStreamASC // default to [streamID ASC, timestamp DESC] sorting
127149
}
150+
128151
if cfg.DataobjSortOrder != sortStreamASC && cfg.DataobjSortOrder != sortTimestampDESC {
129152
errs = append(errs, fmt.Errorf("invalid dataobj sort order. must be one of `stream-asc` or `timestamp-desc`, got: %s", cfg.DataobjSortOrder))
130153
}

pkg/dataobj/consumer/logsobj/builder_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import (
2121
)
2222

2323
var testBuilderConfig = BuilderConfig{
24-
TargetPageSize: 2048,
25-
TargetObjectSize: 1 << 20, // 1 MiB
26-
TargetSectionSize: 8 << 10, // 8 KiB
27-
28-
BufferSize: 2048 * 8,
29-
SectionStripeMergeLimit: 2,
30-
DataobjSortOrder: sortTimestampDESC,
24+
BuilderBaseConfig: BuilderBaseConfig{
25+
TargetPageSize: 2048,
26+
TargetObjectSize: 1 << 20, // 1 MiB
27+
TargetSectionSize: 8 << 10, // 8 KiB
28+
BufferSize: 2048 * 8,
29+
SectionStripeMergeLimit: 2,
30+
},
31+
DataobjSortOrder: sortTimestampDESC,
3132
}
3233

3334
func TestBuilder(t *testing.T) {

pkg/dataobj/consumer/partition_processor_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ import (
2121
)
2222

2323
var testBuilderConfig = logsobj.BuilderConfig{
24-
TargetPageSize: 2048,
25-
MaxPageRows: 10,
26-
TargetObjectSize: 1 << 22, // 4 MiB
27-
TargetSectionSize: 1 << 22, // 4 MiB
28-
BufferSize: 2048 * 8,
29-
SectionStripeMergeLimit: 2,
24+
BuilderBaseConfig: logsobj.BuilderBaseConfig{
25+
TargetPageSize: 2048,
26+
MaxPageRows: 10,
27+
TargetObjectSize: 1 << 22, // 4 MiB
28+
TargetSectionSize: 1 << 22, // 4 MiB
29+
BufferSize: 2048 * 8,
30+
SectionStripeMergeLimit: 2,
31+
},
3032
}
3133

3234
func TestPartitionProcessor_Flush(t *testing.T) {

pkg/dataobj/index/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func NewIndexBuilder(
135135
}
136136

137137
// Create index building dependencies
138-
builder, err := indexobj.NewBuilder(cfg.BuilderConfig, scratchStore)
138+
builder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore)
139139
if err != nil {
140140
return nil, fmt.Errorf("failed to create index builder: %w", err)
141141
}

pkg/dataobj/index/builder_test.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ import (
1717

1818
"github.com/grafana/loki/v3/pkg/dataobj"
1919
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
20-
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
2120
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
2221
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
2322
"github.com/grafana/loki/v3/pkg/kafka"
2423
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
2524
"github.com/grafana/loki/v3/pkg/logproto"
2625
)
2726

28-
var testBuilderConfig = indexobj.BuilderConfig{
27+
var testBuilderConfig = logsobj.BuilderBaseConfig{
2928
TargetPageSize: 128 * 1024,
3029
TargetObjectSize: 4 * 1024 * 1024,
3130
TargetSectionSize: 2 * 1024 * 1024,
@@ -53,8 +52,8 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
5352
// Create a builder with mocks for dependencies
5453
builder, err := NewIndexBuilder(
5554
Config{
56-
BuilderConfig: testBuilderConfig,
57-
EventsPerIndex: 1,
55+
BuilderBaseConfig: testBuilderConfig,
56+
EventsPerIndex: 1,
5857
},
5958
metastore.Config{},
6059
kafka.Config{},
@@ -121,8 +120,8 @@ func TestIndexBuilder(t *testing.T) {
121120

122121
p, err := NewIndexBuilder(
123122
Config{
124-
BuilderConfig: testBuilderConfig,
125-
EventsPerIndex: 3,
123+
BuilderBaseConfig: testBuilderConfig,
124+
EventsPerIndex: 3,
126125
},
127126
metastore.Config{},
128127
kafka.Config{},
@@ -231,13 +230,13 @@ func (m *mockKafkaClient) Close() {}
231230

232231
func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) {
233232
candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{
234-
TargetPageSize: 128 * 1024,
235-
TargetObjectSize: 4 * 1024 * 1024,
236-
TargetSectionSize: 2 * 1024 * 1024,
237-
238-
BufferSize: 4 * 1024 * 1024,
239-
SectionStripeMergeLimit: 2,
240-
233+
BuilderBaseConfig: logsobj.BuilderBaseConfig{
234+
TargetPageSize: 128 * 1024,
235+
TargetObjectSize: 4 * 1024 * 1024,
236+
TargetSectionSize: 2 * 1024 * 1024,
237+
BufferSize: 4 * 1024 * 1024,
238+
SectionStripeMergeLimit: 2,
239+
},
241240
DataobjSortOrder: "stream-asc",
242241
}, nil)
243242
require.NoError(t, err)

pkg/dataobj/index/calculate_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/grafana/loki/pkg/push"
2525
)
2626

27-
var testCalculatorConfig = indexobj.BuilderConfig{
27+
var testCalculatorConfig = logsobj.BuilderBaseConfig{
2828
TargetPageSize: 2048,
2929
TargetObjectSize: 1 << 22, // 4 MiB
3030
BufferSize: 2048 * 8,
@@ -39,11 +39,13 @@ func createTestLogObject(t *testing.T, tenants int) *dataobj.Object {
3939
t.Helper()
4040

4141
builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
42-
TargetPageSize: 2048,
43-
TargetObjectSize: 1 << 22,
44-
TargetSectionSize: 1 << 21,
45-
BufferSize: 2048 * 8,
46-
SectionStripeMergeLimit: 2,
42+
BuilderBaseConfig: logsobj.BuilderBaseConfig{
43+
TargetPageSize: 2048,
44+
TargetObjectSize: 1 << 22,
45+
TargetSectionSize: 1 << 21,
46+
BufferSize: 2048 * 8,
47+
SectionStripeMergeLimit: 2,
48+
},
4749
}, nil)
4850
require.NoError(t, err)
4951

pkg/dataobj/index/config.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,44 @@
11
package index
22

33
import (
4+
"errors"
45
"flag"
56
"time"
67

7-
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
8+
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
89
)
910

1011
type Config struct {
11-
indexobj.BuilderConfig `yaml:",inline"`
12-
EventsPerIndex int `yaml:"events_per_index" experimental:"true"`
13-
FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"`
14-
MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"`
12+
logsobj.BuilderBaseConfig `yaml:",inline"`
13+
EventsPerIndex int `yaml:"events_per_index" experimental:"true"`
14+
FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"`
15+
MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"`
1516
}
1617

1718
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
1819
cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f)
1920
}
2021

2122
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
22-
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
23+
// Set defaults for base builder configuration
24+
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB")
25+
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB")
26+
_ = cfg.BuilderBaseConfig.BufferSize.Set("2MB")
27+
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB")
28+
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)
29+
2330
f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index")
2431
f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "Experimental: How often to check for stale partitions to flush")
2532
f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Experimental: Maximum time to wait before flushing buffered events")
2633
}
34+
35+
// Validate validates the BuilderConfig.
36+
func (cfg *Config) Validate() error {
37+
var errs []error
38+
39+
if err := cfg.BuilderBaseConfig.Validate(); err != nil {
40+
errs = append(errs, err)
41+
}
42+
43+
return errors.Join(errs...)
44+
}

0 commit comments

Comments
 (0)