Skip to content

Commit

Permalink
pipeline: call .ReadDir instead of fs.WalkDir to gather all file desc…
Browse files Browse the repository at this point in the history
…riptors
  • Loading branch information
adamdecaf committed Apr 8, 2024
1 parent 9a953ce commit 0b86fd6
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 110 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/igrmk/treemap/v2 v2.0.1
github.com/jlaffaye/ftp v0.2.0
github.com/moov-io/ach v1.37.1
github.com/moov-io/ach v1.37.2
github.com/moov-io/base v0.48.5
github.com/moov-io/cryptfs v0.7.1
github.com/moov-io/go-ftp v0.3.2
Expand Down Expand Up @@ -138,7 +138,7 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand Down Expand Up @@ -188,7 +188,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand Down Expand Up @@ -368,8 +368,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/moov-io/ach v1.37.1 h1:cIH/W/S0PbmoQjGxJBHhgYKwfIXKyXUygN6NyJ1c8nY=
github.com/moov-io/ach v1.37.1/go.mod h1:g/XLp337pbEMctCDazuCHMnmNcoV04aMqZGoQfOsrpI=
github.com/moov-io/ach v1.37.2 h1:Onh8QmINf+z0ZfFmugO7An7l59JmQt8f3txpU0lw+g4=
github.com/moov-io/ach v1.37.2/go.mod h1:g/XLp337pbEMctCDazuCHMnmNcoV04aMqZGoQfOsrpI=
github.com/moov-io/base v0.48.5 h1:QaTyTo6eFFFV35R9l/GdePQN40IJti9knD5hqdWPnnM=
github.com/moov-io/base v0.48.5/go.mod h1:D5ZV9COV/qtCjTQuYpq7gGInCk64AhOQI6UY4kt4Rq8=
github.com/moov-io/cryptfs v0.7.1 h1:aT79fcxvX3vLJ25GMFlsl6WtPLijZ3VWMaRSq8OJu4M=
Expand Down Expand Up @@ -567,8 +567,8 @@ golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ=
golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o=
Expand Down
4 changes: 2 additions & 2 deletions internal/pipeline/events_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func TestEventsAPI_FileUploaded(t *testing.T) {
switch i {
case 0:
// Example: TESTING-143425.52750.ach
require.True(t, strings.HasPrefix(v.Filename, "TESTING-"))
require.True(t, strings.HasSuffix(v.Filename, ".ach"))
require.True(t, strings.HasPrefix(v.Filename, "TESTING-"), v.Filename)
require.True(t, strings.HasSuffix(v.Filename, ".ach"), v.Filename)
case 1:
require.Equal(t, "foo.ach", v.Filename)
}
Expand Down
117 changes: 64 additions & 53 deletions internal/pipeline/merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io/fs"
"path/filepath"
"slices"
"strings"
Expand Down Expand Up @@ -102,7 +101,9 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
if err := ach.NewWriter(&buf).Write(xfer.File); err != nil {
return err
}
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", xfer.FileID))

fileID := strings.TrimSuffix(xfer.FileID, ".ach")
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", fileID))
if err := m.storage.WriteFile(path, buf.Bytes()); err != nil {
return err
}
Expand All @@ -116,7 +117,8 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
"shardKey": log.String(xfer.ShardKey),
}).Logf("ERROR encoding ValidateOpts: %v", err)
}
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.json", xfer.FileID))

