Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Esti Improvements #8731

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions esti/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,13 +483,13 @@ func TestUpdatePolicy(t *testing.T) {

// Tests merge with different clients
func mergeAuthTest(t *testing.T, cli *apigen.ClientWithResponses, ctx context.Context, repo string, branch string) (*apigen.MergeIntoBranchResponse, error) {
UploadFileRandomData(ctx, t, repo, mainBranch, "README")
UploadFileRandomData(ctx, t, repo, mainBranch, "README", nil)

resMainCommit, err := cli.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Initial content"})
require.NoError(t, err, "failed to commit initial content in merge auth test")
require.Equal(t, http.StatusCreated, resMainCommit.StatusCode())

UploadFileRandomData(ctx, t, repo, branch, "foo.txt")
UploadFileRandomData(ctx, t, repo, branch, "foo.txt", nil)

resBranchCommit, err := cli.CommitWithResponse(ctx, repo, branch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Additional content"})
require.NoError(t, err, "failed to commit additional content in merge auth test")
Expand Down
6 changes: 3 additions & 3 deletions esti/catalog_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func uploadAndCommitObjects(t *testing.T, ctx context.Context, repo, branch stri
t.Helper()
for _, objects := range objectsGroups {
for path, obj := range objects {
resp, err := uploadContent(ctx, repo, branch, path, obj)
resp, err := uploadContent(ctx, repo, branch, path, obj, nil)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode())
}
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestDeltaCatalogExport(t *testing.T) {
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestDeltaCatalogExportAbfss(t *testing.T) {
if err != nil {
return err
}
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
uploadResp, err := uploadContent(ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf), nil)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions esti/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestCommitSingle(t *testing.T) {
defer tearDownTest(repo)

const objPath = "1.txt"
_, objContent := UploadFileRandomData(ctx, t, repo, mainBranch, objPath)
_, objContent := UploadFileRandomData(ctx, t, repo, mainBranch, objPath, nil)
commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{
Message: "singleCommit",
})
Expand Down Expand Up @@ -56,7 +56,7 @@ type Upload struct {
// upload uploads random file data for uploads.
func upload(ctx context.Context, uploads chan Upload) error {
for u := range uploads {
_, _, err := uploadFileRandomDataAndReport(ctx, u.Repo, u.Branch, u.Path, false)
_, _, err := uploadFileRandomDataAndReport(ctx, u.Repo, u.Branch, u.Path, false, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,8 +137,8 @@ func TestCommitWithTombstone(t *testing.T) {
defer tearDownTest(repo)
origObjPathLow := "objb.txt"
origObjPathHigh := "objc.txt"
UploadFileRandomData(ctx, t, repo, mainBranch, origObjPathLow)
UploadFileRandomData(ctx, t, repo, mainBranch, origObjPathHigh)
UploadFileRandomData(ctx, t, repo, mainBranch, origObjPathLow, nil)
UploadFileRandomData(ctx, t, repo, mainBranch, origObjPathHigh, nil)
commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{
Message: "First commit",
})
Expand All @@ -148,8 +148,8 @@ func TestCommitWithTombstone(t *testing.T) {

tombstoneObjPath := "obja.txt"
newObjPath := "objd.txt"
UploadFileRandomData(ctx, t, repo, mainBranch, tombstoneObjPath)
UploadFileRandomData(ctx, t, repo, mainBranch, newObjPath)
UploadFileRandomData(ctx, t, repo, mainBranch, tombstoneObjPath, nil)
UploadFileRandomData(ctx, t, repo, mainBranch, newObjPath, nil)

// Turning tombstoneObjPath to tombstone
resp, err := client.DeleteObjectWithResponse(ctx, repo, mainBranch, &apigen.DeleteObjectParams{Path: tombstoneObjPath})
Expand Down
4 changes: 2 additions & 2 deletions esti/delete_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestDeleteObjects(t *testing.T) {
identifiers = append(identifiers, types.ObjectIdentifier{
Key: aws.String(mainBranch + "/" + file),
})
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, file)
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, file, nil)
}

listOut, err := svc.ListObjects(ctx, &s3.ListObjectsInput{
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestDeleteObjects_Viewer(t *testing.T) {

// setup data
const filename = "delete-me"
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, filename)
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, filename, nil)

// setup user with only view rights - create user, add to group, generate credentials
uid := "del-viewer"
Expand Down
6 changes: 3 additions & 3 deletions esti/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDeleteStaging(t *testing.T) {
defer tearDownTest(repo)
objPath := "1.txt"

_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath)
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath, nil)

f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
Expand All @@ -55,7 +55,7 @@ func TestDeleteCommitted(t *testing.T) {
defer tearDownTest(repo)
objPath := "1.txt"

_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath)
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath, nil)

f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestCommitDeleteCommitted(t *testing.T) {
defer tearDownTest(repo)
objPath := "1.txt"

_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath)
_, _ = UploadFileRandomData(ctx, t, repo, mainBranch, objPath, nil)

f, err := objectFound(ctx, repo, mainBranch, objPath)
assert.NoError(t, err)
Expand Down
43 changes: 33 additions & 10 deletions esti/esti_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3"
pebblesst "github.com/cockroachdb/pebble/sstable"
"github.com/go-openapi/swag"
"github.com/hashicorp/go-multierror"
"github.com/rs/xid"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/api/helpers"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler/sstable"
"github.com/treeverse/lakefs/pkg/logging"
)

Expand Down Expand Up @@ -283,7 +285,7 @@ func setupTest(t testing.TB) (context.Context, logging.Logger, string) {

func tearDownTest(repoName string) {
ctx := context.Background()
deleteRepositoryIfAskedTo(ctx, repoName)
DeleteRepositoryIfAskedTo(ctx, repoName)
}

func createRepositoryForTest(ctx context.Context, t testing.TB) string {
Expand Down Expand Up @@ -339,7 +341,7 @@ func createRepository(ctx context.Context, t testing.TB, name string, repoStorag
"create repository '%s', storage '%s'", name, repoStorage)
}

func deleteRepositoryIfAskedTo(ctx context.Context, repositoryName string) {
func DeleteRepositoryIfAskedTo(ctx context.Context, repositoryName string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏

deleteRepositories := viper.GetBool("delete_repositories")
if deleteRepositories {
resp, err := client.DeleteRepositoryWithResponse(ctx, repositoryName, &apigen.DeleteRepositoryParams{Force: swag.Bool(true)})
Expand Down Expand Up @@ -369,16 +371,16 @@ const (
largeDataContentLength = 6 << 20
)

func uploadFileRandomDataAndReport(ctx context.Context, repo, branch, objPath string, direct bool) (checksum, content string, err error) {
func uploadFileRandomDataAndReport(ctx context.Context, repo, branch, objPath string, direct bool, clt apigen.ClientWithResponsesInterface) (checksum, content string, err error) {
objContent := randstr.String(randomDataContentLength)
checksum, err = uploadFileAndReport(ctx, repo, branch, objPath, objContent, direct)
checksum, err = uploadFileAndReport(ctx, repo, branch, objPath, objContent, direct, clt)
if err != nil {
return "", "", err
}
return checksum, objContent, nil
}

func uploadFileAndReport(ctx context.Context, repo, branch, objPath, objContent string, direct bool) (checksum string, err error) {
func uploadFileAndReport(ctx context.Context, repo, branch, objPath, objContent string, direct bool, clt apigen.ClientWithResponsesInterface) (checksum string, err error) {
// Upload using direct access
if direct {
stats, err := uploadContentDirect(ctx, client, repo, branch, objPath, nil, "", strings.NewReader(objContent))
Expand All @@ -388,7 +390,7 @@ func uploadFileAndReport(ctx context.Context, repo, branch, objPath, objContent
return stats.Checksum, nil
}
// Upload using API
resp, err := uploadContent(ctx, repo, branch, objPath, objContent)
resp, err := uploadContent(ctx, repo, branch, objPath, objContent, clt)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -457,7 +459,10 @@ func uploadContentDirect(ctx context.Context, client apigen.ClientWithResponsesI
}
}

func uploadContent(ctx context.Context, repo string, branch string, objPath string, objContent string) (*apigen.UploadObjectResponse, error) {
func uploadContent(ctx context.Context, repo, branch, objPath, objContent string, clt apigen.ClientWithResponsesInterface) (*apigen.UploadObjectResponse, error) {
if clt == nil {
clt = client
}
var b bytes.Buffer
w := multipart.NewWriter(&b)
contentWriter, err := w.CreateFormFile("content", filepath.Base(objPath))
Expand All @@ -472,13 +477,13 @@ func uploadContent(ctx context.Context, repo string, branch string, objPath stri
if err != nil {
return nil, fmt.Errorf("close form file: %w", err)
}
return client.UploadObjectWithBodyWithResponse(ctx, repo, branch, &apigen.UploadObjectParams{
return clt.UploadObjectWithBodyWithResponse(ctx, repo, branch, &apigen.UploadObjectParams{
Path: objPath,
}, w.FormDataContentType(), &b)
}

func UploadFileRandomData(ctx context.Context, t *testing.T, repo, branch, objPath string) (checksum, content string) {
checksum, content, err := uploadFileRandomDataAndReport(ctx, repo, branch, objPath, false)
func UploadFileRandomData(ctx context.Context, t *testing.T, repo, branch, objPath string, clt apigen.ClientWithResponsesInterface) (checksum, content string) {
checksum, content, err := uploadFileRandomDataAndReport(ctx, repo, branch, objPath, false, clt)
require.NoError(t, err, "failed to upload file", repo, branch, objPath)
return checksum, content
}
Expand Down Expand Up @@ -576,3 +581,21 @@ func getServerConfig(t testing.TB, ctx context.Context) *apigen.SetupState {
require.NotNil(t, resp.JSON200)
return resp.JSON200
}

func GravelerIterator(data []byte) (*sstable.Iterator, error) {
// read file descriptor
reader, err := pebblesst.NewMemReader(data, pebblesst.ReaderOptions{})
if err != nil {
return nil, err
}

// create an iterator over the whole thing
iter, err := reader.NewIter(nil, nil)
if err != nil {
return nil, err
}

// wrap it in a Graveler iterator
dummyDeref := func() error { return nil }
return sstable.NewIterator(iter, dummyDeref), nil
}
2 changes: 1 addition & 1 deletion esti/hooks_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func createAction(t *testing.T, ctx context.Context, repo, branch, path string,
err := tmp.Execute(&doc, docData)
require.NoError(t, err)
content := doc.String()
uploadResp, err := uploadContent(ctx, repo, branch, "_lakefs_actions/"+uuid.NewString(), content)
uploadResp, err := uploadContent(ctx, repo, branch, "_lakefs_actions/"+uuid.NewString(), content, nil)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, uploadResp.StatusCode())
logger.WithField("branch", branch).Info("Commit initial content")
Expand Down
4 changes: 2 additions & 2 deletions esti/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func testCommitMerge(t *testing.T, ctx context.Context, repo string) {
ref := string(createBranchResp.Body)
t.Log("Branch created", ref)

resp, err := uploadContent(ctx, repo, branch, "somefile", "")
resp, err := uploadContent(ctx, repo, branch, "somefile", "", nil)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode())

Expand Down Expand Up @@ -457,7 +457,7 @@ func parseAndUploadActions(t *testing.T, ctx context.Context, repo, branch strin
require.NoError(t, err)

action := doc.String()
resp, err := uploadContent(ctx, repo, branch, "_lakefs_actions/"+ent, action)
resp, err := uploadContent(ctx, repo, branch, "_lakefs_actions/"+ent, action, nil)
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode())
}
Expand Down
4 changes: 2 additions & 2 deletions esti/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestIdentity(t *testing.T) {
})
require.NoError(t, err, "failed creating branch1")

checksum, objContent, err := uploadFileRandomDataAndReport(ctx, repo, branch1, objPath, false)
checksum, objContent, err := uploadFileRandomDataAndReport(ctx, repo, branch1, objPath, false, nil)
require.NoError(t, err, "failed uploading file")
commitResp, err := client.CommitWithResponse(ctx, repo, branch1, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{
Message: "commit on branch1",
Expand All @@ -36,7 +36,7 @@ func TestIdentity(t *testing.T) {
})
require.NoError(t, err, "failed creating branch2")

checksumNew, err := uploadFileAndReport(ctx, repo, branch2, objPath, objContent, false)
checksumNew, err := uploadFileAndReport(ctx, repo, branch2, objPath, objContent, false, nil)
require.NoError(t, err)
require.Equal(t, checksum, checksumNew, "Same file uploaded to committed branch, expected no checksum difference")

Expand Down
2 changes: 1 addition & 1 deletion esti/lakectl_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestLakectlLocal_clone(t *testing.T) {
require.NoError(t, err)
vars["PREFIX"] = "dir_marker/"
vars["LOCAL_DIR"] = dataDir
_, err = uploadContent(context.Background(), vars["REPO"], vars["BRANCH"], vars["PREFIX"], "")
_, err = uploadContent(context.Background(), vars["REPO"], vars["BRANCH"], vars["PREFIX"], "", nil)
require.NoError(t, err)
runCmd(t, Lakectl()+" commit lakefs://"+vars["REPO"]+"/"+vars["BRANCH"]+" --allow-empty-message -m \" \"", false, false, vars)
RunCmdAndVerifyContainsText(t, Lakectl()+" local clone lakefs://"+repoName+"/"+mainBranch+"/"+vars["PREFIX"]+" "+dataDir, false, "Successfully cloned lakefs://${REPO}/${REF}/${PREFIX} to ${LOCAL_DIR}.", vars)
Expand Down
4 changes: 2 additions & 2 deletions esti/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestMergeAndList(t *testing.T) {
const branch = "feature-1"

logger.WithField("branch", mainBranch).Info("Upload initial content")
checksum, content := UploadFileRandomData(ctx, t, repo, mainBranch, "README")
checksum, content := UploadFileRandomData(ctx, t, repo, mainBranch, "README", nil)
checksums := map[string]string{
checksum: content,
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func doMergeAndListIteration(t *testing.T, logger logging.Logger, ctx context.Co
for i := 0; i < addedFiles; i++ {
p := fmt.Sprintf("%d.txt", i)
logger.WithFields(logging.Fields{"iteration": iteration, "path": p}).Info("Upload content to branch")
checksum, content := UploadFileRandomData(ctx, t, repo, branch, p)
checksum, content := UploadFileRandomData(ctx, t, repo, branch, p, nil)
checksums[checksum] = content
}
const totalFiles = addedFiles + 1
Expand Down
26 changes: 3 additions & 23 deletions esti/metadata_download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,16 @@ import (
"context"
"testing"

pebblesst "github.com/cockroachdb/pebble/sstable"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/graveler/committed"
"github.com/treeverse/lakefs/pkg/graveler/sstable"
)

func gravelerIterator(data []byte) (*sstable.Iterator, error) {
// read file descriptor
reader, err := pebblesst.NewMemReader(data, pebblesst.ReaderOptions{})
if err != nil {
return nil, err
}

// create an iterator over the whole thing
iter, err := reader.NewIter(nil, nil)
if err != nil {
return nil, err
}

// wrap it in a Graveler iterator
dummyDeref := func() error { return nil }
return sstable.NewIterator(iter, dummyDeref), nil
}

func TestDownloadMetadataObject(t *testing.T) {
ctx := context.Background()

repo := createRepositoryUnique(ctx, t)
UploadFileRandomData(ctx, t, repo, mainBranch, "some/random/path/43543985430548930")
UploadFileRandomData(ctx, t, repo, mainBranch, "some/random/path/43543985430548930", nil)
commitResp, err := client.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{
Message: "committing just to get a meta range!",
})
Expand All @@ -47,7 +27,7 @@ func TestDownloadMetadataObject(t *testing.T) {
t.Errorf("got unexpected error downloading metarange")
}
// try reading the meta-range
iter, err := gravelerIterator(response.Body)
iter, err := GravelerIterator(response.Body)
if err != nil {
t.Error("could not get an iterator from meta-range body")
}
Expand All @@ -66,7 +46,7 @@ func TestDownloadMetadataObject(t *testing.T) {
require.NoError(t, err, "failed to get range with presign=true")

// try reading the range
iter, err = gravelerIterator(response.Body)
iter, err = GravelerIterator(response.Body)
if err != nil {
t.Error("could not get an iterator from range body")
}
Expand Down
Loading
Loading