Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header option support to the CSV module #4295

Merged
merged 8 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/js/modules/k6/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (d *Data) sharedArray(call sobek.ConstructorCall) *sobek.Object {
// The data module RecordReader interface is implemented by types that can read data that can be
// treated as records, from data sources such as a CSV file, etc.
type RecordReader interface {
Read() ([]string, error)
Read() (any, error)
}

// NewSharedArrayFrom creates a new shared array from the provided data.
Expand Down
29 changes: 24 additions & 5 deletions internal/js/modules/k6/experimental/csv/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (mi *ModuleInstance) Parse(file sobek.Value, options sobek.Value) *sobek.Pr

rt := mi.vu.Runtime()

// 1. Make sure the Sobek object is a fs.File (sobek operation)
// 1. Make sure the Sobek object is a fs.File (Sobek operation)
var fileObj fs.File
if err := mi.vu.Runtime().ExportTo(file, &fileObj); err != nil {
reject(fmt.Errorf("first argument expected to be a fs.File instance, got %T instead", file))
Expand Down Expand Up @@ -192,11 +192,11 @@ func (p *Parser) Next() *sobek.Promise {
promise, resolve, reject := promises.New(p.vu)

go func() {
var records []string
var record any
var done bool
var err error

records, err = p.reader.Read()
record, err = p.reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
resolve(parseResult{Done: true, Value: []string{}})
Expand All @@ -209,7 +209,7 @@ func (p *Parser) Next() *sobek.Promise {

p.currentLine.Add(1)

resolve(parseResult{Done: done, Value: records})
resolve(parseResult{Done: done, Value: record})
}()

return promise
Expand All @@ -222,7 +222,7 @@ type parseResult struct {
Done bool `js:"done"`

// Value holds the line's records value.
Value []string `js:"value"`
Value any `js:"value"`
}

// options holds options used to configure CSV parsing when utilizing the module.
Expand All @@ -243,13 +243,28 @@ type options struct {

// ToLine indicates the line at which to stop reading the CSV file (inclusive).
ToLine null.Int `js:"toLine"`

// AsObjects indicates that the CSV rows should be returned as objects, where
// the keys are the header column names, and values are the corresponding
// row values.
//
// When this option is enabled, the first line of the CSV file is treated as the header.
//
// If the option is set and no header line is present, this should be considered an error
// case.
//
// This option is incompatible with the [SkipFirstLine] option, and if both are set, an error
// should be returned. Same thing applies if the [FromLine] option is set to a value greater
// than 0.
AsObjects null.Bool `js:"asObjects"`
}

// newDefaultParserOptions builds the baseline parser options: a comma
// delimiter, the first line kept, and rows returned as arrays rather than
// objects (asObjects explicitly set to false).
func newDefaultParserOptions() options {
	var defaults options

	defaults.Delimiter = ','
	defaults.SkipFirstLine = false
	defaults.AsObjects = null.BoolFrom(false)

	return defaults
}

Expand Down Expand Up @@ -284,6 +299,10 @@ func newParserOptionsFrom(obj *sobek.Object) (options, error) {
options.ToLine = null.IntFrom(v.ToInteger())
}

if v := obj.Get("asObjects"); v != nil {
options.AsObjects = null.BoolFrom(v.ToBoolean())
}

if options.FromLine.Valid && options.ToLine.Valid && options.FromLine.Int64 >= options.ToLine.Int64 {
return options, fmt.Errorf("fromLine must be less than or equal to toLine")
}
Expand Down
108 changes: 108 additions & 0 deletions internal/js/modules/k6/experimental/csv/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@ func TestParserConstructor(t *testing.T) {
require.NoError(t, err)
})

t.Run("constructing a parser with both asObjects and skipFirstLine options should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', skipFirstLine: true, asObjects: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser with both the asObjects option and fromLine option greater than 0 should fail", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { delimiter: ';', fromLine: 1, asObjects: true });
`, testFilePath)))

require.Error(t, err)
})

t.Run("constructing a parser without providing a file instance should fail", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -354,6 +392,44 @@ func TestParserNext(t *testing.T) {
require.NoError(t, err)
})

t.Run("next with header option should return records as objects and succeed", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const parser = new csv.Parser(file, { asObjects: true });
let gotParsedCount = 0;

let { done, value } = await parser.next();
while (!done) {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
throw new Error("Expected record to be an object, but got " + typeof value);
}

if (Object.keys(value).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(value).length);
}

gotParsedCount++;
({ done, value } = await parser.next());
}

if (gotParsedCount !== 10) {
throw new Error("Expected to parse 10 records, but got " + gotParsedCount);
}

`, testFilePath)))
require.NoError(t, err)
})