path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.json", fileID))
if err := m.storage.WriteFile(path, buf.Bytes()); err != nil {
m.logger.Warn().With(log.Fields{
"fileID": log.String(xfer.FileID),
Expand All @@ -129,7 +131,8 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
}

func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) {
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", cancel.FileID))
fileID := strings.TrimSuffix(cancel.FileID, ".ach")
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", fileID))

// Check if the file exists already
file, _ := m.storage.Open(path)
Expand Down Expand Up @@ -335,7 +338,7 @@ func (m *filesystemMerging) WithEachMerged(ctx context.Context, f func(context.C
attribute.Int("achgateway.successful_remote_writes", successfulRemoteWrites),
)

// Build a mapping of BatchHeader + EntryDetail from dir
// Build a mapping of BatchHeader + EntryDetail from dir (input files)
mappings, err := m.buildDirMapping(dir, canceledFiles)
if err != nil {
el.Add(err)
Expand Down Expand Up @@ -395,79 +398,87 @@ func fileAcceptor(canceledFiles []string) func(string) ach.FileAcceptance {
}
}

// buildDirMapping computes a tree of the input files and their entries together so that we can quickly find
// where they were merged into.
func (m *filesystemMerging) buildDirMapping(dir string, canceledFiles []string) (*treemap.TreeMap[string, string], error) {
tree := treemap.New[string, string]()

fds, err := m.storage.ReadDir(dir)
if err != nil {
return nil, err
}

acceptor := fileAcceptor(canceledFiles)

err := fs.WalkDir(m.storage, dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
if strings.Contains(err.Error(), "is a directory") {
return nil
}
return err
}
if d.IsDir() {
return nil // skip directories
}
for i := range fds {
path := fds[i].Name()

if strings.Contains(path, "uploaded") || strings.HasSuffix(path, ".json") {
// Skip /uploaded/ as we're only interested in the input files.
// Skip .json files as they contain ValidateOpts
return nil
// Ignore directories as ReadDir continues inside of them.
// .json files contain ValidateOpts which we can skip
if fds[i].IsDir() || strings.HasSuffix(path, ".json") {
continue
}

// Skip the file if merging would have skipped it
if acceptor(path) == ach.SkipFile {
return nil
continue
}

fd, err := m.storage.Open(path)
err = m.accumulateMappings(tree, filepath.Join(dir, path))
if err != nil {
return fmt.Errorf("opening %s failed: %w", path, err)
return nil, fmt.Errorf("accumulating mappings from %s failed: %w", path, err)
}
defer fd.Close()

// Check for validate opts
validateOptsPath := strings.TrimSuffix(path, filepath.Ext(path)) + ".json"
var validateOpts *ach.ValidateOpts
if optsFD, err := m.storage.Open(validateOptsPath); err == nil {
if optsFD != nil {
defer optsFD.Close()
}
}

err = json.NewDecoder(optsFD).Decode(&validateOpts)
if err != nil {
return fmt.Errorf("reading %s as validate opts failed: %w", validateOptsPath, err)
}
}
return tree, nil
}

rdr := ach.NewReader(fd)
if validateOpts != nil {
rdr.SetValidation(validateOpts)
func (m *filesystemMerging) accumulateMappings(tree *treemap.TreeMap[string, string], path string) error {
fd, err := m.storage.Open(path)
if err != nil {
return fmt.Errorf("opening %s failed: %w", path, err)
}
defer fd.Close()

// Check for validate opts
validateOptsPath := strings.TrimSuffix(path, filepath.Ext(path)) + ".json"
var validateOpts *ach.ValidateOpts
if optsFD, err := m.storage.Open(validateOptsPath); err == nil {
if optsFD != nil {
defer optsFD.Close()
}

file, err := rdr.Read()
err = json.NewDecoder(optsFD).Decode(&validateOpts)
if err != nil {
return fmt.Errorf("reading %s failed: %w", path, err)
return fmt.Errorf("reading %s as validate opts failed: %w", validateOptsPath, err)
}
}

_, filename := filepath.Split(path)
rdr := ach.NewReader(fd)
if validateOpts != nil {
rdr.SetValidation(validateOpts)
}

file, err := rdr.Read()
if err != nil {
return fmt.Errorf("reading %s failed: %w", path, err)
}

// Add each BatchHeader and Entry to the map
for i := range file.Batches {
bh := file.Batches[i].GetHeader().String()
_, filename := filepath.Split(path)

entries := file.Batches[i].GetEntries()
for m := range entries {
tree.Set(makeKey(bh, entries[m]), filename)
}
}
// Add each BatchHeader and Entry to the map
for i := range file.Batches {
bh := file.Batches[i].GetHeader().String()

return nil
})
entries := file.Batches[i].GetEntries()

return tree, err
for m := range entries {
key := makeKey(bh, entries[m])
tree.Set(key, filename)
}
}

return nil
}

func makeKey(bh string, entry *ach.EntryDetail) string {
Expand Down
Loading

0 comments on commit 0b86fd6

Please sign in to comment.