Skip to content

Commit

Permalink
Add header option support to the CSV module (#4295)
Browse files Browse the repository at this point in the history
* Refactor csv Reader to return any

* Add header option to csv module parse function and Parser class

* Apply Pull Request Review Suggestions

* Apply suggestions from code review

Co-authored-by: Ivan <[email protected]>

* Apply Pull Request suggestions

* Apply Pull Request suggestions

* Update internal/js/modules/k6/experimental/csv/module_test.go

Co-authored-by: İnanç Gümüş <[email protected]>

* Fix missing closing curly brace in test

---------

Co-authored-by: Ivan <[email protected]>
Co-authored-by: İnanç Gümüş <[email protected]>
  • Loading branch information
3 people authored Feb 4, 2025
1 parent 7162993 commit 8799c81
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 43 deletions.
2 changes: 1 addition & 1 deletion internal/js/modules/k6/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (d *Data) sharedArray(call sobek.ConstructorCall) *sobek.Object {
// RecordReader is the data module interface implemented by types that can read
// data that can be treated as records, from data sources such as a CSV file, etc.
//
// Read is declared as returning any, so the concrete type of a record is chosen
// by the implementation rather than being fixed to a slice of strings.
type RecordReader interface {
	Read() (any, error)
}

// NewSharedArrayFrom creates a new shared array from the provided data.
Expand Down
29 changes: 24 additions & 5 deletions internal/js/modules/k6/experimental/csv/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (mi *ModuleInstance) Parse(file sobek.Value, options sobek.Value) *sobek.Pr

rt := mi.vu.Runtime()

// 1. Make sure the Sobek object is a fs.File (sobek operation)
// 1. Make sure the Sobek object is a fs.File (Sobek operation)
var fileObj fs.File
if err := mi.vu.Runtime().ExportTo(file, &fileObj); err != nil {
reject(fmt.Errorf("first argument expected to be a fs.File instance, got %T instead", file))
Expand Down Expand Up @@ -192,11 +192,11 @@ func (p *Parser) Next() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)

go func() {
var records []string
var record any
var done bool
var err error

records, err = p.reader.Read()
record, err = p.reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
resolve(parseResult{Done: true, Value: []string{}})
Expand All @@ -209,7 +209,7 @@ func (p *Parser) Next() *sobek.Promise {

p.currentLine.Add(1)

resolve(parseResult{Done: done, Value: records})
resolve(parseResult{Done: done, Value: record})
}()

return promise
Expand All @@ -222,7 +222,7 @@ type parseResult struct {
Done bool `js:"done"`

// Value holds the line's records value.
Value []string `js:"value"`
Value any `js:"value"`
}

// options holds options used to configure CSV parsing when utilizing the module.
Expand All @@ -243,13 +243,28 @@ type options struct {

// ToLine indicates the line at which to stop reading the CSV file (inclusive).
ToLine null.Int `js:"toLine"`

// AsObjects indicates that the CSV rows should be returned as objects, where
// the keys are the header column names, and values are the corresponding
// row values.
//
// When this option is enabled, the first line of the CSV file is treated as the header.
//
// If the option is set and no header line is present, this should be considered an error
// case.
//
// This option is incompatible with the [SkipFirstLine] option, and if both are set, an error
// should be returned. Same thing applies if the [FromLine] option is set to a value greater
// than 0.
AsObjects null.Bool `js:"asObjects"`
}

// newDefaultParserOptions builds the options value used when the caller does
// not override anything: a comma delimiter, the first line is not skipped, and
// rows are not returned as objects.
func newDefaultParserOptions() options {
	var defaults options

	defaults.Delimiter = ','
	defaults.SkipFirstLine = false
	defaults.AsObjects = null.BoolFrom(false)

	return defaults
}

Expand Down Expand Up @@ -284,6 +299,10 @@ func newParserOptionsFrom(obj *sobek.Object) (options, error) {
options.ToLine = null.IntFrom(v.ToInteger())
}

if v := obj.Get("asObjects"); v != nil {
options.AsObjects = null.BoolFrom(v.ToBoolean())
}

if options.FromLine.Valid && options.ToLine.Valid && options.FromLine.Int64 >= options.ToLine.Int64 {
return options, fmt.Errorf("fromLine must be less than or equal to toLine")
}
Expand Down
108 changes: 108 additions & 0 deletions internal/js/modules/k6/experimental/csv/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@ func TestParserConstructor(t *testing.T) {
require.NoError(t, err)
})

t.Run("constructing a parser with both asObjects and skipFirstLine options should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', skipFirstLine: true, asObjects: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser with both the asObjects option and fromLine option greater than 0 should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', fromLine: 1, asObjects: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser without providing a file instance should fail", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -354,6 +392,44 @@ func TestParserNext(t *testing.T) {
require.NoError(t, err)
})

t.Run("next with header option should return records as objects and succeed", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { asObjects: true });
let gotParsedCount = 0;
let { done, value } = await parser.next();
while (!done) {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
throw new Error("Expected record to be an object, but got " + typeof value);
}
if (Object.keys(value).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(value).length);
}
gotParsedCount++;
({ done, value } = await parser.next());
}
if (gotParsedCount !== 10) {
throw new Error("Expected to parse 10 records, but got " + gotParsedCount);
}
`, testFilePath)))
require.NoError(t, err)
})

t.Run("calling next on a parser that has reached EOF should return done=true and no value", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -527,6 +603,38 @@ func TestParse(t *testing.T) {

require.NoError(t, err)
})

t.Run("parse respects the header option, returns records as objects and succeeds", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const csvRecords = await csv.parse(file, { asObjects: true });
if (csvRecords.length !== 10) {
throw new Error("Expected 10 records, but got " + csvRecords.length);
}
for (const record of csvRecords) {
if (typeof record !== 'object' || record === null || Array.isArray(record)) {
throw new Error("Expected record to be an object, but got " + typeof record);
}
if (Object.keys(record).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(record).length);
}
}
`, testFilePath)))
require.NoError(t, err)
})
}