t.Run("calling next on a parser that has reached EOF should return done=true and no value", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -527,6 +603,38 @@ func TestParse(t *testing.T) {

require.NoError(t, err)
})

t.Run("parse respects the header option, returns records as objects and succeeds", func(t *testing.T) {
t.Parallel()

r, err := newConfiguredRuntime(t)
require.NoError(t, err)

// Ensure the testdata.csv file is present on the test filesystem.
r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error {
return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644)
})

_, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(`
const file = await fs.open(%q);
const csvRecords = await csv.parse(file, { asObjects: true });

if (csvRecords.length !== 10) {
throw new Error("Expected 10 records, but got " + csvRecords.length);
}
oleiade marked this conversation as resolved.
Show resolved Hide resolved

for (const record of csvRecords) {
if (typeof record !== 'object' || record === null || Array.isArray(record)) {
throw new Error("Expected record to be an object, but got " + typeof record);
}

if (Object.keys(record).length !== 6) {
throw new Error("Expected record to have 6 fields, but got " + Object.keys(record).length);
}
}
`, testFilePath)))
require.NoError(t, err)
})
}

const initGlobals = `
Expand Down
121 changes: 85 additions & 36 deletions internal/js/modules/k6/experimental/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"sync/atomic"

"go.k6.io/k6/internal/js/modules/k6/data"
)

// Reader is a CSV reader.
Expand All @@ -18,6 +20,10 @@ type Reader struct {

// options holds the reader's options.
options options

// columnNames stores the column names when the asObjects option is enabled
// in order to be able to map each row values to their corresponding column.
columnNames []string
}

// NewReaderFrom creates a new CSV reader from the provided io.Reader.
Expand All @@ -33,7 +39,10 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
return nil, fmt.Errorf("the reader cannot be nil")
}

// Ensure the default delimiter is set.
if err := validateOptions(options); err != nil {
return nil, err
}

if options.Delimiter == 0 {
options.Delimiter = ','
}
Expand All @@ -46,47 +55,27 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
options: options,
}

var (
fromLineSet = options.FromLine.Valid
toLineSet = options.ToLine.Valid
skipFirstLineSet = options.SkipFirstLine
fromLineIsPositive = fromLineSet && options.FromLine.Int64 >= 0
toLineIsPositive = toLineSet && options.ToLine.Int64 >= 0
)

// If set, the fromLine option should either be greater than or equal to 0.
if fromLineSet && !fromLineIsPositive {
return nil, fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
}

// If set, the toLine option should be strictly greater than or equal to 0.
if toLineSet && !toLineIsPositive {
return nil, fmt.Errorf("the 'toLine' option must be greater than or equal to 0; got %d", options.ToLine.Int64)
}

// if the `fromLine` and `toLine` options are set, and `fromLine` is greater or equal to `toLine`, we return an error.
if fromLineSet && toLineSet && options.FromLine.Int64 >= options.ToLine.Int64 {
return nil, fmt.Errorf(
"the 'fromLine' option must be less than the 'toLine' option; got 'fromLine': %d, 'toLine': %d",
options.FromLine.Int64, options.ToLine.Int64,
)
asObjectsEnabled := options.AsObjects.Valid && options.AsObjects.Bool
if asObjectsEnabled {
header, err := csvParser.Read()
if err != nil {
return nil, fmt.Errorf("failed to read the first line; reason: %w", err)
}
reader.columnNames = header
reader.currentLine.Add(1)
}

// If the user wants to skip the first line, we consume and discard it.
if skipFirstLineSet && (!fromLineSet || options.FromLine.Int64 == 0) {
_, err := csvParser.Read()
if err != nil {
if options.SkipFirstLine && (!options.FromLine.Valid || options.FromLine.Int64 == 0) {
if _, err := csvParser.Read(); err != nil {
return nil, fmt.Errorf("failed to skip the first line; reason: %w", err)
}

reader.currentLine.Add(1)
}

if fromLineSet && options.FromLine.Int64 > 0 {
// We skip lines until we reach the specified line.
if options.FromLine.Valid && options.FromLine.Int64 > 0 {
for reader.currentLine.Load() < options.FromLine.Int64 {
_, err := csvParser.Read()
if err != nil {
if _, err := csvParser.Read(); err != nil {
return nil, fmt.Errorf("failed to skip lines until line %d; reason: %w", options.FromLine.Int64, err)
}
reader.currentLine.Add(1)
Expand All @@ -96,20 +85,80 @@ func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
return reader, nil
}

func (r *Reader) Read() ([]string, error) {
// The csv module's Reader must implement the data.RecordReader interface.
var _ data.RecordReader = (*Reader)(nil)

// Read reads a record from the CSV file.
//
// If the `asObjects` option is enabled, it will return a map of the record.
// Otherwise, it will return the record as a slice of strings.
func (r *Reader) Read() (any, error) {
toLineSet := r.options.ToLine.Valid

// If the `toLine` option was set and we have reached it, we return EOF.
if toLineSet && r.options.ToLine.Int64 > 0 && r.currentLine.Load() > r.options.ToLine.Int64 {
return nil, io.EOF
}

records, err := r.csv.Read()
record, err := r.csv.Read()
if err != nil {
return nil, err
}

r.currentLine.Add(1)

return records, nil
// If the asObjects option is enabled, return the record as a map keyed by column name.
oleiade marked this conversation as resolved.
Show resolved Hide resolved
if r.options.AsObjects.Valid && r.options.AsObjects.Bool {
if r.columnNames == nil {
return nil, fmt.Errorf("the 'asObjects' option is enabled, but no header was found")
}

if len(record) != len(r.columnNames) {
return nil, fmt.Errorf("record length (%d) doesn't match header length (%d)", len(record), len(r.columnNames))
}

recordMap := make(map[string]string)
for i, value := range record {
recordMap[r.columnNames[i]] = value
}

return recordMap, nil
}

return record, nil
}

// validateOptions checks the reader options for invalid values and for
// combinations that cannot be honored together.
//
// It returns an error when:
//   - asObjects is enabled together with skipFirstLine;
//   - asObjects is enabled and fromLine is set to a value greater than 0;
//   - fromLine or toLine is set to a negative value;
//   - both fromLine and toLine are set and fromLine >= toLine.
//
// It returns nil when none of these conditions hold.
func validateOptions(options options) error {
	var (
		fromLineSet      = options.FromLine.Valid
		toLineSet        = options.ToLine.Valid
		skipFirstLineSet = options.SkipFirstLine
		asObjectsEnabled = options.AsObjects.Valid && options.AsObjects.Bool
	)

	// The error messages name the option as the user spells it ('asObjects'),
	// so the failure can be traced back to the script's options object.
	if asObjectsEnabled && skipFirstLineSet {
		return fmt.Errorf("the 'asObjects' option cannot be enabled when 'skipFirstLine' is true")
	}

	if asObjectsEnabled && fromLineSet && options.FromLine.Int64 > 0 {
		return fmt.Errorf("the 'asObjects' option cannot be enabled when 'fromLine' is set to a value greater than 0")
	}

	if fromLineSet && options.FromLine.Int64 < 0 {
		return fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
	}

	if toLineSet && options.ToLine.Int64 < 0 {
		return fmt.Errorf("the 'toLine' option must be greater than or equal to 0; got %d", options.ToLine.Int64)
	}

	// The range must be non-empty: fromLine has to be strictly below toLine.
	if fromLineSet && toLineSet && options.FromLine.Int64 >= options.ToLine.Int64 {
		return fmt.Errorf(
			"the 'fromLine' option must be less than the 'toLine' option; got 'fromLine': %d, 'toLine': %d",
			options.FromLine.Int64, options.ToLine.Int64,
		)
	}

	return nil
}
Loading
Loading