Skip to content

Commit 17856f7

Browse files
Copilot and h2zh
committed
Rebase changes onto latest main after PR #2832 consolidation
Applied TTL cache fetching and downtime checking to the new consolidated runDirectorTestCycle structure from PR #2832.

Key changes:
- Modified LaunchPeriodicDirectorTest to accept URL string parameter
- Fetch fresh ServerAd from TTL cache on each test cycle
- Check downtime status using isServerInDowntime() before running tests
- Stop test suite when ServerAd is no longer in cache
- Integrated with new runDirectorTestCycle helper function structure

This maintains the original PR goals while working with the improved code structure from the consolidation in PR #2832.

Co-authored-by: h2zh <[email protected]>
1 parent 53f297b commit 17856f7

File tree

2 files changed

+170
-587
lines changed

2 files changed

+170
-587
lines changed

director/monitor.go

Lines changed: 118 additions & 204 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ import (
4242
"github.com/pelicanplatform/pelican/token_scopes"
4343
)
4444

45-
var originReportNotFoundError = errors.New("Origin does not support new reporting API")
46-
4745
// Report the health status of test file transfer to storage server
4846
func reportStatusToServer(ctx context.Context, serverWebUrl string, status string, message string, serverType string, fallback bool) error {
4947
fedInfo, err := config.GetFederation(ctx)
@@ -123,12 +121,6 @@ func reportStatusToServer(ctx context.Context, serverWebUrl string, status strin
123121
if serverType == server_structs.OriginType.String() && resp.StatusCode != 200 {
124122
return errors.Errorf("error response %v from reporting director test: %v", resp.StatusCode, string(body))
125123
}
126-
if serverType == server_structs.CacheType.String() && resp.StatusCode == 404 {
127-
return errors.New("cache reports a 404 error. For cache version < v7.7.0, director-based test is not supported")
128-
}
129-
if serverType == server_structs.OriginType.String() && resp.StatusCode == 404 {
130-
return originReportNotFoundError
131-
}
132124

133125
return nil
134126
}
@@ -140,8 +132,8 @@ func isDowntimeActive(downtime server_structs.Downtime, currentTime int64) bool
140132
return downtime.StartTime <= currentTime && (downtime.EndTime >= currentTime || downtime.EndTime == server_structs.IndefiniteEndTime)
141133
}
142134

143-
// Run a periodic test file transfer against an origin to ensure
144-
// it's talking to the director. The test fetches the current server ad
135+
// LaunchPeriodicDirectorTest runs periodic test file transfers against an origin or cache to ensure
136+
// it's responding to director test requests. The test fetches the current server ad
145137
// from the TTL cache on each cycle and stops when the ad is no longer present.
146138
func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
147139
// Option to disable touch on hit when fetching from cache to avoid extending TTL
@@ -154,20 +146,21 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
154146
return
155147
}
156148
initialAd := initialAdItem.Value()
157-
serverName := initialAd.Name
158-
serverWebUrl := initialAd.WebURL.String()
159-
serverType := initialAd.Type
149+
serverAd := initialAd.ServerAd
150+
serverName := serverAd.Name
151+
serverUrl := serverAd.URL.String()
152+
serverWebUrl := serverAd.WebURL.String()
160153

161-
log.Debug(fmt.Sprintf("Starting a new director test suite for %s server %s at %s", serverType, serverName, serverUrlStr))
154+
log.Debug(fmt.Sprintf("Starting a new director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
162155

163156
metrics.PelicanDirectorFileTransferTestSuite.With(
164157
prometheus.Labels{
165-
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
158+
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
166159
}).Inc()
167160

168161
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
169162
prometheus.Labels{
170-
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
163+
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
171164
}).Inc()
172165

173166
customInterval := param.Director_OriginCacheHealthTestInterval.GetDuration()
@@ -181,203 +174,124 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
181174
ticker := time.NewTicker(customInterval)
182175

183176
defer ticker.Stop()
184-
defer func() {
185-
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
177+
178+
// runDirectorTestCycle executes a single director test cycle and reports the result back to the server.
179+
// Extracted as a helper to allow running the first test immediately upon registration, avoiding the
180+
// race condition where the origin/cache 30s timeout fires before the first ticker-driven test.
181+
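// Returns false only when the server ad is no longer in the TTL cache (the caller should stop the test suite).
// Returns true otherwise, including when the cycle is skipped because the server is in downtime.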
182+
runDirectorTestCycle := func() bool {
183+
// Fetch the current server ad from the TTL cache
184+
adItem := serverAds.Get(serverUrlStr, disableTouchOpt)
185+
if adItem == nil {
186+
log.Infof("The Director doesn't have any advertisements for server with URL %s. Stopping director tests.", serverUrlStr)
187+
return false
188+
}
189+
currentServerAd := adItem.Value().ServerAd
190+
191+
// Check if the server is in downtime by checking the filteredServers map
192+
if isServerInDowntime(currentServerAd.Name) {
193+
log.Debugf("Skipping director test cycle for %s server %s: server is in downtime", currentServerAd.Type, currentServerAd.Name)
194+
return true // Return true to continue the loop, but don't run the test
195+
}
196+
197+
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", currentServerAd.Type, currentServerAd.Name, currentServerAd.URL.String()))
198+
testSucceeded := true
199+
var testErr error
200+
if currentServerAd.Type == server_structs.OriginType.String() {
201+
fileTests := server_utils.TestFileTransferImpl{}
202+
testSucceeded, testErr = fileTests.RunTests(ctx, currentServerAd.URL.String(), currentServerAd.URL.String(), "", server_utils.DirectorTest)
203+
} else if currentServerAd.Type == server_structs.CacheType.String() {
204+
testErr = runCacheTest(ctx, currentServerAd.URL)
205+
}
206+
207+
// Compose the result of this Director-test to report to the server
208+
var reportStatus, reportMessage string // status (result of the Director-test) and message to report back to the server
209+
var healthStatus HealthTestStatus
210+
if testSucceeded && testErr == nil {
211+
reportStatus = "ok"
212+
reportMessage = "Director test cycle succeeded at " + time.Now().Format(time.RFC3339)
213+
healthStatus = HealthStatusOK
214+
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), currentServerAd.Type, currentServerAd.URL.String())
215+
} else {
216+
reportStatus = "error"
217+
reportMessage = "Director file transfer test cycle failed for server: " + currentServerAd.URL.String()
218+
if testErr != nil {
219+
reportMessage += " " + testErr.Error()
220+
}
221+
healthStatus = HealthStatusError
222+
log.Warningln("Director file transfer test cycle failed for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " ", testErr)
223+
}
224+
225+
// Update healthTestUtils once per cycle
226+
func() {
227+
healthTestUtilsMutex.Lock()
228+
defer healthTestUtilsMutex.Unlock()
229+
if existingUtil, ok := healthTestUtils[currentServerAd.URL.String()]; ok {
230+
existingUtil.Status = healthStatus
231+
} else {
232+
log.Debugln("HealthTestUtil missing for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " Failed to update internal status")
233+
}
234+
}()
235+
236+
// Determine the metric status label based on test result
237+
testStatusMetric := metrics.MetricSucceeded
238+
if !testSucceeded || testErr != nil {
239+
testStatusMetric = metrics.MetricFailed
240+
}
241+
242+
// Report the result of this Director-test back to origin/server (single call)
243+
reportErr := reportStatusToServer(ctx, currentServerAd.WebURL.String(), reportStatus, reportMessage, currentServerAd.Type, false)
244+
245+
// Determine report status metric and log if reporting failed
246+
reportStatusMetric := metrics.MetricSucceeded
247+
if reportErr != nil {
248+
reportStatusMetric = metrics.MetricFailed
249+
log.Warningf("Failed to report director test result to %s server at %s: %v", currentServerAd.Type, currentServerAd.WebURL.String(), reportErr)
250+
}
251+
252+
// Record metrics once per cycle
253+
metrics.PelicanDirectorFileTransferTestsRuns.With(
186254
prometheus.Labels{
187-
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": serverType,
188-
}).Dec()
189-
}()
255+
"server_name": currentServerAd.Name,
256+
"server_web_url": currentServerAd.WebURL.String(),
257+
"server_type": string(currentServerAd.Type),
258+
"status": string(testStatusMetric),
259+
"report_status": string(reportStatusMetric),
260+
},
261+
).Inc()
262+
263+
return true // Test was run successfully
264+
}
265+
266+
// Run the first test immediately to avoid race with origin/cache 30s timeout.
267+
// Without this, time.NewTicker waits for the first interval before firing,
268+
// which could cause the origin/cache to report a missed test if registration
269+
// takes more than 15 seconds after the server started.
270+
if !runDirectorTestCycle() {
271+
// If the server ad is already gone from the cache on the first cycle, stop immediately
272+
return
273+
}
190274

191275
for {
192276
select {
193277
case <-ctx.Done():
194-
log.Debug(fmt.Sprintf("Stopped the Director test suite for %s server %s at %s", serverType, serverName, serverUrlStr))
278+
log.Debug(fmt.Sprintf("Stopped the Director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
279+
280+
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
281+
prometheus.Labels{
282+
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
283+
}).Dec()
284+
195285
return
196286
case <-ticker.C:
197-
// Fetch the current server ad from the TTL cache
198-
adItem := serverAds.Get(serverUrlStr, disableTouchOpt)
199-
if adItem == nil {
200-
log.Infof("The Director doesn't have any advertisements for server with URL %s. Stopping director tests.", serverUrlStr)
287+
if !runDirectorTestCycle() {
288+
// If server is no longer in cache, stop the test suite
289+
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
290+
prometheus.Labels{
291+
"server_name": serverName, "server_web_url": serverWebUrl, "server_type": string(serverAd.Type),
292+
}).Dec()
201293
return
202294
}
203-
serverAd := adItem.Value().ServerAd
204-
205-
// Check if the server is in downtime by checking the filteredServers map
206-
if isServerInDowntime(serverAd.Name) {
207-
log.Debugf("Skipping director test cycle for %s server %s: server is in downtime", serverAd.Type, serverAd.Name)
208-
continue
209-
}
210-
211-
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", serverAd.Type, serverAd.Name, serverAd.URL.String()))
212-
ok := true
213-
var testErr error
214-
if serverAd.Type == server_structs.OriginType.String() {
215-
fileTests := server_utils.TestFileTransferImpl{}
216-
ok, testErr = fileTests.RunTests(ctx, serverAd.URL.String(), serverAd.URL.String(), "", server_utils.DirectorTest)
217-
} else if serverAd.Type == server_structs.CacheType.String() {
218-
testErr = runCacheTest(ctx, serverAd.URL)
219-
}
220-
221-
// Successfully run a test, no error
222-
if ok && testErr == nil {
223-
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), serverAd.Type, serverAd.URL.String())
224-
func() {
225-
healthTestUtilsMutex.Lock()
226-
defer healthTestUtilsMutex.Unlock()
227-
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
228-
existingUtil.Status = HealthStatusOK
229-
} else {
230-
log.Debugln("HealthTestUtil missing for ", serverAd.Type, " server: ", serverAd.URL.String(), " Failed to update internal status")
231-
}
232-
}()
233-
234-
// Report error back to origin/server
235-
if err := reportStatusToServer(
236-
ctx,
237-
serverAd.WebURL.String(),
238-
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
239-
serverAd.Type,
240-
false,
241-
); err != nil {
242-
// origin <7.7 only supports legacy report endpoint. Fallback to the legacy one
243-
if err == originReportNotFoundError {
244-
newErr := reportStatusToServer(
245-
ctx,
246-
serverAd.WebURL.String(),
247-
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
248-
serverAd.Type,
249-
true, // Fallback to legacy endpoint
250-
)
251-
// If legacy endpoint still reports error
252-
if newErr != nil {
253-
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
254-
metrics.PelicanDirectorFileTransferTestsRuns.With(
255-
prometheus.Labels{
256-
"server_name": serverAd.Name,
257-
"server_web_url": serverAd.WebURL.String(),
258-
"server_type": serverAd.Type,
259-
"status": string(metrics.MetricSucceeded),
260-
"report_status": string(metrics.MetricFailed),
261-
},
262-
).Inc()
263-
// Successfully report to the origin/cache via the legacy endpoint
264-
} else {
265-
metrics.PelicanDirectorFileTransferTestsRuns.With(
266-
prometheus.Labels{
267-
"server_name": serverAd.Name,
268-
"server_web_url": serverAd.WebURL.String(),
269-
"server_type": serverAd.Type,
270-
"status": string(metrics.MetricSucceeded),
271-
"report_status": string(metrics.MetricSucceeded),
272-
},
273-
).Inc()
274-
}
275-
// If the error is not originReportNotFoundError, then we record the error right away
276-
} else {
277-
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
278-
metrics.PelicanDirectorFileTransferTestsRuns.With(
279-
prometheus.Labels{
280-
"server_name": serverAd.Name,
281-
"server_web_url": serverAd.WebURL.String(),
282-
"server_type": serverAd.Type,
283-
"status": string(metrics.MetricSucceeded),
284-
"report_status": string(metrics.MetricFailed),
285-
},
286-
).Inc()
287-
}
288-
// No error when reporting the result, we are good
289-
} else {
290-
metrics.PelicanDirectorFileTransferTestsRuns.With(
291-
prometheus.Labels{
292-
"server_name": serverAd.Name,
293-
"server_web_url": serverAd.WebURL.String(),
294-
"server_type": serverAd.Type,
295-
"status": string(metrics.MetricSucceeded),
296-
"report_status": string(metrics.MetricSucceeded),
297-
},
298-
).Inc()
299-
}
300-
// The file tests failed. Report failure back to origin/cache
301-
} else {
302-
log.Warningln("Director file transfer test cycle failed for ", serverAd.Type, " server: ", serverAd.URL.String(), " ", testErr)
303-
func() {
304-
healthTestUtilsMutex.Lock()
305-
defer healthTestUtilsMutex.Unlock()
306-
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
307-
existingUtil.Status = HealthStatusError
308-
} else {
309-
log.Debugln("HealthTestUtil missing for", serverAd.Type, " server: ", serverAd.URL.String(), " Failed to update internal status")
310-
}
311-
}()
312-
313-
if err := reportStatusToServer(
314-
ctx,
315-
serverAd.WebURL.String(),
316-
"error", "Director file transfer test cycle failed for origin: "+serverAd.URL.String()+" "+testErr.Error(),
317-
serverAd.Type,
318-
false,
319-
); err != nil {
320-
// origin <7.7 only supports legacy report endpoint. Fallback to the legacy one
321-
if err == originReportNotFoundError {
322-
newErr := reportStatusToServer(
323-
ctx,
324-
serverAd.WebURL.String(),
325-
"ok", "Director test cycle succeeded at "+time.Now().Format(time.RFC3339),
326-
serverAd.Type,
327-
true, // Fallback to legacy endpoint
328-
)
329-
// If legacy endpoint still reports error
330-
if newErr != nil {
331-
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
332-
metrics.PelicanDirectorFileTransferTestsRuns.With(
333-
prometheus.Labels{
334-
"server_name": serverAd.Name,
335-
"server_web_url": serverAd.WebURL.String(),
336-
"server_type": serverAd.Type,
337-
"status": string(metrics.MetricFailed),
338-
"report_status": string(metrics.MetricFailed),
339-
},
340-
).Inc()
341-
// Successfully report to the origin/cache via the legacy endpoint
342-
} else {
343-
metrics.PelicanDirectorFileTransferTestsRuns.With(
344-
prometheus.Labels{
345-
"server_name": serverAd.Name,
346-
"server_web_url": serverAd.WebURL.String(),
347-
"server_type": serverAd.Type,
348-
"status": string(metrics.MetricFailed),
349-
"report_status": string(metrics.MetricSucceeded),
350-
},
351-
).Inc()
352-
}
353-
// If the error is not originReportNotFoundError, then we record the error right away
354-
} else {
355-
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), err)
356-
metrics.PelicanDirectorFileTransferTestsRuns.With(
357-
prometheus.Labels{
358-
"server_name": serverAd.Name,
359-
"server_web_url": serverAd.WebURL.String(),
360-
"server_type": serverAd.Type,
361-
"status": string(metrics.MetricFailed),
362-
"report_status": string(metrics.MetricFailed),
363-
},
364-
).Inc()
365-
}
366-
367-
} else {
368-
// No error when reporting the result, we are good
369-
metrics.PelicanDirectorFileTransferTestsRuns.With(
370-
prometheus.Labels{
371-
"server_name": serverAd.Name,
372-
"server_web_url": serverAd.WebURL.String(),
373-
"server_type": serverAd.Type,
374-
"status": string(metrics.MetricFailed),
375-
"report_status": string(metrics.MetricSucceeded),
376-
},
377-
).Inc()
378-
}
379-
}
380-
381295
}
382296
}
383297
}

0 commit comments

Comments
 (0)