-
Notifications
You must be signed in to change notification settings - Fork 543
Add Filesystem datastore support #5892
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
d1379c7
add filesystem datastore implementation
leighmcculloch 3b33571
remove metadata support from filesystem datastore
leighmcculloch 0218f0a
expand filesystem datastore documentation
leighmcculloch a2c9796
Fix variable shadowing warnings in filesystem datastore tests
leighmcculloch 8461178
Document concurrent write limitations in filesystem datastore
leighmcculloch fff78f0
Add context cancellation support in ListFilePaths
leighmcculloch 2fc44f3
extract filesystem permission constants
leighmcculloch ff43078
remove directory creation from NewFilesystemDataStoreWithPath
leighmcculloch 328fc59
write file content directly without buffering
leighmcculloch 6dbbccc
replace "uploaded" with "written" in log messages
leighmcculloch f8fc5ad
Merge branch 'main' into datastore-filesystem
leighmcculloch 3035b76
word
leighmcculloch fb56196
Merge branch 'main' into datastore-filesystem
leighmcculloch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,307 @@ | ||
| package datastore | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "os" | ||
| "path/filepath" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/stellar/go-stellar-sdk/support/log" | ||
| ) | ||
|
|
||
| var _ DataStore = &FilesystemDataStore{} | ||
|
|
||
| const metadataSuffix = ".metadata.json" | ||
|
|
||
| // FilesystemDataStore implements DataStore for local filesystem storage. | ||
| type FilesystemDataStore struct { | ||
| basePath string | ||
| writeMetadata bool | ||
| } | ||
|
|
||
| // NewFilesystemDataStore creates a new FilesystemDataStore from configuration. | ||
| func NewFilesystemDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataStore, error) { | ||
| destinationPath, ok := datastoreConfig.Params["destination_path"] | ||
| if !ok { | ||
| return nil, errors.New("invalid Filesystem config, no destination_path") | ||
| } | ||
|
|
||
| // write_metadata defaults to true | ||
| writeMetadata := true | ||
| if val, ok := datastoreConfig.Params["write_metadata"]; ok { | ||
| writeMetadata = val != "false" | ||
| } | ||
|
|
||
| return NewFilesystemDataStoreWithPath(destinationPath, writeMetadata) | ||
| } | ||
|
|
||
| // NewFilesystemDataStoreWithPath creates a FilesystemDataStore with the given base path. | ||
| func NewFilesystemDataStoreWithPath(basePath string, writeMetadata bool) (DataStore, error) { | ||
| // Ensure the base path exists | ||
| if err := os.MkdirAll(basePath, 0755); err != nil { | ||
| return nil, fmt.Errorf("failed to create base directory %s: %w", basePath, err) | ||
| } | ||
|
|
||
| absPath, err := filepath.Abs(basePath) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to resolve absolute path: %w", err) | ||
| } | ||
|
|
||
| log.Debugf("Creating Filesystem datastore at: %s, writeMetadata: %v", absPath, writeMetadata) | ||
|
|
||
| return &FilesystemDataStore{ | ||
| basePath: absPath, | ||
| writeMetadata: writeMetadata, | ||
| }, nil | ||
| } | ||
|
|
||
| // fullPath returns the full filesystem path for a given relative path. | ||
| func (f *FilesystemDataStore) fullPath(path string) string { | ||
| return filepath.Join(f.basePath, path) | ||
leighmcculloch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // metadataPath returns the path to the metadata sidecar file. | ||
| func (f *FilesystemDataStore) metadataPath(path string) string { | ||
| return f.fullPath(path) + metadataSuffix | ||
| } | ||
|
|
||
| // Exists checks if a file exists in the filesystem. | ||
| func (f *FilesystemDataStore) Exists(ctx context.Context, path string) (bool, error) { | ||
| _, err := os.Stat(f.fullPath(path)) | ||
| if err == nil { | ||
| return true, nil | ||
| } | ||
| if os.IsNotExist(err) { | ||
| return false, nil | ||
| } | ||
| return false, err | ||
| } | ||
|
|
||
| // Size returns the size of a file in bytes. | ||
| func (f *FilesystemDataStore) Size(ctx context.Context, path string) (int64, error) { | ||
| info, err := os.Stat(f.fullPath(path)) | ||
| if err != nil { | ||
| if os.IsNotExist(err) { | ||
| return 0, os.ErrNotExist | ||
| } | ||
| return 0, err | ||
| } | ||
| return info.Size(), nil | ||
| } | ||
|
|
||
| // GetFileLastModified returns the last modification time of a file. | ||
| func (f *FilesystemDataStore) GetFileLastModified(ctx context.Context, path string) (time.Time, error) { | ||
| info, err := os.Stat(f.fullPath(path)) | ||
| if err != nil { | ||
| if os.IsNotExist(err) { | ||
| return time.Time{}, os.ErrNotExist | ||
| } | ||
| return time.Time{}, err | ||
| } | ||
| return info.ModTime(), nil | ||
| } | ||
|
|
||
| // GetFile returns a reader for the file at the given path. | ||
| func (f *FilesystemDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) { | ||
| file, err := os.Open(f.fullPath(path)) | ||
| if err != nil { | ||
| if os.IsNotExist(err) { | ||
| return nil, os.ErrNotExist | ||
| } | ||
| return nil, fmt.Errorf("error opening file %s: %w", path, err) | ||
| } | ||
| log.Debugf("File retrieved successfully: %s", path) | ||
| return file, nil | ||
| } | ||
|
|
||
| // GetFileMetadata reads metadata from the sidecar JSON file. | ||
| func (f *FilesystemDataStore) GetFileMetadata(ctx context.Context, path string) (map[string]string, error) { | ||
| metaPath := f.metadataPath(path) | ||
| data, err := os.ReadFile(metaPath) | ||
| if err != nil { | ||
| if os.IsNotExist(err) { | ||
| // Check if the main file exists | ||
| if _, mainErr := os.Stat(f.fullPath(path)); os.IsNotExist(mainErr) { | ||
| return nil, os.ErrNotExist | ||
| } | ||
| // Main file exists but no metadata - return empty map | ||
| return map[string]string{}, nil | ||
| } | ||
| return nil, fmt.Errorf("error reading metadata file %s: %w", metaPath, err) | ||
| } | ||
|
|
||
| var metadata map[string]string | ||
| if err := json.Unmarshal(data, &metadata); err != nil { | ||
| return nil, fmt.Errorf("error parsing metadata file %s: %w", metaPath, err) | ||
| } | ||
| return metadata, nil | ||
| } | ||
|
|
||
| // PutFile writes a file to the filesystem with optional metadata sidecar. | ||
| func (f *FilesystemDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) error { | ||
| fullPath := f.fullPath(path) | ||
|
|
||
| // Create parent directories | ||
| dir := filepath.Dir(fullPath) | ||
| if err := os.MkdirAll(dir, 0755); err != nil { | ||
| return fmt.Errorf("failed to create directory %s: %w", dir, err) | ||
| } | ||
|
|
||
| // Write metadata sidecar first if enabled and metadata is provided. | ||
| // This ensures that if the data file exists, metadata is assumed to exist too. | ||
leighmcculloch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if f.writeMetadata && len(metaData) > 0 { | ||
| if err := f.writeMetadataFile(path, metaData); err != nil { | ||
| return err | ||
| } | ||
| } | ||
leighmcculloch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Write the data file | ||
| file, err := os.Create(fullPath) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create file %s: %w", path, err) | ||
| } | ||
|
|
||
| if _, err := in.WriteTo(file); err != nil { | ||
| file.Close() | ||
| return fmt.Errorf("failed to write file %s: %w", path, err) | ||
| } | ||
|
|
||
| if err := file.Close(); err != nil { | ||
| return fmt.Errorf("failed to close file %s: %w", path, err) | ||
| } | ||
leighmcculloch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| log.Debugf("File uploaded successfully: %s", path) | ||
| return nil | ||
| } | ||
leighmcculloch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // PutFileIfNotExists writes a file only if it doesn't already exist. | ||
| func (f *FilesystemDataStore) PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error) { | ||
| fullPath := f.fullPath(path) | ||
|
|
||
| // Create parent directories | ||
| dir := filepath.Dir(fullPath) | ||
| if err := os.MkdirAll(dir, 0755); err != nil { | ||
| return false, fmt.Errorf("failed to create directory %s: %w", dir, err) | ||
| } | ||
|
|
||
| // Use O_CREATE|O_EXCL for atomic check-and-create | ||
| file, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644) | ||
| if err != nil { | ||
| if os.IsExist(err) { | ||
| log.Debugf("File already exists: %s", path) | ||
| return false, nil | ||
| } | ||
| return false, fmt.Errorf("failed to create file %s: %w", path, err) | ||
| } | ||
|
|
||
| // Write content to the file | ||
| buf := &bytes.Buffer{} | ||
| if _, err := in.WriteTo(buf); err != nil { | ||
| file.Close() | ||
| os.Remove(fullPath) // Clean up on error | ||
| return false, fmt.Errorf("failed to write file %s: %w", path, err) | ||
| } | ||
|
|
||
| if _, err := file.Write(buf.Bytes()); err != nil { | ||
| file.Close() | ||
leighmcculloch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
leighmcculloch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| os.Remove(fullPath) // Clean up on error | ||
| return false, fmt.Errorf("failed to write file %s: %w", path, err) | ||
| } | ||
|
|
||
| if err := file.Close(); err != nil { | ||
| return false, fmt.Errorf("failed to close file %s: %w", path, err) | ||
| } | ||
|
|
||
| // Write metadata sidecar if enabled and metadata is provided | ||
| if f.writeMetadata && len(metaData) > 0 { | ||
| if err := f.writeMetadataFile(path, metaData); err != nil { | ||
| return true, err | ||
leighmcculloch marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| log.Debugf("File uploaded successfully: %s", path) | ||
| return true, nil | ||
| } | ||
|
|
||
| // writeMetadataFile writes metadata to a sidecar JSON file. | ||
| func (f *FilesystemDataStore) writeMetadataFile(path string, metaData map[string]string) error { | ||
| metaPath := f.metadataPath(path) | ||
|
|
||
| data, err := json.Marshal(metaData) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to marshal metadata for %s: %w", path, err) | ||
| } | ||
|
|
||
| if err := os.WriteFile(metaPath, data, 0644); err != nil { | ||
| return fmt.Errorf("failed to write metadata file %s: %w", metaPath, err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // ListFilePaths lists file paths matching the given options. | ||
| // Results are returned in lexicographical order (matching GCS/S3 behavior). | ||
| func (f *FilesystemDataStore) ListFilePaths(ctx context.Context, options ListFileOptions) ([]string, error) { | ||
| limit := options.Limit | ||
| if limit <= 0 || limit > listFilePathsMaxLimit { | ||
| limit = listFilePathsMaxLimit | ||
| } | ||
|
|
||
| var files []string | ||
| err := filepath.WalkDir(f.basePath, func(path string, d os.DirEntry, err error) error { | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Skip directories | ||
| if d.IsDir() { | ||
| return nil | ||
| } | ||
|
|
||
| // Skip metadata sidecar files | ||
| if strings.HasSuffix(d.Name(), metadataSuffix) { | ||
| return nil | ||
| } | ||
|
|
||
| // Get path relative to basePath and normalize to forward slashes | ||
| relPath, err := filepath.Rel(f.basePath, path) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| relPath = filepath.ToSlash(relPath) | ||
|
|
||
| // Apply prefix filter | ||
| if options.Prefix != "" && !strings.HasPrefix(relPath, options.Prefix) { | ||
| return nil | ||
| } | ||
|
|
||
| // Apply StartAfter filter (WalkDir walks in lexical order) | ||
| if options.StartAfter != "" && relPath <= options.StartAfter { | ||
| return nil | ||
| } | ||
|
|
||
| files = append(files, relPath) | ||
|
|
||
| // Stop early if we've reached the limit | ||
| if uint32(len(files)) >= limit { | ||
| return filepath.SkipAll | ||
| } | ||
|
|
||
| return nil | ||
| }) | ||
| if err != nil && err != filepath.SkipAll { | ||
| return nil, err | ||
| } | ||
|
|
||
| return files, nil | ||
| } | ||
leighmcculloch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Close is a no-op for FilesystemDataStore as it doesn't maintain persistent connections. | ||
| func (f *FilesystemDataStore) Close() error { | ||
| return nil | ||
| } | ||
leighmcculloch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.