Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,10 @@ func uploadObject(transfer *transferFile) (transferResult TransferResults, err e
Scheme: "https",
Path: transfer.remoteURL.Path,
}
// Add the oss.asize query parameter for PUT requests
query := dest.Query()
query.Set("oss.asize", fmt.Sprintf("%d", sizer.Size()))
dest.RawQuery = query.Encode()
attempt.Endpoint = dest.Host
// Create the wrapped reader and send it to the request
closed := make(chan bool, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ CREATE TABLE globus_collections (
name TEXT NOT NULL DEFAULT '',
server_url TEXT NOT NULL DEFAULT '',
refresh_token TEXT NOT NULL DEFAULT '',
transfer_refresh_token TEXT NOT NULL DEFAULT '',
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query';
-- +goose StatementEnd
13 changes: 12 additions & 1 deletion docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ description: |+
Authorization header.

If the origin backend is configured with the `globus` storage type, any value set here will be overridden with the filepath to
the first file ending in `.tok` found in the $(Origin.GlobusConfigLocation)/tokens directory
the file ending in `.tok` found in the $(Origin.GlobusConfigLocation)/tokens directory
type: filename
default: none
components: ["origin"]
Expand Down Expand Up @@ -1267,6 +1267,17 @@ root_default: /run/pelican/xrootd/origin/globus
default: $XDG_RUNTIME_DIR/pelican/xrootd/origin/globus
components: ["origin"]
---
name: Origin.GlobusTransferTokenFile
description: |+
When set, all requests from the Globus backend to the Globus Transfer API will include the contents
of the file as a bearer token in the authorization header.

Any value set here will be overridden with the filepath to the file ending in `.transfer.tok` found
in the $(Origin.GlobusConfigLocation)/tokens directory
type: filename
default: none
components: ["origin"]
---
name: Origin.FedTokenLocation
description: |+
A path to the file containing a token issued by the federation's issuer. This token may be consumed by other federation services
Expand Down
6 changes: 1 addition & 5 deletions launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,
return nil, err
}

if err := origin.InitializeDB(); err != nil {
return nil, errors.Wrap(err, "failed to initialize origin sqlite database")
}

if err := database.InitServerDatabase(server_structs.OriginType); err != nil {
return nil, errors.Wrap(err, "failed to initialize server sqlite database")
}
Expand Down Expand Up @@ -180,7 +176,7 @@ func OriginServeFinish(ctx context.Context, egrp *errgroup.Group) error {

egrp.Go(func() error {
<-ctx.Done()
return origin.ShutdownOriginDB()
return database.ShutdownDB()
})

return nil
Expand Down
8 changes: 7 additions & 1 deletion origin/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pelicanplatform/pelican/database"
"github.com/pelicanplatform/pelican/features"
"github.com/pelicanplatform/pelican/metrics"
pelican_oauth2 "github.com/pelicanplatform/pelican/oauth2"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
Expand Down Expand Up @@ -218,7 +219,12 @@ func (server *OriginServer) CreateAdvertisement(name, originUrlStr, originWebUrl
if len(prefixes) == 0 {
if isGlobusBackend {
activateUrl := param.Server_ExternalWebUrl.GetString() + "/view/origin/globus"
return nil, fmt.Errorf("failed to create advertisement: no activated Globus collection. Go to %s to activate your collection.", activateUrl)
callbackUrl, err := pelican_oauth2.GetRedirectURL(globusCallbackPath)
errMsg := fmt.Sprintf("failed to create advertisement: no activated Globus collection. Go to %s to activate your collection", activateUrl)
if err == nil {
errMsg += fmt.Sprintf(". The Globus app expects the following redirect URL: %s ", callbackUrl)
}
return nil, errors.New(errMsg)
} else {
return nil, errors.New("failed to create advertisement: no valid export")
}
Expand Down
152 changes: 104 additions & 48 deletions origin/globus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ type globusExportStatus string

// For internal globusExports map
type globusExport struct {
DisplayName string `json:"displayName"`
FederationPrefix string `json:"federationPrefix"`
Status globusExportStatus `json:"status"`
Description string `json:"description,omitempty"` // status description
HttpsServer string `json:"httpsServer"` // server url to access files in the collection
Token *oauth2.Token `json:"-"`
DisplayName string `json:"displayName"`
FederationPrefix string `json:"federationPrefix"`
Status globusExportStatus `json:"status"`
Description string `json:"description,omitempty"` // status description
HttpsServer string `json:"httpsServer"` // server url to access files in the collection
Token *oauth2.Token `json:"-"`
TransferToken *oauth2.Token `json:"-"`
TokenFile string `json:"-"`
TransferTokenFile string `json:"-"`
}

// For UI
Expand All @@ -58,26 +61,39 @@ type globusExportUI struct {
UUID string `json:"uuid"`
}

// GlobusTokenType represents the type of Globus token
type GlobusTokenType string

const (
TokenTypeCollection GlobusTokenType = "collection"
TokenTypeTransfer GlobusTokenType = "transfer"
)

const (
GlobusInactive = "Inactive"
GlobusActivated = "Activated"
)

const GlobusTokenFileExt = ".tok" // File extension for caching Globus access token

var (
// An in-memory map-struct to keep Globus collections information with key being the collection UUID.
globusExports map[string]*globusExport
globusExportsMutex = sync.RWMutex{}
)

// InitGlobusBackend does the following things to initialize Globus-related logic
// 1. It initializes the global map structure globusExports to store collection information in-memory
// 2. It checks and setup location for Globus access tokens after user activates the collection
// 3. It loads the Globus OAuth client for OAuth-based authorization to access collection data
// 4. It populates the global map by the exported Origin prefixes/collections. It reads the persisted credentials
// from the origin's SQLite DB and populate the global map, refresh the access token by the persisted
// refresh token
// loadTokenFromDB loads and refreshes a token from the database for a specific token type
func loadTokenFromDB(cid string, refreshToken string, tokenType GlobusTokenType, globusAuthCfg *oauth2.Config) (*oauth2.Token, error) {
refToken := &oauth2.Token{
RefreshToken: refreshToken,
}
tokenSource := globusAuthCfg.TokenSource(context.Background(), refToken)
token, err := tokenSource.Token()
if err != nil {
return nil, fmt.Errorf("failed to refresh %s token for collection %s: %v", tokenType, cid, err)
}
return token, nil
}

// InitGlobusBackend initializes the Globus backend by loading existing collections from the database
func InitGlobusBackend(exps []server_utils.OriginExport) error {
uid, err := config.GetDaemonUID()
if err != nil {
Expand Down Expand Up @@ -112,27 +128,28 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {

globusAuthCfg, err := GetGlobusOAuthCfg()
if err != nil {
return errors.Wrap(err, "failed to get Globus OAuth2 config")
return errors.Wrap(err, "failed to get Globus OAuth config")
}

// Populate globusExports map
globusExportsMutex.Lock()
defer globusExportsMutex.Unlock()
for _, esp := range exps {
if esp.GlobusCollectionID == "" {
continue
}

globusEsp := globusExport{
DisplayName: esp.GlobusCollectionName,
FederationPrefix: esp.FederationPrefix,
Status: GlobusInactive,
Description: "Server start",
Description: "Not activated",
}
// We check the origin db and see if we already have the refresh token in-place
// If so, use the token to initialize the collection

// Check if the collection exists in the database
ok, err := collectionExistsByUUID(esp.GlobusCollectionID)
if err != nil {
return errors.Wrapf(err, "failed to check credential status for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
return errors.Wrapf(err, "failed to check if Globus collection %s with name %s exists in DB", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
if !ok {
log.Infof("Globus collection %s with name %s is not activated. You need to activate it in the admin website before using this collection", esp.GlobusCollectionID, esp.GlobusCollectionName)
// Collection doesn't exist in DB, mark as inactive
globusExports[esp.GlobusCollectionID] = &globusEsp
continue
}
Expand All @@ -142,22 +159,36 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {
return errors.Wrapf(err, "failed to get credentials for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}

refToken := &oauth2.Token{
RefreshToken: col.RefreshToken,
}
tokenSource := globusAuthCfg.TokenSource(context.Background(), refToken)
collectionToken, err := tokenSource.Token()
// If we can't get the access token, we want to evict the entry from db and
// ask the user to redo the authentication
// Load collection token
collectionToken, err := loadTokenFromDB(col.UUID, col.RefreshToken, TokenTypeCollection, globusAuthCfg)
if err != nil {
if err := deleteCollectionByUUID(col.UUID); err != nil {
return errors.Wrapf(err, "failed to delete expired credential record for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
log.Infof("Access credentials for Globus collection %s with name %s is expired and removed.", esp.GlobusCollectionID, esp.GlobusCollectionName)
globusExports[esp.GlobusCollectionID] = &globusEsp
continue
}

// Save the new access token
if err := persistAccessToken(col.UUID, collectionToken); err != nil {
// Load transfer token
transferToken, err := loadTokenFromDB(col.UUID, col.TransferRefreshToken, TokenTypeTransfer, globusAuthCfg)
if err != nil {
if err := deleteCollectionByUUID(col.UUID); err != nil {
return errors.Wrapf(err, "failed to delete expired credential record for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
log.Infof("Transfer access credentials for Globus collection %s with name %s is expired and removed.", esp.GlobusCollectionID, esp.GlobusCollectionName)
globusExports[esp.GlobusCollectionID] = &globusEsp
Comment on lines +175 to +180
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the error handling logic here meant to do? As is, I'm reading that if there's an error loading the token from the database, you try to delete the collection. Can you explain why?

continue
}

// Save the new access tokens
var tokenFileName string
var transferTokenFileName string
if tokenFileName, err = persistToken(col.UUID, collectionToken, TokenTypeCollection); err != nil {
return err
}

if transferTokenFileName, err = persistToken(col.UUID, transferToken, TokenTypeTransfer); err != nil {
return err
}

Expand All @@ -170,8 +201,11 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {

globusEsp.Status = GlobusActivated
globusEsp.Token = collectionToken
globusEsp.TransferToken = transferToken
globusEsp.HttpsServer = col.ServerURL
globusEsp.Description = "Activated with cached credentials"
globusEsp.TokenFile = tokenFileName
globusEsp.TransferTokenFile = transferTokenFileName
globusExports[esp.GlobusCollectionID] = &globusEsp
}
return nil
Expand All @@ -191,7 +225,28 @@ func isExportActivated(fedPrefix string) (ok bool) {
return false
}

// Iterate over all Globus exports and refresh the token. Skip any inactive exports.
// refreshTokenWithRetry handles token refresh with retry logic for a specific token type
func refreshTokenWithRetry(cid string, token *oauth2.Token, tokenType GlobusTokenType, exp *globusExport) (*oauth2.Token, error) {
newTok, err := refreshGlobusToken(cid, token, tokenType)
if err != nil {
log.Errorf("Failed to refresh Globus %s token for collection %s with name %s. Will retry once: %v", tokenType, cid, exp.DisplayName, err)
newTok, err = refreshGlobusToken(cid, token, tokenType)
if err != nil {
log.Errorf("Failed to retry refreshing Globus %s token for collection %s with name %s: %v", tokenType, cid, exp.DisplayName, err)
exp.Status = GlobusInactive
exp.Description = fmt.Sprintf("Failed to refresh %s token: %v", tokenType, err)
return nil, err
}
}
if newTok == nil {
log.Debugf("Globus %s token for collection %s with name %s is still valid. Refresh skipped", tokenType, cid, exp.DisplayName)
} else {
log.Debugf("Globus %s token for collection %s with name %s is refreshed", tokenType, cid, exp.DisplayName)
}
return newTok, nil
}

// Iterate over all Globus exports and refresh the tokens. Skip any inactive exports.
// Retry once if first attempt failed. If retry failed, mark the activated export to inactive
// and provide error detail in the export description.
//
Expand All @@ -206,24 +261,25 @@ func doGlobusTokenRefresh() error {
if exp.Status == GlobusInactive {
return nil
}
newTok, err := refreshGlobusToken(cid, exp.Token)

// Refresh collection token
newTok, err := refreshTokenWithRetry(cid, exp.Token, TokenTypeCollection, exp)
if err != nil {
log.Errorf("Failed to refresh Globus token for collection %s with name %s. Will retry once: %v", cid, exp.DisplayName, err)
newTok, err = refreshGlobusToken(cid, exp.Token)
if err != nil {
log.Errorf("Failed to retry refreshing Globus token for collection %s with name %s: %v", cid, exp.DisplayName, err)
exp.Status = GlobusInactive
exp.Description = fmt.Sprintf("Failed to refresh token: %v", err)
return err
}
return err
}
if newTok == nil {
log.Debugf("Globus token for collection %s with name %s is still valid. Refresh skipped", cid, exp.DisplayName)
} else {
// Update globusExport with the new token
if newTok != nil {
expInt.Token = newTok
log.Debugf("Globus token for collection %s with name %s is refreshed", cid, exp.DisplayName)
}

// Refresh transfer token
newTransferTok, err := refreshTokenWithRetry(cid, exp.TransferToken, TokenTypeTransfer, exp)
if err != nil {
return err
}
if newTransferTok != nil {
expInt.TransferToken = newTransferTok
}

return nil
}(cid, exp)
if err != nil && firstErr == nil {
Expand Down
Loading
Loading