Skip to content

Commit

Permalink
Merge pull request #227 from moov-io/pipeline-chunk-files-benchmark
Browse files Browse the repository at this point in the history
Pipeline chunk files benchmark
  • Loading branch information
adamdecaf authored Feb 29, 2024
2 parents dc6e821 + 8365ce6 commit b6c9f49
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 34 deletions.
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ require (
github.com/gorilla/mux v1.8.1
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/jlaffaye/ftp v0.2.0
github.com/moov-io/ach v1.33.4
github.com/moov-io/ach v1.35.0
github.com/moov-io/base v0.48.5
github.com/moov-io/cryptfs v0.7.1
github.com/moov-io/go-ftp v0.3.2
github.com/moov-io/go-sftp v0.13.3
github.com/ory/dockertest/v3 v3.9.1
github.com/ory/mail/v3 v3.0.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_golang v1.19.0
github.com/rickar/cal/v2 v2.1.13
github.com/robfig/cron/v3 v3.0.1
github.com/sethvargo/go-retry v0.2.4
Expand All @@ -33,7 +33,7 @@ require (
gocloud.dev v0.35.0
gocloud.dev/pubsub/kafkapubsub v0.35.0
goftp.io/server v0.4.1
golang.org/x/crypto v0.18.0
golang.org/x/crypto v0.19.0
golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca
golang.org/x/sync v0.5.0
golang.org/x/text v0.14.0
Expand Down Expand Up @@ -144,7 +144,6 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/markbates/pkger v0.17.1 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v6 v6.0.57 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
Expand All @@ -163,7 +162,7 @@ require (
github.com/pkg/sftp v1.13.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
Expand All @@ -190,9 +189,9 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
Expand Down
34 changes: 16 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,6 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
Expand All @@ -368,8 +366,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/moov-io/ach v1.33.4 h1:6haoD+2H+rSeLBkpGndOdpNnSnM8g/EkgppX5T+2hDA=
github.com/moov-io/ach v1.33.4/go.mod h1:liEoB/PlHQoYJq+V+O/izTzTebEDkAM4waGYVcIIKc4=
github.com/moov-io/ach v1.35.0 h1:SBFgGYO+GwucaZJMVCk9PW5JEp/TqLR2zcP7UTZYABU=
github.com/moov-io/ach v1.35.0/go.mod h1:0WUah/tGNDi3iLk9IX2AkSQkDXTrf7RJdMcLDYy/h3c=
github.com/moov-io/base v0.48.5 h1:QaTyTo6eFFFV35R9l/GdePQN40IJti9knD5hqdWPnnM=
github.com/moov-io/base v0.48.5/go.mod h1:D5ZV9COV/qtCjTQuYpq7gGInCk64AhOQI6UY4kt4Rq8=
github.com/moov-io/cryptfs v0.7.1 h1:aT79fcxvX3vLJ25GMFlsl6WtPLijZ3VWMaRSq8OJu4M=
Expand Down Expand Up @@ -409,13 +407,13 @@ github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Q
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
Expand Down Expand Up @@ -532,8 +530,8 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca h1:+xQfFu/HO/82Wwg4zuJ5xiLp0yaOLJjBGnuafXp85YQ=
golang.org/x/exp v0.0.0-20231219160207-73b9e39aefca/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
Expand Down Expand Up @@ -566,11 +564,11 @@ golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM=
golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ=
golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -605,16 +603,16 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
12 changes: 10 additions & 2 deletions internal/incoming/odfi/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"

Expand All @@ -39,7 +40,10 @@ import (

func TestProcessor(t *testing.T) {
dir := t.TempDir()
err := os.WriteFile(filepath.Join(dir, "invalid.ach"), []byte("invalid-ach-file"), 0600)

data, err := os.ReadFile(filepath.Join("testdata", "return-no-batch-controls.ach"))
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "invalid.ach"), data, 0600)
require.NoError(t, err)

proc := &MockProcessor{}
Expand All @@ -60,7 +64,11 @@ func TestProcessor(t *testing.T) {

require.NotNil(t, proc.HandledFile)
require.NotNil(t, proc.HandledFile.ACHFile)
require.Equal(t, "7ffdca32898fc89e5e680d0a01e9e1c2a1cd2717", proc.HandledFile.ACHFile.ID)
if runtime.GOOS == "windows" {
require.Equal(t, "41bcece862c7cb3e74ca96b828fe57458db85199", proc.HandledFile.ACHFile.ID)
} else {
require.Equal(t, "36b5cb975ef810e23f82d976219c176b70d785b2", proc.HandledFile.ACHFile.ID)
}

// Real world file
path := filepath.Join("..", "..", "..", "testdata", "HMBRAD_ACHEXPORT_1001_08_19_2022_09_10")
Expand Down
28 changes: 23 additions & 5 deletions internal/pipeline/merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,36 +416,54 @@ func (m *filesystemMerging) chunkFilesTogether(ctx context.Context, indices []in
if err != nil {
return nil, err
}
span.AddEvent("files-read")

merged, err := ach.MergeFilesWith(files.ACHFiles, conditions)
if err != nil {
return nil, err
}
return determineMergeDestinations(files, merged), nil
span.AddEvent("merged-files")

out, err := determineMergeDestinations(files, merged), nil
if err != nil {
return nil, err
}
return out, nil
}

var input namedFiles
out := make([]*ach.File, 0, len(indices))
mergeParts := make([]*ach.File, 0, len(indices))
for i := 0; i < len(indices)-1; i += 0 {
files, err := m.readFiles(matches[indices[i]:indices[i+1]]) // need to keep filename around
if err != nil {
return nil, err
}
span.AddEvent(fmt.Sprintf("files-read-idx-%d", i))

input.Names = append(input.Names, files.Names...)
input.ACHFiles = append(input.ACHFiles, files.ACHFiles...)

fs, err := ach.MergeFilesWith(files.ACHFiles, conditions)
if err != nil {
return nil, err
}
span.AddEvent(fmt.Sprintf("merged-files-idx-%d", i))

i += 1
out = append(out, fs...)
mergeParts = append(mergeParts, fs...)
}

merged, err := ach.MergeFilesWith(mergeParts, conditions)
if err != nil {
return nil, err
}
span.AddEvent("final-merge-files")

merged, err := ach.MergeFilesWith(out, conditions)
out, err := determineMergeDestinations(input, merged), nil
if err != nil {
return nil, err
}
return determineMergeDestinations(input, merged), nil
return out, nil
}

// determineMergeDestinations will compare the input ACH files against the merged files to determine
Expand Down
66 changes: 65 additions & 1 deletion internal/pipeline/merging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"testing"

"github.com/moov-io/ach"
Expand Down Expand Up @@ -128,7 +129,70 @@ func TestMerging_chunkFilesTogether(t *testing.T) {
require.Len(t, merged[0].ACHFile.Batches[0].GetEntries(), 2)
}

func read(t *testing.T, where string) *ach.File {
func Benchmark_chunkFilesTogether(b *testing.B) {
// Advice: Run this with "-benchtime 100x" to specify a fixed number of iterations
b.StopTimer()

example := read(b, filepath.Join("testdata", "ppd-debit.ach"))
initialTraceNumber, err := strconv.ParseInt(example.Batches[0].GetEntries()[0].TraceNumber, 10, 64)
if err != nil {
b.Fatal(err)
}

// Setup pending directory
dir := b.TempDir()
iterations := b.N * 10_000
groupSize := 250
indices := makeIndices(iterations, iterations/groupSize)
var matches []string
for i := 0; i < iterations; i++ {
file := ach.NewFile()
file.Header = example.Header

exampleBatch := example.Batches[0]
batch, err := ach.NewBatch(exampleBatch.GetHeader())
require.NoError(b, err)
entry := *exampleBatch.GetEntries()[0]
entry.SetTraceNumber(exampleBatch.GetHeader().ODFIIdentification, int(initialTraceNumber)+i)
batch.AddEntry(&entry)
require.NoError(b, batch.Create())

file.AddBatch(batch)
require.NoError(b, file.Create())

var buf bytes.Buffer
err = ach.NewWriter(&buf).Write(file)
require.NoError(b, err)

filename := fmt.Sprintf("%d.ach", i)
matches = append(matches, filename)
err = os.WriteFile(filepath.Join(dir, filename), buf.Bytes(), 0600)
require.NoError(b, err)
}
b.Logf("merged %d files", iterations)

fs, err := storage.NewFilesystem(dir)
require.NoError(b, err)

m := &filesystemMerging{
logger: log.NewTestLogger(),
storage: fs,
}

b.Run("run", func(b *testing.B) {
b.StartTimer()

conditions := ach.Conditions{
MaxLines: 100_000,
MaxDollarAmount: 50_000_000,
}
merged, err := m.chunkFilesTogether(context.Background(), indices, matches, conditions)
require.NoError(b, err)
b.Logf("%d files after merge", len(merged))
})
}

func read(t testing.TB, where string) *ach.File {
t.Helper()

file, err := ach.ReadFile(where)
Expand Down

0 comments on commit b6c9f49

Please sign in to comment.