diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go index 8ee7f6f24a..f162d3e0c7 100644 --- a/support/datastore/datastore.go +++ b/support/datastore/datastore.go @@ -57,6 +57,8 @@ func NewDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataSto return NewGCSDataStore(ctx, datastoreConfig) case "S3": return NewS3DataStore(ctx, datastoreConfig) + case "Filesystem": + return NewFilesystemDataStore(ctx, datastoreConfig) default: return nil, fmt.Errorf("invalid datastore type %v, not supported", datastoreConfig.Type) diff --git a/support/datastore/filesystem.go b/support/datastore/filesystem.go new file mode 100644 index 0000000000..7c77220d12 --- /dev/null +++ b/support/datastore/filesystem.go @@ -0,0 +1,250 @@ +package datastore + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + "github.com/stellar/go-stellar-sdk/support/log" +) + +const ( + defaultDirPerms os.FileMode = 0755 + defaultFilePerms os.FileMode = 0644 +) + +var _ DataStore = &FilesystemDataStore{} + +// FilesystemDataStore implements DataStore for local filesystem storage. +// +// Note: This implementation is not recommended for production use. It is +// intended for development and testing purposes only. +// +// This implementation does not support storing metadata. The metaData +// parameter in PutFile and PutFileIfNotExists is ignored, and GetFileMetadata +// always returns an empty map. +// +// Concurrent writes to the same file path are not safe and may result in +// data corruption. Callers must ensure proper synchronization when writing +// to the same path from multiple processes. +type FilesystemDataStore struct { + basePath string +} + +// NewFilesystemDataStore creates a new FilesystemDataStore from configuration. +func NewFilesystemDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataStore, error) { + destinationPath, ok := datastoreConfig.Params["destination_path"] + if !ok { + return nil, errors.New("invalid Filesystem config, no destination_path") + } + + return NewFilesystemDataStoreWithPath(destinationPath) +} + +// NewFilesystemDataStoreWithPath creates a FilesystemDataStore with the given base path. +func NewFilesystemDataStoreWithPath(basePath string) (DataStore, error) { + absPath, err := filepath.Abs(basePath) + if err != nil { + return nil, fmt.Errorf("failed to resolve absolute path: %w", err) + } + + log.Debugf("Creating Filesystem datastore at: %s", absPath) + + return &FilesystemDataStore{ + basePath: absPath, + }, nil +} + +// fullPath returns the full filesystem path for a given relative path. +func (f *FilesystemDataStore) fullPath(path string) string { + return filepath.Join(f.basePath, path) +} + +// Exists checks if a file exists in the filesystem. +func (f *FilesystemDataStore) Exists(ctx context.Context, path string) (bool, error) { + _, err := os.Stat(f.fullPath(path)) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} + +// Size returns the size of a file in bytes. +func (f *FilesystemDataStore) Size(ctx context.Context, path string) (int64, error) { + info, err := os.Stat(f.fullPath(path)) + if err != nil { + if os.IsNotExist(err) { + return 0, os.ErrNotExist + } + return 0, err + } + return info.Size(), nil +} + +// GetFileLastModified returns the last modification time of a file. +func (f *FilesystemDataStore) GetFileLastModified(ctx context.Context, path string) (time.Time, error) { + info, err := os.Stat(f.fullPath(path)) + if err != nil { + if os.IsNotExist(err) { + return time.Time{}, os.ErrNotExist + } + return time.Time{}, err + } + return info.ModTime(), nil +} + +// GetFile returns a reader for the file at the given path. +func (f *FilesystemDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) { + file, err := os.Open(f.fullPath(path)) + if err != nil { + if os.IsNotExist(err) { + return nil, os.ErrNotExist + } + return nil, fmt.Errorf("error opening file %s: %w", path, err) + } + log.Debugf("File retrieved successfully: %s", path) + return file, nil +} + +// GetFileMetadata returns an empty map as filesystem storage does not support metadata. +func (f *FilesystemDataStore) GetFileMetadata(ctx context.Context, path string) (map[string]string, error) { + if _, err := os.Stat(f.fullPath(path)); os.IsNotExist(err) { + return nil, os.ErrNotExist + } + return map[string]string{}, nil +} + +// PutFile writes a file to the filesystem. +func (f *FilesystemDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) error { + fullPath := f.fullPath(path) + + // Create parent directories + dir := filepath.Dir(fullPath) + if err := os.MkdirAll(dir, defaultDirPerms); err != nil { + return fmt.Errorf("failed to create directory %s: %w", dir, err) + } + + // Write the data file + file, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", path, err) + } + + if _, err := in.WriteTo(file); err != nil { + file.Close() + return fmt.Errorf("failed to write file %s: %w", path, err) + } + + if err := file.Close(); err != nil { + return fmt.Errorf("failed to close file %s: %w", path, err) + } + + log.Debugf("File written successfully: %s", path) + return nil +} + +// PutFileIfNotExists writes a file only if it doesn't already exist. +func (f *FilesystemDataStore) PutFileIfNotExists( + ctx context.Context, path string, in io.WriterTo, metaData map[string]string, +) (bool, error) { + fullPath := f.fullPath(path) + + // Create parent directories + dir := filepath.Dir(fullPath) + if err := os.MkdirAll(dir, defaultDirPerms); err != nil { + return false, fmt.Errorf("failed to create directory %s: %w", dir, err) + } + + // Use O_CREATE|O_EXCL for atomic check-and-create + file, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, defaultFilePerms) + if err != nil { + if os.IsExist(err) { + log.Debugf("File already exists: %s", path) + return false, nil + } + return false, fmt.Errorf("failed to create file %s: %w", path, err) + } + + if _, err := in.WriteTo(file); err != nil { + file.Close() + os.Remove(fullPath) // Clean up on error + return false, fmt.Errorf("failed to write file %s: %w", path, err) + } + + if err := file.Close(); err != nil { + return false, fmt.Errorf("failed to close file %s: %w", path, err) + } + + log.Debugf("File written successfully: %s", path) + return true, nil +} + +// ListFilePaths lists file paths matching the given options. +// Results are returned in lexicographical order (matching GCS/S3 behavior). +func (f *FilesystemDataStore) ListFilePaths(ctx context.Context, options ListFileOptions) ([]string, error) { + limit := options.Limit + if limit <= 0 || limit > listFilePathsMaxLimit { + limit = listFilePathsMaxLimit + } + + var files []string + err := filepath.WalkDir(f.basePath, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + + // Check for context cancellation + if ctx.Err() != nil { + return ctx.Err() + } + + // Skip directories + if d.IsDir() { + return nil + } + + // Get path relative to basePath and normalize to forward slashes + relPath, err := filepath.Rel(f.basePath, path) + if err != nil { + return err + } + relPath = filepath.ToSlash(relPath) + + // Apply prefix filter + if options.Prefix != "" && !strings.HasPrefix(relPath, options.Prefix) { + return nil + } + + // Apply StartAfter filter (WalkDir walks in lexical order) + if options.StartAfter != "" && relPath <= options.StartAfter { + return nil + } + + files = append(files, relPath) + + // Stop early if we've reached the limit + if len(files) >= int(limit) { + return filepath.SkipAll + } + + return nil + }) + if err != nil && err != filepath.SkipAll { + return nil, err + } + + return files, nil +} + +// Close is a no-op for FilesystemDataStore as it doesn't maintain persistent connections. +func (f *FilesystemDataStore) Close() error { + return nil +} diff --git a/support/datastore/filesystem_test.go b/support/datastore/filesystem_test.go new file mode 100644 index 0000000000..08926a2ba2 --- /dev/null +++ b/support/datastore/filesystem_test.go @@ -0,0 +1,374 @@ +package datastore + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFilesystemExists(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + // Create a test file + content := []byte("test content") + err = os.WriteFile(filepath.Join(dir, "file.txt"), content, 0600) + require.NoError(t, err) + + exists, err := store.Exists(context.Background(), "file.txt") + require.NoError(t, err) + require.True(t, exists) + + exists, err = store.Exists(context.Background(), "missing-file.txt") + require.NoError(t, err) + require.False(t, exists) +} + +func TestFilesystemSize(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + content := []byte("inside the file") + err = os.WriteFile(filepath.Join(dir, "file.txt"), content, 0600) + require.NoError(t, err) + + size, err := store.Size(context.Background(), "file.txt") + require.NoError(t, err) + require.Equal(t, int64(len(content)), size) + + _, err = store.Size(context.Background(), "missing-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) +} + +func TestFilesystemPutFile(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + content := []byte("inside the file") + writerTo := bytes.NewReader(content) + err = store.PutFile(context.Background(), "file.txt", writerTo, nil) + require.NoError(t, err) + + reader, err := store.GetFile(context.Background(), "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, content) + + metadata, err := store.GetFileMetadata(context.Background(), "file.txt") + require.NoError(t, err) + require.Equal(t, map[string]string{}, metadata) + + // Test overwriting + otherContent := []byte("other text") + writerTo = bytes.NewReader(otherContent) + err = store.PutFile(context.Background(), "file.txt", writerTo, nil) + require.NoError(t, err) + + reader, err = store.GetFile(context.Background(), "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, otherContent) +} + +func TestFilesystemPutFileCreatesDirectories(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + content := []byte("nested file content") + writerTo := bytes.NewReader(content) + err = store.PutFile(context.Background(), "a/b/c/file.txt", writerTo, nil) + require.NoError(t, err) + + reader, err := store.GetFile(context.Background(), "a/b/c/file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, content) +} + +func TestFilesystemPutFileIfNotExists(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + existingContent := []byte("existing content") + err = os.WriteFile(filepath.Join(dir, "file.txt"), existingContent, 0600) + require.NoError(t, err) + + // Attempt to overwrite - should fail + newContent := []byte("new content") + writerTo := bytes.NewReader(newContent) + ok, err := store.PutFileIfNotExists(context.Background(), "file.txt", writerTo, nil) + require.NoError(t, err) + require.False(t, ok) + + // Verify content unchanged + reader, err := store.GetFile(context.Background(), "file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, existingContent) + + // Create new file - should succeed + writerTo = bytes.NewReader(newContent) + ok, err = store.PutFileIfNotExists(context.Background(), "other-file.txt", writerTo, nil) + require.NoError(t, err) + require.True(t, ok) + + reader, err = store.GetFile(context.Background(), "other-file.txt") + require.NoError(t, err) + requireReaderContentEquals(t, reader, newContent) +} + +func TestFilesystemGetFileLastModified(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + content := []byte("inside the file") + writerTo := bytes.NewReader(content) + err = store.PutFile(context.Background(), "file.txt", writerTo, nil) + require.NoError(t, err) + + lastModified, err := store.GetFileLastModified(context.Background(), "file.txt") + require.NoError(t, err) + require.NotZero(t, lastModified) +} + +func TestFilesystemGetNonExistentFile(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, store.Close()) + }) + + // Create a different file + content := []byte("inside the file") + err = os.WriteFile(filepath.Join(dir, "file.txt"), content, 0600) + require.NoError(t, err) + + _, err = store.GetFile(context.Background(), "other-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) + + metadata, err := store.GetFileMetadata(context.Background(), "other-file.txt") + require.ErrorIs(t, err, os.ErrNotExist) + require.Nil(t, metadata) +} + +func TestFilesystemListFilePaths(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + // Create test files + for _, name := range []string{"a", "b", "c"} { + err = os.WriteFile(filepath.Join(dir, name), []byte("1"), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{Limit: 2}) + require.NoError(t, err) + require.Equal(t, []string{"a", "b"}, paths) +} + +func TestFilesystemListFilePaths_WithPrefix(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + // Create directory structure + require.NoError(t, os.MkdirAll(filepath.Join(dir, "a"), 0755)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "b"), 0755)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "a", "x"), []byte("1"), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "a", "y"), []byte("1"), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "b", "z"), []byte("1"), 0600)) + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{Prefix: "a", Limit: 10}) + require.NoError(t, err) + require.Equal(t, []string{"a/x", "a/y"}, paths) +} + +func TestFilesystemListFilePaths_LimitDefaultAndCap(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + // Create 1200 files + for i := 0; i < 1200; i++ { + err = os.WriteFile(filepath.Join(dir, fmt.Sprintf("%04d", i)), []byte("1"), 0600) + require.NoError(t, err) + } + + // Default limit should cap at 1000 + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{}) + require.NoError(t, err) + require.Equal(t, 1000, len(paths)) + + // Explicit limit over 1000 should also cap at 1000 + paths, err = store.ListFilePaths(context.Background(), ListFileOptions{Limit: 5000}) + require.NoError(t, err) + require.Equal(t, 1000, len(paths)) +} + +func TestFilesystemListFilePaths_StartAfter_Basic(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + for i := 0; i < 10; i++ { + err = os.WriteFile(filepath.Join(dir, fmt.Sprintf("%04d", i)), []byte("x"), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + StartAfter: "0005", + }) + require.NoError(t, err) + require.Equal(t, []string{"0006", "0007", "0008", "0009"}, paths) +} + +func TestFilesystemListFilePaths_StartAfter_WithPrefix(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + require.NoError(t, os.MkdirAll(filepath.Join(dir, "a"), 0755)) + require.NoError(t, os.MkdirAll(filepath.Join(dir, "b"), 0755)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "a", "0001"), []byte(""), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "a", "0002"), []byte(""), 0600)) + require.NoError(t, os.WriteFile(filepath.Join(dir, "b", "0002"), []byte(""), 0600)) + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + Prefix: "a/", + StartAfter: "a/0001", + }) + require.NoError(t, err) + require.Equal(t, []string{"a/0002"}, paths) +} + +func TestFilesystemListFilePaths_StartAfter_EqualsLastKey(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + for _, name := range []string{"0000", "0001", "0002"} { + err = os.WriteFile(filepath.Join(dir, name), []byte(""), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + StartAfter: "0002", + }) + require.NoError(t, err) + require.Empty(t, paths) +} + +func TestFilesystemListFilePaths_StartAfter_BeforeFirstKey(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + for _, name := range []string{"0001", "0002", "0003"} { + err = os.WriteFile(filepath.Join(dir, name), []byte(""), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + StartAfter: "0000", + }) + require.NoError(t, err) + require.Equal(t, []string{"0001", "0002", "0003"}, paths) +} + +func TestFilesystemListFilePaths_StartAfter_BetweenKeys(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + for _, name := range []string{"0002", "0004", "0006"} { + err = os.WriteFile(filepath.Join(dir, name), []byte(""), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + StartAfter: "0003", + }) + require.NoError(t, err) + require.Equal(t, []string{"0004", "0006"}, paths) +} + +func TestFilesystemListFilePaths_StartAfter_WithLimit(t *testing.T) { + dir := t.TempDir() + store, err := NewFilesystemDataStoreWithPath(dir) + require.NoError(t, err) + t.Cleanup(func() { _ = store.Close() }) + + for i := 0; i < 10; i++ { + err = os.WriteFile(filepath.Join(dir, fmt.Sprintf("%04d", i)), []byte("x"), 0600) + require.NoError(t, err) + } + + paths, err := store.ListFilePaths(context.Background(), ListFileOptions{ + StartAfter: "0004", + Limit: 3, + }) + require.NoError(t, err) + require.Equal(t, []string{"0005", "0006", "0007"}, paths) +} + +func TestNewFilesystemDataStore(t *testing.T) { + dir := t.TempDir() + + config := DataStoreConfig{ + Type: "Filesystem", + Params: map[string]string{ + "destination_path": dir, + }, + } + + store, err := NewDataStore(context.Background(), config) + require.NoError(t, err) + require.NotNil(t, store) + require.NoError(t, store.Close()) +} + +func TestNewFilesystemDataStore_MissingDestinationPath(t *testing.T) { + config := DataStoreConfig{ + Type: "Filesystem", + Params: map[string]string{}, + } + + _, err := NewDataStore(context.Background(), config) + require.Error(t, err) + require.Contains(t, err.Error(), "no destination_path") +}