Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "3663165f19c7e3f685c48208b992dbe18789b513"
PROTON_COMMIT := "db4ee43fd6f9a8d5999e157b9d938ad9593eee15"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
4 changes: 2 additions & 2 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Repository interface {
GetByVersionWithID(ctx context.Context, id, version string) (Asset, error)
GetByVersionWithURN(ctx context.Context, urn, version string) (Asset, error)
GetTypes(ctx context.Context, flt Filter) (map[Type]int, error)
Upsert(ctx context.Context, ast *Asset) (*Asset, error)
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}) (*Asset, error)
Upsert(ctx context.Context, ast *Asset, isUpdateOnly bool) (*Asset, error)
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}, isUpdateOnly bool) (*Asset, error)
DeleteByID(ctx context.Context, id string) (string, error)
DeleteByURN(ctx context.Context, urn string) error
SoftDeleteByID(ctx context.Context, executedAt time.Time, id, updatedByID string) (string, string, error)
Expand Down
58 changes: 30 additions & 28 deletions core/asset/mocks/asset_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool)
return assets, totalCount, nil
}

func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string) (string, error) {
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast)
func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string, isUpdateOnly bool) (string, error) {
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast, isUpdateOnly)
if err != nil {
return "", err
}
Expand All @@ -107,14 +107,14 @@ func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downst
return assetID, nil
}

func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (string, error) {
func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset, isUpdateOnly bool) (string, error) {
currentTime := time.Now()
ast.RefreshedAt = &currentTime

asset, err := s.assetRepository.Upsert(ctx, ast)
asset, err := s.assetRepository.Upsert(ctx, ast, isUpdateOnly)
// retry due to race condition possibility on insert
if errors.Is(err, ErrURNExist) {
asset, err = s.assetRepository.Upsert(ctx, ast)
asset, err = s.assetRepository.Upsert(ctx, ast, isUpdateOnly)
}
if err != nil {
return "", err
Expand All @@ -127,8 +127,14 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st
return asset.ID, nil
}

func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string, patchData map[string]interface{}) (string, error) {
assetID, err := s.UpsertPatchAssetWithoutLineage(ctx, ast, patchData)
func (s *Service) UpsertPatchAsset( //nolint:revive
ctx context.Context,
ast *Asset, upstreams,
downstreams []string,
patchData map[string]interface{},
isUpdateOnly bool,
) (string, error) {
assetID, err := s.UpsertPatchAssetWithoutLineage(ctx, ast, patchData, isUpdateOnly)
if err != nil {
return "", err
}
Expand All @@ -140,14 +146,14 @@ func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, d
return assetID, nil
}

func (s *Service) UpsertPatchAssetWithoutLineage(ctx context.Context, ast *Asset, patchData map[string]interface{}) (string, error) {
func (s *Service) UpsertPatchAssetWithoutLineage(ctx context.Context, ast *Asset, patchData map[string]interface{}, isUpdateOnly bool) (string, error) {
currentTime := time.Now()
ast.RefreshedAt = &currentTime

asset, err := s.assetRepository.UpsertPatch(ctx, ast, patchData)
asset, err := s.assetRepository.UpsertPatch(ctx, ast, patchData, isUpdateOnly)
// retry due to race condition possibility on insert
if errors.Is(err, ErrURNExist) {
asset, err = s.assetRepository.UpsertPatch(ctx, ast, patchData)
asset, err = s.assetRepository.UpsertPatch(ctx, ast, patchData, isUpdateOnly)
}
if err != nil {
return "", err
Expand Down
Loading
Loading