const initGlobals = `
Expand Down
121 changes: 85 additions & 36 deletions internal/js/modules/k6/experimental/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"sync/atomic"

"go.k6.io/k6/internal/js/modules/k6/data"
)

// Reader is a CSV reader.
Expand All @@ -18,6 +20,10 @@ type Reader struct {

// options holds the reader's options.
options options

// columnNames stores the column names when the asObjects option is enabled,
// in order to be able to map each row's values to their corresponding columns.
columnNames []string
}

// NewReaderFrom creates a new CSV reader from the provided io.Reader.
Expand All @@ -33,7 +39,10 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
return nil, fmt.Errorf("the reader cannot be nil")
}

// Ensure the default delimiter is set.
if err := validateOptions(options); err != nil {
return nil, err
}

if options.Delimiter == 0 {
options.Delimiter = ','
}
Expand All @@ -46,47 +55,27 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
options: options,
}

var (
fromLineSet = options.FromLine.Valid
toLineSet = options.ToLine.Valid
skipFirstLineSet = options.SkipFirstLine
fromLineIsPositive = fromLineSet && options.FromLine.Int64 >= 0
toLineIsPositive = toLineSet && options.ToLine.Int64 >= 0
)

// If set, the fromLine option should either be greater than or equal to 0.
if fromLineSet && !fromLineIsPositive {
return nil, fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
}

// If set, the toLine option should be strictly greater than or equal to 0.
if toLineSet && !toLineIsPositive {
return nil, fmt.Errorf("the 'toLine' option must be greater than or equal to 0; got %d", options.ToLine.Int64)
}

// if the `fromLine` and `toLine` options are set, and `fromLine` is greater or equal to `toLine`, we return an error.
if fromLineSet && toLineSet && options.FromLine.Int64 >= options.ToLine.Int64 {
return nil, fmt.Errorf(
"the 'fromLine' option must be less than the 'toLine' option; got 'fromLine': %d, 'toLine': %d",
options.FromLine.Int64, options.ToLine.Int64,
)
asObjectsEnabled := options.AsObjects.Valid && options.AsObjects.Bool
if asObjectsEnabled {
header, err := csvParser.Read()
if err != nil {
return nil, fmt.Errorf("failed to read the first line; reason: %w", err)
}
reader.columnNames = header
reader.currentLine.Add(1)
}

// If the user wants to skip the first line, we consume and discard it.
if skipFirstLineSet && (!fromLineSet || options.FromLine.Int64 == 0) {
_, err := csvParser.Read()
if err != nil {
if options.SkipFirstLine && (!options.FromLine.Valid || options.FromLine.Int64 == 0) {
if _, err := csvParser.Read(); err != nil {
return nil, fmt.Errorf("failed to skip the first line; reason: %w", err)
}

reader.currentLine.Add(1)
}

if fromLineSet && options.FromLine.Int64 > 0 {
// We skip lines until we reach the specified line.
if options.FromLine.Valid && options.FromLine.Int64 > 0 {
for reader.currentLine.Load() < options.FromLine.Int64 {
_, err := csvParser.Read()
if err != nil {
if _, err := csvParser.Read(); err != nil {
return nil, fmt.Errorf("failed to skip lines until line %d; reason: %w", options.FromLine.Int64, err)
}
reader.currentLine.Add(1)
Expand All @@ -96,20 +85,80 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
return reader, nil
}

func (r *Reader) Read() ([]string, error) {
// The csv module's Reader must implement the data module's RecordReader interface.
var _ data.RecordReader = (*Reader)(nil)

// Read reads the next record from the CSV file.
//
// If the `asObjects` option is enabled, the record is returned as a
// map[string]string whose keys are the stored column names. Otherwise, the
// record is returned as a slice of strings.
//
// It returns io.EOF once the current line has gone past a `toLine` option
// greater than 0. Any error reported by the underlying reader is returned
// unchanged. When `asObjects` is enabled, an error is also returned if no
// column names were stored, or if the record does not have the same number of
// fields as the header.
func (r *Reader) Read() (any, error) {
	toLineSet := r.options.ToLine.Valid

	// If the `toLine` option was set and we have gone past it, we return EOF.
	if toLineSet && r.options.ToLine.Int64 > 0 && r.currentLine.Load() > r.options.ToLine.Int64 {
		return nil, io.EOF
	}

	// Exactly one record is consumed per call.
	record, err := r.csv.Read()
	if err != nil {
		return nil, err
	}

	r.currentLine.Add(1)

	asObjectsEnabled := r.options.AsObjects.Valid && r.options.AsObjects.Bool
	if !asObjectsEnabled {
		return record, nil
	}

	// From here on, the record is mapped onto the header's column names.
	if r.columnNames == nil {
		return nil, fmt.Errorf("the 'asObjects' option is enabled, but no header was found")
	}

	if len(record) != len(r.columnNames) {
		return nil, fmt.Errorf("record length (%d) doesn't match header length (%d)", len(record), len(r.columnNames))
	}

	// The final size is known, so the map is allocated once.
	recordMap := make(map[string]string, len(record))
	for i, value := range record {
		recordMap[r.columnNames[i]] = value
	}

	return recordMap, nil
}

// validateOptions validates the reader options and returns an error describing
// the first rule that is violated, or nil when the options are consistent.
//
// The rules are checked in the following order:
//   - `asObjects` cannot be enabled together with `skipFirstLine`;
//   - `asObjects` cannot be enabled together with a `fromLine` greater than 0;
//   - `fromLine`, when set, must be greater than or equal to 0;
//   - `toLine`, when set, must be greater than or equal to 0;
//   - when both are set, `fromLine` must be strictly less than `toLine`.
func validateOptions(options options) error {
	var (
		fromLineSet      = options.FromLine.Valid
		toLineSet        = options.ToLine.Valid
		skipFirstLineSet = options.SkipFirstLine
		asObjectsEnabled = options.AsObjects.Valid && options.AsObjects.Bool
	)

	// The error messages name the option as users spell it in scripts
	// (`asObjects`), so that they can map the failure back to their code.
	if asObjectsEnabled && skipFirstLineSet {
		return fmt.Errorf("the 'asObjects' option cannot be enabled when 'skipFirstLine' is true")
	}

	if asObjectsEnabled && fromLineSet && options.FromLine.Int64 > 0 {
		return fmt.Errorf("the 'asObjects' option cannot be enabled when 'fromLine' is set to a value greater than 0")
	}

	if fromLineSet && options.FromLine.Int64 < 0 {
		return fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
	}

	if toLineSet && options.ToLine.Int64 < 0 {
		return fmt.Errorf("the 'toLine' option must be greater than or equal to 0; got %d", options.ToLine.Int64)
	}

	if fromLineSet && toLineSet && options.FromLine.Int64 >= options.ToLine.Int64 {
		return fmt.Errorf(
			"the 'fromLine' option must be less than the 'toLine' option; got 'fromLine': %d, 'toLine': %d",
			options.FromLine.Int64, options.ToLine.Int64,
		)
	}

	return nil
}
Loading

0 comments on commit 8799c81

Please sign in to comment.