Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,15 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
Status: HealthStatusInit,
}
errgrp.Go(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
log.Debugf("New director test suite issued for %s %s. Errgroup was evicted", string(ad.Type), ad.URL.String())
} else {
// Existing errorgroup still working
cancelCtx, cancel := context.WithCancel(existingUtil.ErrGrpContext)
started := existingUtil.ErrGrp.TryGo(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
if !started {
Expand Down Expand Up @@ -422,7 +422,7 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
Status: HealthStatusInit,
}
errgrp.Go(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
}
Expand Down Expand Up @@ -546,6 +546,16 @@ func applyServerDowntimes(serverName string, downtimes []server_structs.Downtime
}
}

// isServerInDowntime reports whether serverName currently has an entry in the
// filteredServers map whose filter type is anything other than tempAllowed.
// A server with no entry, or one marked tempAllowed, is not treated as being
// in downtime. The lookup is performed under the read lock of
// filteredServersMutex.
func isServerInDowntime(serverName string) bool {
	filteredServersMutex.RLock()
	defer filteredServersMutex.RUnlock()

	filterType, found := filteredServers[serverName]
	if !found {
		// No filter recorded for this server at all.
		return false
	}
	// tempAllowed is the one filter type that does not count as downtime.
	return filterType != tempAllowed
}

// applyActiveDowntimeFilter checks federationDowntimes for any active downtime for the given server
// and applies the tempFiltered filter immediately if found. This ensures that when a server wakes up
// mid-downtime, it is blocked right away without waiting for the next registry poll.
Expand Down
70 changes: 50 additions & 20 deletions director/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/url"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -124,9 +125,21 @@ func reportStatusToServer(ctx context.Context, serverWebUrl string, status strin
return nil
}

// Run a periodic test file transfer against an origin to ensure
// it's talking to the director
func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.ServerAd) {
// LaunchPeriodicDirectorTest runs periodic test file transfers against an origin or cache to ensure
// it's responding to director test requests. The test fetches the current server ad
// from the TTL cache on each cycle and stops when the ad is no longer present.
func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
// Option to disable touch on hit when fetching from cache to avoid extending TTL
disableTouchOpt := ttlcache.WithDisableTouchOnHit[string, *server_structs.Advertisement]()

// Fetch the initial server ad to set up metrics
initialAdItem := serverAds.Get(serverUrlStr, disableTouchOpt)
if initialAdItem == nil {
log.Errorf("Failed to start director test suite: server ad not found in cache for URL %s. Test will not be started.", serverUrlStr)
return
}
initialAd := initialAdItem.Value()
serverAd := initialAd.ServerAd
serverName := serverAd.Name
serverUrl := serverAd.URL.String()
serverWebUrl := serverAd.WebURL.String()
Expand Down Expand Up @@ -158,15 +171,30 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
// runDirectorTestCycle executes a single director test cycle and reports the result back to the server.
// Extracted as a helper to allow running the first test immediately upon registration, avoiding the
// race condition where the origin/cache 30s timeout fires before the first ticker-driven test.
runDirectorTestCycle := func() {
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", serverAd.Type, serverName, serverUrl))
// Returns false only when the server ad is no longer in the cache; returns true when the test ran or was skipped because the server is in downtime.
runDirectorTestCycle := func() bool {
// Fetch the current server ad from the TTL cache
adItem := serverAds.Get(serverUrlStr, disableTouchOpt)
if adItem == nil {
log.Infof("The Director doesn't have any advertisements for server with URL %s. Stopping director tests.", serverUrlStr)
return false
}
currentServerAd := adItem.Value().ServerAd

// Check if the server is in downtime by checking the filteredServers map
if isServerInDowntime(currentServerAd.Name) {
log.Debugf("Skipping director test cycle for %s server %s: server is in downtime", currentServerAd.Type, currentServerAd.Name)
return true // Return true to continue the loop, but don't run the test
}

log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", currentServerAd.Type, currentServerAd.Name, currentServerAd.URL.String()))
testSucceeded := true
var testErr error
if serverAd.Type == server_structs.OriginType.String() {
if currentServerAd.Type == server_structs.OriginType.String() {
fileTests := server_utils.TestFileTransferImpl{}
testSucceeded, testErr = fileTests.RunTests(ctx, serverUrl, serverUrl, "", server_utils.DirectorTest)
} else if serverAd.Type == server_structs.CacheType.String() {
testErr = runCacheTest(ctx, serverAd.URL)
testSucceeded, testErr = fileTests.RunTests(ctx, currentServerAd.URL.String(), currentServerAd.URL.String(), "", server_utils.DirectorTest)
} else if currentServerAd.Type == server_structs.CacheType.String() {
testErr = runCacheTest(ctx, currentServerAd.URL)
}

// Compose the result of this Director-test to report to the server
Expand All @@ -176,25 +204,25 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
reportStatus = "ok"
reportMessage = "Director test cycle succeeded at " + time.Now().Format(time.RFC3339)
healthStatus = HealthStatusOK
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), serverAd.Type, serverUrl)
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), currentServerAd.Type, currentServerAd.URL.String())
} else {
reportStatus = "error"
reportMessage = "Director file transfer test cycle failed for server: " + serverUrl
reportMessage = "Director file transfer test cycle failed for server: " + currentServerAd.URL.String()
if testErr != nil {
reportMessage += " " + testErr.Error()
}
healthStatus = HealthStatusError
log.Warningln("Director file transfer test cycle failed for ", serverAd.Type, " server: ", serverUrl, " ", testErr)
log.Warningln("Director file transfer test cycle failed for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " ", testErr)
}

// Update healthTestUtils once per cycle
func() {
healthTestUtilsMutex.Lock()
defer healthTestUtilsMutex.Unlock()
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
if existingUtil, ok := healthTestUtils[currentServerAd.URL.String()]; ok {
existingUtil.Status = healthStatus
} else {
log.Debugln("HealthTestUtil missing for ", serverAd.Type, " server: ", serverUrl, " Failed to update internal status")
log.Debugln("HealthTestUtil missing for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " Failed to update internal status")
}
}()

Expand All @@ -205,25 +233,27 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
}

// Report the result of this Director-test back to origin/server (single call)
reportErr := reportStatusToServer(ctx, serverWebUrl, reportStatus, reportMessage, serverAd.Type, false)
reportErr := reportStatusToServer(ctx, currentServerAd.WebURL.String(), reportStatus, reportMessage, currentServerAd.Type, false)

// Determine report status metric and log if reporting failed
reportStatusMetric := metrics.MetricSucceeded
if reportErr != nil {
reportStatusMetric = metrics.MetricFailed
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), reportErr)
log.Warningf("Failed to report director test result to %s server at %s: %v", currentServerAd.Type, currentServerAd.WebURL.String(), reportErr)
}

// Record metrics once per cycle
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": currentServerAd.Name,
"server_web_url": currentServerAd.WebURL.String(),
"server_type": string(currentServerAd.Type),
"status": string(testStatusMetric),
"report_status": string(reportStatusMetric),
},
).Inc()

return true // Test was run successfully
}

// Run the first test immediately to avoid race with origin/cache 30s timeout.
Expand All @@ -235,7 +265,7 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
for {
select {
case <-ctx.Done():
log.Debug(fmt.Sprintf("End director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
log.Debug(fmt.Sprintf("Stopped the Director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))

metrics.PelicanDirectorActiveFileTransferTestSuite.With(
prometheus.Labels{
Expand Down
Loading
Loading