Skip to content

Commit

Permalink
Make xlocale a cached locale loader
Browse files Browse the repository at this point in the history
See: <golang/go#26106>
time.LoadLocation should not be called on a hot path, since it makes a lot of syscalls and actually reads files every time, even though the file is the same one each time.
This package introduces a naive static cache: locales are by their nature very static, so a cache layer was added in front of time.LoadLocation.

Before:

```
BenchmarkReceiverTest
BenchmarkReceiverTest/parse
BenchmarkReceiverTest/parse/tz
BenchmarkReceiverTest/parse/tz-10      	   37561	     30858 ns/op	   12645 B/op	      77 allocs/op
BenchmarkReceiverTest/parse/no-tz
BenchmarkReceiverTest/parse/no-tz-10   	   58916	     20015 ns/op	    8963 B/op	      53 allocs/op
PASS
```

After:

```

BenchmarkReceiverTest/parse
BenchmarkReceiverTest/parse/tz
BenchmarkReceiverTest/parse/tz-10      	   58303	     20599 ns/op	    8964 B/op	      53 allocs/op
BenchmarkReceiverTest/parse/no-tz
BenchmarkReceiverTest/parse/no-tz-10   	   59144	     19973 ns/op	    8963 B/op	      53 allocs/op
```
commit_hash:abe8a3155d7a10363e2dc14ace42e81a72d268fd
  • Loading branch information
laskoviymishka committed Jan 29, 2025
1 parent fb18d20 commit a996331
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@
"pkg/debezium/pg/tests/gotest/canondata/result.json":"transfer_manager/go/pkg/debezium/pg/tests/gotest/canondata/result.json",
"pkg/debezium/pg/tests/original_type_info_test.go":"transfer_manager/go/pkg/debezium/pg/tests/original_type_info_test.go",
"pkg/debezium/pg/tests/params_test.go":"transfer_manager/go/pkg/debezium/pg/tests/params_test.go",
"pkg/debezium/pg/tests/receiver_bench_test.go":"transfer_manager/go/pkg/debezium/pg/tests/receiver_bench_test.go",
"pkg/debezium/pg/tests/receiver_test.go":"transfer_manager/go/pkg/debezium/pg/tests/receiver_test.go",
"pkg/debezium/pg/tests/testdata/README.md":"transfer_manager/go/pkg/debezium/pg/tests/testdata/README.md",
"pkg/debezium/pg/tests/testdata/emitter_chain_test__canon_change_item_final_not_wiped.txt":"transfer_manager/go/pkg/debezium/pg/tests/testdata/emitter_chain_test__canon_change_item_final_not_wiped.txt",
Expand Down Expand Up @@ -2171,6 +2172,7 @@
"pkg/util/validators/validators_test.go":"transfer_manager/go/pkg/util/validators/validators_test.go",
"pkg/util/xd_array.go":"transfer_manager/go/pkg/util/xd_array.go",
"pkg/util/xd_array_test.go":"transfer_manager/go/pkg/util/xd_array_test.go",
"pkg/util/xlocale/cached_loader.go":"transfer_manager/go/pkg/util/xlocale/cached_loader.go",
"pkg/worker/tasks/activate_delivery.go":"transfer_manager/go/pkg/worker/tasks/activate_delivery.go",
"pkg/worker/tasks/add_tables.go":"transfer_manager/go/pkg/worker/tasks/add_tables.go",
"pkg/worker/tasks/asynchronous_snapshot_state.go":"transfer_manager/go/pkg/worker/tasks/asynchronous_snapshot_state.go",
Expand Down
7 changes: 4 additions & 3 deletions pkg/debezium/pg/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/doublecloud/transfer/pkg/debezium/typeutil"
"github.com/doublecloud/transfer/pkg/providers/postgres"
"github.com/doublecloud/transfer/pkg/util/jsonx"
"github.com/doublecloud/transfer/pkg/util/xlocale"
ytschema "go.ytsaurus.tech/yt/go/schema"
)

Expand Down Expand Up @@ -163,7 +164,7 @@ func (t *TimestampWithoutTimeZone) Do(in int64, originalType *debeziumcommon.Ori
if timeZone == "" {
return timestamp, nil
}
tz, err := time.LoadLocation(timeZone)
tz, err := xlocale.Load(timeZone)
if err != nil {
return time.Time{}, xerrors.Errorf("unable to load timezone %s, err: %w", timeZone, err)
}
Expand Down Expand Up @@ -298,8 +299,8 @@ func (t *TimestampWithTimeZone) Do(in string, _ *debeziumcommon.OriginalTypeInfo
if err != nil {
return time.Time{}, xerrors.Errorf("unable to parse timestamp with timezone: %s, err: %w", in, err)
}
tz, _ := time.LoadLocation("UTC")
return time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), timestamp.Second(), timestamp.Nanosecond(), tz), nil

