Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3294,6 +3294,10 @@ func uploadObject(transfer *transferFile) (transferResult TransferResults, err e
Scheme: "https",
Path: transfer.remoteURL.Path,
}
// Add the oss.asize query parameter for PUT requests
query := dest.Query()
query.Set("oss.asize", fmt.Sprintf("%d", sizer.Size()))
dest.RawQuery = query.Encode()
attempt.Endpoint = dest.Host
// Create the wrapped reader and send it to the request
closed := make(chan bool, 1)
Expand Down
9 changes: 9 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,15 @@ root_default: /run/pelican/xrootd/origin/globus
default: $XDG_RUNTIME_DIR/pelican/xrootd/origin/globus
components: ["origin"]
---
name: Origin.GlobusTransferTokenFile
description: |+
The filepath to the Globus transfer token file. This corresponds to the `globus.transfer_token_file`
configuration in XRootD's Globus plugin. If not set, it will be automatically derived from
the Globus token directory.
type: filename
default: none
components: ["origin"]
---
name: Origin.FedTokenLocation
description: |+
A path to the file containing a token issued by the federation's issuer. This token may be consumed by other federation services
Expand Down
9 changes: 8 additions & 1 deletion origin/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pelicanplatform/pelican/database"
"github.com/pelicanplatform/pelican/features"
"github.com/pelicanplatform/pelican/metrics"
pelican_oauth2 "github.com/pelicanplatform/pelican/oauth2"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
Expand Down Expand Up @@ -218,7 +219,13 @@ func (server *OriginServer) CreateAdvertisement(name, originUrlStr, originWebUrl
if len(prefixes) == 0 {
if isGlobusBackend {
activateUrl := param.Server_ExternalWebUrl.GetString() + "/view/origin/globus"
return nil, fmt.Errorf("failed to create advertisement: no activated Globus collection. Go to %s to activate your collection.", activateUrl)
callbackUrl, err := pelican_oauth2.GetRedirectURL(globusCallbackPath)
errMsg := fmt.Sprintf("failed to create advertisement: no activated Globus collection. Go to %s to activate your collection", activateUrl)
if err == nil {
errMsg += fmt.Sprintf(". The Globus app expects the following redirect URL: %s; ", callbackUrl)
errMsg += fmt.Sprintf("ensure the %s configuration parameter is set correctly.", param.OIDC_ClientRedirectHostname.GetName())
}
return nil, errors.New(errMsg)
} else {
return nil, errors.New("failed to create advertisement: no valid export")
}
Expand Down
145 changes: 98 additions & 47 deletions origin/globus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ type globusExportStatus string

// For internal globusExports map
type globusExport struct {
DisplayName string `json:"displayName"`
FederationPrefix string `json:"federationPrefix"`
Status globusExportStatus `json:"status"`
Description string `json:"description,omitempty"` // status description
HttpsServer string `json:"httpsServer"` // server url to access files in the collection
Token *oauth2.Token `json:"-"`
DisplayName string `json:"displayName"`
FederationPrefix string `json:"federationPrefix"`
Status globusExportStatus `json:"status"`
Description string `json:"description,omitempty"` // status description
HttpsServer string `json:"httpsServer"` // server url to access files in the collection
Token *oauth2.Token `json:"-"`
TransferToken *oauth2.Token `json:"-"`
TokenFile string `json:"-"`
TransferTokenFile string `json:"-"`
}

// For UI
Expand All @@ -63,21 +66,29 @@ const (
GlobusActivated = "Activated"
)

const GlobusTokenFileExt = ".tok" // File extension for caching Globus access token
const GlobusTokenFileExt = ".tok" // File extension for caching Globus access token
const GlobusTransferTokenFileExt = ".transfer.tok" // File extension for caching Globus transfer token

var (
// An in-memory map-struct to keep Globus collections information with key being the collection UUID.
globusExports map[string]*globusExport
globusExportsMutex = sync.RWMutex{}
)

// InitGlobusBackend does the following things to initialize Globus-related logic
// 1. It initializes the global map structure globusExports to store collection information in-memory
// 2. It checks and setup location for Globus access tokens after user activates the collection
// 3. It loads the Globus OAuth client for OAuth-based authorization to access collection data
// 4. It populates the global map by the exported Origin prefixes/collections. It reads the persisted credentials
// from the origin's SQLite DB and populate the global map, refresh the access token by the persisted
// refresh token
// loadTokenFromDB loads and refreshes a token from the database for a specific token type
func loadTokenFromDB(cid string, refreshToken string, tokenType TokenType, globusAuthCfg *oauth2.Config) (*oauth2.Token, error) {
refToken := &oauth2.Token{
RefreshToken: refreshToken,
}
tokenSource := globusAuthCfg.TokenSource(context.Background(), refToken)
token, err := tokenSource.Token()
if err != nil {
return nil, fmt.Errorf("failed to refresh %s token for collection %s: %v", tokenType, cid, err)
}
return token, nil
}

// InitGlobusBackend initializes the Globus backend by loading existing collections from the database
func InitGlobusBackend(exps []server_utils.OriginExport) error {
uid, err := config.GetDaemonUID()
if err != nil {
Expand Down Expand Up @@ -112,27 +123,28 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {

globusAuthCfg, err := GetGlobusOAuthCfg()
if err != nil {
return errors.Wrap(err, "failed to get Globus OAuth2 config")
return errors.Wrap(err, "failed to get Globus OAuth config")
}

// Populate globusExports map
globusExportsMutex.Lock()
defer globusExportsMutex.Unlock()
for _, esp := range exps {
if esp.GlobusCollectionID == "" {
continue
}

globusEsp := globusExport{
DisplayName: esp.GlobusCollectionName,
FederationPrefix: esp.FederationPrefix,
Status: GlobusInactive,
Description: "Server start",
Description: "Not activated",
}
// We check the origin db and see if we already have the refresh token in-place
// If so, use the token to initialize the collection

// Check if the collection exists in the database
ok, err := collectionExistsByUUID(esp.GlobusCollectionID)
if err != nil {
return errors.Wrapf(err, "failed to check credential status for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
return errors.Wrapf(err, "failed to check if Globus collection %s with name %s exists in DB", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
if !ok {
log.Infof("Globus collection %s with name %s is not activated. You need to activate it in the admin website before using this collection", esp.GlobusCollectionID, esp.GlobusCollectionName)
// Collection doesn't exist in DB, mark as inactive
globusExports[esp.GlobusCollectionID] = &globusEsp
continue
}
Expand All @@ -142,22 +154,36 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {
return errors.Wrapf(err, "failed to get credentials for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}

refToken := &oauth2.Token{
RefreshToken: col.RefreshToken,
}
tokenSource := globusAuthCfg.TokenSource(context.Background(), refToken)
collectionToken, err := tokenSource.Token()
// If we can't get the access token, we want to evict the entry from db and
// ask the user to redo the authentication
// Load collection token
collectionToken, err := loadTokenFromDB(col.UUID, col.RefreshToken, TokenTypeCollection, globusAuthCfg)
if err != nil {
if err := deleteCollectionByUUID(col.UUID); err != nil {
return errors.Wrapf(err, "failed to delete expired credential record for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
log.Infof("Access credentials for Globus collection %s with name %s is expired and removed.", esp.GlobusCollectionID, esp.GlobusCollectionName)
globusExports[esp.GlobusCollectionID] = &globusEsp
continue
}

// Load transfer token
transferToken, err := loadTokenFromDB(col.UUID, col.TransferRefreshToken, TokenTypeTransfer, globusAuthCfg)
if err != nil {
if err := deleteCollectionByUUID(col.UUID); err != nil {
return errors.Wrapf(err, "failed to delete expired credential record for Globus collection %s with name %s", esp.GlobusCollectionID, esp.GlobusCollectionName)
}
log.Infof("Transfer access credentials for Globus collection %s with name %s is expired and removed.", esp.GlobusCollectionID, esp.GlobusCollectionName)
globusExports[esp.GlobusCollectionID] = &globusEsp
Comment on lines +175 to +180
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the error handling logic here meant to do? As is, I'm reading that if there's an error loading the token from the database, you try to delete the collection. Can you explain why?

continue
}

// Save the new access token
if err := persistAccessToken(col.UUID, collectionToken); err != nil {
// Save the new access tokens
var tokenFileName string
var transferTokenFileName string
if tokenFileName, err = persistToken(col.UUID, collectionToken, TokenTypeCollection); err != nil {
return err
}

if transferTokenFileName, err = persistToken(col.UUID, transferToken, TokenTypeTransfer); err != nil {
return err
}

Expand All @@ -170,8 +196,11 @@ func InitGlobusBackend(exps []server_utils.OriginExport) error {

globusEsp.Status = GlobusActivated
globusEsp.Token = collectionToken
globusEsp.TransferToken = transferToken
globusEsp.HttpsServer = col.ServerURL
globusEsp.Description = "Activated with cached credentials"
globusEsp.TokenFile = tokenFileName
globusEsp.TransferTokenFile = transferTokenFileName
globusExports[esp.GlobusCollectionID] = &globusEsp
}
return nil
Expand All @@ -191,7 +220,28 @@ func isExportActivated(fedPrefix string) (ok bool) {
return false
}

// Iterate over all Globus exports and refresh the token. Skip any inactive exports.
// refreshTokenWithRetry handles token refresh with retry logic for a specific token type
func refreshTokenWithRetry(cid string, token *oauth2.Token, tokenType TokenType, exp *globusExport) (*oauth2.Token, error) {
newTok, err := refreshGlobusToken(cid, token, tokenType)
if err != nil {
log.Errorf("Failed to refresh Globus %s token for collection %s with name %s. Will retry once: %v", tokenType, cid, exp.DisplayName, err)
newTok, err = refreshGlobusToken(cid, token, tokenType)
if err != nil {
log.Errorf("Failed to retry refreshing Globus %s token for collection %s with name %s: %v", tokenType, cid, exp.DisplayName, err)
exp.Status = GlobusInactive
exp.Description = fmt.Sprintf("Failed to refresh %s token: %v", tokenType, err)
return nil, err
}
}
if newTok == nil {
log.Debugf("Globus %s token for collection %s with name %s is still valid. Refresh skipped", tokenType, cid, exp.DisplayName)
} else {
log.Debugf("Globus %s token for collection %s with name %s is refreshed", tokenType, cid, exp.DisplayName)
}
return newTok, nil
}

// Iterate over all Globus exports and refresh the tokens. Skip any inactive exports.
// Retry once if first attempt failed. If retry failed, mark the activated export to inactive
// and provide error detail in the export description.
//
Expand All @@ -206,24 +256,25 @@ func doGlobusTokenRefresh() error {
if exp.Status == GlobusInactive {
return nil
}
newTok, err := refreshGlobusToken(cid, exp.Token)

// Refresh collection token
newTok, err := refreshTokenWithRetry(cid, exp.Token, TokenTypeCollection, exp)
if err != nil {
log.Errorf("Failed to refresh Globus token for collection %s with name %s. Will retry once: %v", cid, exp.DisplayName, err)
newTok, err = refreshGlobusToken(cid, exp.Token)
if err != nil {
log.Errorf("Failed to retry refreshing Globus token for collection %s with name %s: %v", cid, exp.DisplayName, err)
exp.Status = GlobusInactive
exp.Description = fmt.Sprintf("Failed to refresh token: %v", err)
return err
}
return err
}
if newTok == nil {
log.Debugf("Globus token for collection %s with name %s is still valid. Refresh skipped", cid, exp.DisplayName)
} else {
// Update globusExport with the new token
if newTok != nil {
expInt.Token = newTok
log.Debugf("Globus token for collection %s with name %s is refreshed", cid, exp.DisplayName)
}

// Refresh transfer token
newTransferTok, err := refreshTokenWithRetry(cid, exp.TransferToken, TokenTypeTransfer, exp)
if err != nil {
return err
}
if newTransferTok != nil {
expInt.TransferToken = newTransferTok
}

return nil
}(cid, exp)
if err != nil && firstErr == nil {
Expand Down
Loading
Loading