Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,15 +373,15 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
Status: HealthStatusInit,
}
errgrp.Go(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
log.Debugf("New director test suite issued for %s %s. Errgroup was evicted", string(ad.Type), ad.URL.String())
} else {
// Existing errorgroup still working
cancelCtx, cancel := context.WithCancel(existingUtil.ErrGrpContext)
started := existingUtil.ErrGrp.TryGo(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
if !started {
Expand Down Expand Up @@ -411,7 +411,7 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
Status: HealthStatusInit,
}
errgrp.Go(func() error {
LaunchPeriodicDirectorTest(cancelCtx, sAd)
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
return nil
})
}
Expand Down
154 changes: 101 additions & 53 deletions director/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/url"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -132,23 +133,41 @@ func reportStatusToServer(ctx context.Context, serverWebUrl string, status strin
return nil
}

// isDowntimeActive checks if a downtime is currently active based on the current time.
// A downtime is considered active if the current time is between StartTime and EndTime (inclusive),
// or if EndTime is IndefiniteEndTime and current time is after StartTime.
func isDowntimeActive(downtime server_structs.Downtime, currentTime int64) bool {
return downtime.StartTime <= currentTime && (downtime.EndTime >= currentTime || downtime.EndTime == server_structs.IndefiniteEndTime)
}

// Run a periodic test file transfer against an origin to ensure
// it's talking to the director
func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.ServerAd) {
serverName := serverAd.Name
serverUrl := serverAd.URL.String()
serverWebUrl := serverAd.WebURL.String()
// it's talking to the director. The test fetches the current server ad
// from the TTL cache on each cycle and stops when the ad is no longer present.
func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
// Option to disable touch on hit when fetching from cache to avoid extending TTL
disableTouchOpt := ttlcache.WithDisableTouchOnHit[string, *server_structs.Advertisement]()

// Fetch the initial server ad to set up metrics
initialAdItem := serverAds.Get(serverUrlStr, disableTouchOpt)
if initialAdItem == nil {
log.Errorf("Failed to start director test suite: server ad not found in cache for URL %s. Test will not be started.", serverUrlStr)
return
}
initialAd := initialAdItem.Value()
serverName := initialAd.Name
serverWebUrl := initialAd.WebURL.String()
serverType := initialAd.Type

log.Debug(fmt.Sprintf("Starting a new director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
log.Debug(fmt.Sprintf("Starting a new director test suite for %s server %s at %s", serverType, serverName, serverUrlStr))

metrics.PelicanDirectorFileTransferTestSuite.With(
prometheus.Labels{
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
}).Inc()

metrics.PelicanDirectorActiveFileTransferTestSuite.With(
prometheus.Labels{
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
}).Inc()

customInterval := param.Director_OriginCacheHealthTestInterval.GetDuration()
Expand All @@ -162,46 +181,75 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
ticker := time.NewTicker(customInterval)

defer ticker.Stop()
defer func() {
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
prometheus.Labels{
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
}).Dec()
}()

for {
select {
case <-ctx.Done():
log.Debug(fmt.Sprintf("End director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))

metrics.PelicanDirectorActiveFileTransferTestSuite.With(
prometheus.Labels{
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
}).Dec()

log.Debug(fmt.Sprintf("Stopped the Director test suite for %s server %s at %s", serverType, serverName, serverUrlStr))
return
case <-ticker.C:
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", serverAd.Type, serverName, serverUrl))
// Fetch the current server ad from the TTL cache
adItem := serverAds.Get(serverUrlStr, disableTouchOpt)
if adItem == nil {
log.Infof("The Director doesn't have any advertisements for server with URL %s. Stopping director tests.", serverUrlStr)
return
}
serverAd := adItem.Value().ServerAd

// Check if the server is in an active downtime
downtimes, err := getCachedDowntimes(serverAd.Name)
if err != nil {
log.Warningf("Failed to get cached downtimes for server %s: %v. Proceeding with director test.", serverAd.Name, err)
} else {
// Check if any downtime is currently active
currentTime := time.Now().UTC().UnixMilli()
hasActiveDowntime := false
for _, downtime := range downtimes {
if isDowntimeActive(downtime, currentTime) {
hasActiveDowntime = true
log.Debugf("Skipping director test cycle for %s server %s: server is in active downtime", serverAd.Type, serverAd.Name)
break
}
}

if hasActiveDowntime {
continue
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Director-test is still being triggered when this server in downtime.

I'd suggest replace this block with the following code, which passes the hand test in my local dev container. By checking filteredServers directly, the director test now uses the same source of truth as the redirect filtering, avoiding maintaining the downtimes traversal here.

Suggested change
// Check if the server is in an active downtime
downtimes, err := getCachedDowntimes(serverAd.Name)
if err != nil {
log.Warningf("Failed to get cached downtimes for server %s: %v. Proceeding with director test.", serverAd.Name, err)
} else {
// Check if any downtime is currently active
currentTime := time.Now().UTC().UnixMilli()
hasActiveDowntime := false
for _, downtime := range downtimes {
if isDowntimeActive(downtime, currentTime) {
hasActiveDowntime = true
log.Debugf("Skipping director test cycle for %s server %s: server is in active downtime", serverAd.Type, serverAd.Name)
break
}
}
if hasActiveDowntime {
continue
}
}
if isServerInDowntime(serverAd.Name) {
log.Debugf("Skipping director test cycle for %s server %s: server is in downtime", serverAd.Type, serverAd.Name)
continue
}
// This helper function should be placed in cache_ads.go
// isServerInDowntime checks if a server is in the filteredServers map with an active filter.
// A server is considered in downtime if it exists in filteredServers with any filter type except tempAllowed.
func isServerInDowntime(serverName string) bool {
filteredServersMutex.RLock()
defer filteredServersMutex.RUnlock()
existingFilterType, isServerFiltered := filteredServers[serverName]
return isServerFiltered && existingFilterType != tempAllowed
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented the suggested fix in commit 2c3d8fa. Now using isServerInDowntime() helper function that checks filteredServers map directly, ensuring director tests use the same source of truth as redirect filtering. This avoids maintaining duplicate downtime traversal logic and ensures consistency.


log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", serverAd.Type, serverAd.Name, serverAd.URL.String()))
ok := true
var err error
var testErr error
if serverAd.Type == server_structs.OriginType.String() {
fileTests := server_utils.TestFileTransferImpl{}
ok, err = fileTests.RunTests(ctx, serverUrl, serverUrl, "", server_utils.DirectorTest)
ok, testErr = fileTests.RunTests(ctx, serverAd.URL.String(), serverAd.URL.String(), "", server_utils.DirectorTest)
} else if serverAd.Type == server_structs.CacheType.String() {
err = runCacheTest(ctx, serverAd.URL)
testErr = runCacheTest(ctx, serverAd.URL)
}

// Successfully run a test, no error
if ok && err == nil {
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), serverAd.Type, serverUrl)
if ok && testErr == nil {
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), serverAd.Type, serverAd.URL.String())
func() {
healthTestUtilsMutex.Lock()
defer healthTestUtilsMutex.Unlock()
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
existingUtil.Status = HealthStatusOK
} else {
log.Debugln("HealthTestUtil missing for ", serverAd.Type, " server: ", serverUrl, " Failed to update internal status")
log.Debugln("HealthTestUtil missing for ", serverAd.Type, " server: ", serverAd.URL.String(), " Failed to update internal status")
}
}()

// Report error back to origin/server
if err := reportStatusToServer(
ctx,
serverWebUrl,
serverAd.WebURL.String(),
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
serverAd.Type,
false,
Expand All @@ -210,7 +258,7 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
if err == originReportNotFoundError {
newErr := reportStatusToServer(
ctx,
serverWebUrl,
serverAd.WebURL.String(),
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
serverAd.Type,
true, // Fallback to legacy endpoint
Expand All @@ -220,9 +268,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricSucceeded),
"report_status": string(metrics.MetricFailed),
},
Expand All @@ -231,9 +279,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
} else {
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricSucceeded),
"report_status": string(metrics.MetricSucceeded),
},
Expand All @@ -244,9 +292,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricSucceeded),
"report_status": string(metrics.MetricFailed),
},
Expand All @@ -256,39 +304,39 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
} else {
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricSucceeded),
"report_status": string(metrics.MetricSucceeded),
},
).Inc()
}
// The file tests failed. Report failure back to origin/cache
} else {
log.Warningln("Director file transfer test cycle failed for ", serverAd.Type, " server: ", serverUrl, " ", err)
log.Warningln("Director file transfer test cycle failed for ", serverAd.Type, " server: ", serverAd.URL.String(), " ", testErr)
func() {
healthTestUtilsMutex.Lock()
defer healthTestUtilsMutex.Unlock()
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
existingUtil.Status = HealthStatusError
} else {
log.Debugln("HealthTestUtil missing for", serverAd.Type, " server: ", serverUrl, " Failed to update internal status")
log.Debugln("HealthTestUtil missing for", serverAd.Type, " server: ", serverAd.URL.String(), " Failed to update internal status")
}
}()

