Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "3663165f19c7e3f685c48208b992dbe18789b513"
PROTON_COMMIT := "db4ee43fd6f9a8d5999e157b9d938ad9593eee15"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
4 changes: 2 additions & 2 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type Repository interface {
GetByVersionWithID(ctx context.Context, id, version string) (Asset, error)
GetByVersionWithURN(ctx context.Context, urn, version string) (Asset, error)
GetTypes(ctx context.Context, flt Filter) (map[Type]int, error)
Upsert(ctx context.Context, ast *Asset) (*Asset, error)
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}) (*Asset, error)
Upsert(ctx context.Context, ast *Asset, isUpdateOnly bool) (*Asset, error)
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}, isUpdateOnly bool) (*Asset, error)
DeleteByID(ctx context.Context, id string) (string, error)
DeleteByURN(ctx context.Context, urn string) error
SoftDeleteByID(ctx context.Context, executedAt time.Time, id, updatedByID string) (string, string, error)
Expand Down
58 changes: 30 additions & 28 deletions core/asset/mocks/asset_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool)
return assets, totalCount, nil
}

func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string) (string, error) {
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast)
func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string, isUpdateOnly bool) (string, error) {
assetID, err := s.UpsertAssetWithoutLineage(ctx, ast, isUpdateOnly)
if err != nil {
return "", err
}
Expand All @@ -107,14 +107,14 @@ func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downst
return assetID, nil
}

func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (string, error) {
func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset, isUpdateOnly bool) (string, error) {
currentTime := time.Now()
ast.RefreshedAt = &currentTime

asset, err := s.assetRepository.Upsert(ctx, ast)
asset, err := s.assetRepository.Upsert(ctx, ast, isUpdateOnly)
// retry due to race condition possibility on insert
if errors.Is(err, ErrURNExist) {
asset, err = s.assetRepository.Upsert(ctx, ast)
asset, err = s.assetRepository.Upsert(ctx, ast, isUpdateOnly)
}
if err != nil {
return "", err
Expand All @@ -127,8 +127,14 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st
return asset.ID, nil
}

func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string, patchData map[string]interface{}) (string, error) {
assetID, err := s.UpsertPatchAssetWithoutLineage(ctx, ast, patchData)
func (s *Service) UpsertPatchAsset( //nolint:revive
ctx context.Context,
ast *Asset, upstreams,
downstreams []string,
patchData map[string]interface{},
isUpdateOnly bool,
) (string, error) {
assetID, err := s.UpsertPatchAssetWithoutLineage(ctx, ast, patchData, isUpdateOnly)
if err != nil {
return "", err
}
Expand All @@ -140,14 +146,14 @@ func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, d
return assetID, nil
}

func (s *Service) UpsertPatchAssetWithoutLineage(ctx context.Context, ast *Asset, patchData map[string]interface{}) (string, error) {
func (s *Service) UpsertPatchAssetWithoutLineage(ctx context.Context, ast *Asset, patchData map[string]interface{}, isUpdateOnly bool) (string, error) {
currentTime := time.Now()
ast.RefreshedAt = &currentTime

asset, err := s.assetRepository.UpsertPatch(ctx, ast, patchData)
asset, err := s.assetRepository.UpsertPatch(ctx, ast, patchData, isUpdateOnly)
// retry due to race condition possibility on insert
if errors.Is(err, ErrURNExist) {
asset, err = s.assetRepository.UpsertPatch(ctx, ast, patchData)
asset, err = s.assetRepository.UpsertPatch(ctx, ast, patchData, isUpdateOnly)
}
if err != nil {
return "", err
Expand Down
Loading
Loading