return time.Date(timestamp.Year(), timestamp.Month(), timestamp.Day(), timestamp.Hour(), timestamp.Minute(), timestamp.Second(), timestamp.Nanosecond(), time.UTC), nil
}

type Enum struct {
Expand Down
90 changes: 90 additions & 0 deletions pkg/debezium/pg/tests/receiver_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package tests

import (
"testing"

"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/debezium"
debeziumcommon "github.com/doublecloud/transfer/pkg/debezium/common"
debeziumparameters "github.com/doublecloud/transfer/pkg/debezium/parameters"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
)

func BenchmarkReceiverTest(b *testing.B) {
b.Run("parse", func(b *testing.B) {
b.Run("tz", func(b *testing.B) {
canonDebeziumMsgWithoutSequence := wipeSequenceAndIncremental(debeziumMsg30)
receiver := debezium.NewReceiver(map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo{
{Namespace: "public", Name: "basic_types"}: {
"id": {OriginalType: "pg:integer"},
"val": {OriginalType: `pg:timestamp without time zone`, Properties: map[string]string{"timezone": "Europe/Moscow"}},
},
}, nil)

b.ResetTimer()
for n := 0; n < b.N; n++ {
co, err := receiver.Receive(canonDebeziumMsgWithoutSequence)
require.NoError(b, err)
b.SetBytes(int64(co.Size.Values))
}
b.ReportAllocs()
})
b.Run("no-tz", func(b *testing.B) {
canonDebeziumMsgWithoutSequence := wipeSequenceAndIncremental(debeziumMsg30)
receiver := debezium.NewReceiver(map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo{
{Namespace: "public", Name: "basic_types"}: {
"id": {OriginalType: "pg:integer"},
"val": {OriginalType: `pg:timestamp without time zone`},
},
}, nil)

b.ResetTimer()
for n := 0; n < b.N; n++ {
co, err := receiver.Receive(canonDebeziumMsgWithoutSequence)
require.NoError(b, err)
b.SetBytes(int64(co.Size.Values))
}
b.ReportAllocs()
})
})
b.Run("serialize", func(b *testing.B) {
changeItem := extractCI(b, debeziumMsg31, map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo{
{Namespace: "public", Name: "basic_types"}: {
"id": {OriginalType: "pg:integer"},
"val": {OriginalType: `pg:timestamp with time zone`},
},
})
emitter, err := debezium.NewMessagesEmitter(map[string]string{
debeziumparameters.DatabaseDBName: "pguser",
debeziumparameters.TopicPrefix: "fullfillment",
debeziumparameters.AddOriginalTypes: "false",
debeziumparameters.SourceType: "pg",
}, "1.8.0.Final", false, logger.LoggerWithLevel(zapcore.WarnLevel))
require.NoError(b, err)

b.ResetTimer()
for n := 0; n < b.N; n++ {
result, err := emitter.EmitKV(&changeItem, debezium.GetPayloadTSMS(&changeItem), false, nil)
require.NoError(b, err)
require.Equal(b, len(result), 1)
b.SetBytes(int64(len(*result[0].DebeziumVal)))
}
b.ReportAllocs()
})
}

// extractCI converts a raw debezium message into an abstract.ChangeItem.
//
// The message is first passed through wipeSequenceAndIncremental, then received
// as a JSON string via ReceiveStr using a receiver built from originalTypes,
// and finally unmarshalled back into a change item. Any error along the way is
// reported through t with require.NoError.
func extractCI(
	t require.TestingT,
	debeziumMsg string,
	originalTypes map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo,
) abstract.ChangeItem {
	wiped := wipeSequenceAndIncremental(debeziumMsg)
	rcv := debezium.NewReceiver(originalTypes, nil)

	serialized, err := ReceiveStr(rcv, wiped)
	require.NoError(t, err)

	item, err := abstract.UnmarshalChangeItem([]byte(serialized))
	require.NoError(t, err)

	return *item
}
4 changes: 2 additions & 2 deletions pkg/debezium/pg/tests/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestUnmarshalMessage(t *testing.T) {
//---------------------------------------------------------------------------------------------------------------------
// utils

func synthesizeDebeziumMessage(t *testing.T, changeItem *abstract.ChangeItem) string {
func synthesizeDebeziumMessage(t require.TestingT, changeItem *abstract.ChangeItem) string {
emitter, err := debezium.NewMessagesEmitter(map[string]string{
debeziumparameters.DatabaseDBName: "pguser",
debeziumparameters.TopicPrefix: "fullfillment",
Expand Down Expand Up @@ -73,7 +73,7 @@ func ReceiveStr(r *debezium.Receiver, in string) (string, error) {
return changeItem.ToJSONString(), err
}

func receiveWrapper(t *testing.T, debeziumMsg, canonizedChangeItem string, originalTypes map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo) {
func receiveWrapper(t require.TestingT, debeziumMsg, canonizedChangeItem string, originalTypes map[abstract.TableID]map[string]*debeziumcommon.OriginalTypeInfo) {
canonDebeziumMsgWithoutSequence := wipeSequenceAndIncremental(debeziumMsg)
receiver := debezium.NewReceiver(originalTypes, nil)
changeItemStr, err := ReceiveStr(receiver, canonDebeziumMsgWithoutSequence)
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/postgres/change_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/doublecloud/transfer/pkg/util/jsonx"
"github.com/doublecloud/transfer/pkg/util/set"
"github.com/doublecloud/transfer/pkg/util/strict"
"github.com/doublecloud/transfer/pkg/util/xlocale"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
Expand Down Expand Up @@ -62,7 +63,7 @@ func newChangeProcessor(
// use UTC in homo transfers, otherwise timestamps WITHOUT time zone cannot be inserted with COPY
connTimezone = time.UTC.String()
}
location, err := time.LoadLocation(connTimezone)
location, err := xlocale.Load(connTimezone)
if err != nil {
logger.Log.Warn("failed to parse time zone", log.String("timezone", connTimezone), log.Error(err))
location = time.UTC
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/postgres/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/doublecloud/transfer/internal/logger"
"github.com/doublecloud/transfer/library/go/core/xerrors"
"github.com/doublecloud/transfer/pkg/abstract"
"github.com/doublecloud/transfer/pkg/util/xlocale"
"github.com/gofrs/uuid"
"github.com/jackc/pgproto3/v2"
"github.com/jackc/pgtype"
Expand All @@ -24,7 +25,7 @@ func MakeUnmarshallerData(isHomo bool, conn *pgx.Conn) UnmarshallerData {
// use UTC in homo transfers, otherwise timestamps WITHOUT time zone cannot be inserted with COPY
marshalTimezone = time.UTC.String()
}
location, err := time.LoadLocation(marshalTimezone)
location, err := xlocale.Load(marshalTimezone)
if err != nil {
logger.Log.Warn("failed to parse time zone", log.String("timezone", marshalTimezone), log.Error(err))
location = time.UTC
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/s3/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/doublecloud/transfer/pkg/serializer"
"github.com/doublecloud/transfer/pkg/stats"
"github.com/doublecloud/transfer/pkg/util"
"github.com/doublecloud/transfer/pkg/util/xlocale"
"go.ytsaurus.tech/library/go/core/log"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -251,7 +252,7 @@ func (s *sinker) bucket(row abstract.ChangeItem) string {
rowBucketTime = model.ExtractTimeCol(row, s.cfg.LayoutColumn)
}
if s.cfg.LayoutTZ != "" {
loc, _ := time.LoadLocation(s.cfg.LayoutTZ)
loc, _ := xlocale.Load(s.cfg.LayoutTZ)
rowBucketTime = rowBucketTime.In(loc)
}
return rowBucketTime.Format(s.cfg.Layout)
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/xlocale/cached_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package xlocale

import (
"sync"
"time"
)

// locationCache maps a time zone name (string) to its loaded *time.Location.
// sync.Map is safe for concurrent use on its own, so no extra mutex guards it.
var locationCache sync.Map

// Load returns the *time.Location for the given name, like time.LoadLocation,
// but remembers every successfully loaded location in a process-wide cache so
// that repeated calls with the same name do not call time.LoadLocation again.
//
// Errors from time.LoadLocation are returned as-is together with a nil
// location; failed lookups are not cached, so a later call retries the load.
//
// Load is safe for concurrent use. All callers asking for the same name
// receive the same *time.Location pointer.
func Load(name string) (*time.Location, error) {
	// Fast path: the location has already been loaded.
	if cached, ok := locationCache.Load(name); ok {
		return cached.(*time.Location), nil
	}

	loc, err := time.LoadLocation(name)
	if err != nil {
		return nil, err
	}

	// Several goroutines may miss the cache for the same name at once.
	// LoadOrStore keeps whichever value was stored first and hands it back to
	// everyone, so the cache always yields one canonical pointer per name.
	actual, _ := locationCache.LoadOrStore(name, loc)
	return actual.(*time.Location), nil
}

0 comments on commit a996331

Please sign in to comment.