if err := reportStatusToServer(
ctx,
serverWebUrl,
"error", "Director file transfer test cycle failed for origin: "+serverUrl+" "+err.Error(),
serverAd.WebURL.String(),
"error", "Director file transfer test cycle failed for origin: "+serverAd.URL.String()+" "+testErr.Error(),
serverAd.Type,
false,
); err != nil {
// origin <7.7 only supports legacy report endpoint. Fallback to the legacy one
if err == originReportNotFoundError {
newErr := reportStatusToServer(
ctx,
serverWebUrl,
serverAd.WebURL.String(),
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
serverAd.Type,
true, // Fallback to legacy endpoint
Expand All @@ -298,9 +346,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricFailed),
"report_status": string(metrics.MetricFailed),
},
Expand All @@ -309,9 +357,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
} else {
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricFailed),
"report_status": string(metrics.MetricSucceeded),
},
Expand All @@ -322,9 +370,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricFailed),
"report_status": string(metrics.MetricFailed),
},
Expand All @@ -335,9 +383,9 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
// No error when reporting the result, we are good
metrics.PelicanDirectorFileTransferTestsRuns.With(
prometheus.Labels{
"server_name": serverName,
"server_web_url": serverWebUrl,
"server_type": string(serverAd.Type),
"server_name": serverAd.Name,
"server_web_url": serverAd.WebURL.String(),
"server_type": serverAd.Type,
"status": string(metrics.MetricFailed),
"report_status": string(metrics.MetricSucceeded),
},
Expand Down
Loading
Loading