Skip to content

Commit 816a58e

Browse files
Merge pull request #2890 from patrickbrophy/move-cache-monitoring
Fix cache monitoring and provide e2e test
2 parents d09060f + cfbda66 commit 816a58e

File tree

4 files changed

+130
-9
lines changed

4 files changed

+130
-9
lines changed

e2e_fed_tests/cache_stats_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
//go:build !windows
2+
3+
/***************************************************************
4+
*
5+
* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License"); you
8+
* may not use this file except in compliance with the License. You may
9+
* obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*
19+
***************************************************************/
20+
21+
package fed_tests
22+
23+
import (
24+
"fmt"
25+
"net/url"
26+
"os"
27+
"path/filepath"
28+
"testing"
29+
"time"
30+
31+
"github.com/prometheus/client_golang/prometheus/testutil"
32+
"github.com/stretchr/testify/assert"
33+
"github.com/stretchr/testify/require"
34+
35+
"github.com/pelicanplatform/pelican/client"
36+
"github.com/pelicanplatform/pelican/fed_test_utils"
37+
"github.com/pelicanplatform/pelican/metrics"
38+
"github.com/pelicanplatform/pelican/param"
39+
"github.com/pelicanplatform/pelican/server_utils"
40+
)
41+
42+
func TestCacheStatsE2E(t *testing.T) {
43+
server_utils.ResetTestState()
44+
defer server_utils.ResetTestState()
45+
46+
// Shorten the interval for the test to make it faster
47+
oldInterval := metrics.XrdCurlStatsInterval
48+
metrics.XrdCurlStatsInterval = 500 * time.Millisecond
49+
defer func() {
50+
metrics.XrdCurlStatsInterval = oldInterval
51+
}()
52+
53+
// Set up a federation with a public export
54+
originConfig := `
55+
Origin:
56+
StorageType: "posix"
57+
Exports:
58+
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
59+
FederationPrefix: /test-namespace
60+
Capabilities: ["PublicReads", "Reads", "Writes", "DirectReads", "Listings"]
61+
`
62+
fed := fed_test_utils.NewFedTest(t, originConfig)
63+
64+
// Prepare data on Origin
65+
storageDir := fed.Exports[0].StoragePrefix
66+
testFile := "test-file.txt"
67+
content := []byte("hello world")
68+
err := os.WriteFile(filepath.Join(storageDir, testFile), content, 0644)
69+
require.NoError(t, err)
70+
71+
// Determine stats file location
72+
cacheRunDir := param.Cache_RunLocation.GetString()
73+
statsFile := filepath.Join(cacheRunDir, "xrootd.stats")
74+
75+
// Transfer file through Cache
76+
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
77+
require.NoError(t, err)
78+
79+
pelicanUrl := fmt.Sprintf("pelican://%s%s/%s", discoveryUrl.Host, fed.Exports[0].FederationPrefix, testFile)
80+
81+
// Download to a temp file
82+
destFile := filepath.Join(t.TempDir(), "downloaded.txt")
83+
84+
// Perform transfer. This will trigger the xrdcl-curl plugin in the cache
85+
// to talk to the origin.
86+
_, err = client.DoCopy(fed.Ctx, pelicanUrl, destFile, false)
87+
require.NoError(t, err)
88+
89+
// Verify the downloaded content
90+
downloadedContent, err := os.ReadFile(destFile)
91+
require.NoError(t, err)
92+
assert.Equal(t, content, downloadedContent)
93+
94+
// 1. Wait for stats file to appear on disk.
95+
assert.Eventually(t, func() bool {
96+
_, err := os.Stat(statsFile)
97+
return err == nil
98+
}, 20*time.Second, 1*time.Second, "Stats file was never created by XRootD plugin")
99+
100+
// 2. Verify metrics are updated in Pelican.
101+
assert.Eventually(t, func() bool {
102+
return testutil.ToFloat64(metrics.XrdclQueueConsumed) > 0
103+
}, 10*time.Second, 500*time.Millisecond, "Metrics were never updated by Pelican monitoring")
104+
105+
t.Logf("Successfully verified that monitoring picked up stats from %s", statsFile)
106+
}

launchers/cache_serve.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,6 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
111111
cache.LaunchFedTokManager(ctx, egrp, cacheServer)
112112
}
113113

114-
if param.Cache_EnableEvictionMonitoring.GetBool() {
115-
metrics.LaunchXrootdCacheEvictionMonitoring(ctx, egrp)
116-
}
117-
118-
metrics.LaunchXrdCurlStatsMonitoring(ctx, egrp)
119-
120114
concLimit := param.Cache_Concurrency.GetInt()
121115
if concLimit > 0 {
122116
server_utils.LaunchConcurrencyMonitoring(ctx, egrp, cacheServer.GetServerType())
@@ -164,6 +158,13 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
164158
return nil, err
165159
}
166160
cacheServer.SetPids(pids)
161+
162+
if param.Cache_EnableEvictionMonitoring.GetBool() {
163+
metrics.LaunchXrootdCacheEvictionMonitoring(ctx, egrp)
164+
}
165+
166+
metrics.LaunchXrdCurlStatsMonitoring(ctx, egrp)
167+
167168
return cacheServer, nil
168169
}
169170

metrics/xrdcl_curl_stats.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metrics
22

33
import (
44
"context"
5+
"errors"
56
"os"
67
"time"
78

@@ -11,13 +12,19 @@ import (
1112
"github.com/pelicanplatform/pelican/param"
1213
)
1314

15+
// XrdCurlStatsInterval is how often the xrdcl-curl stats file is polled.
// It is a package-level variable rather than a constant so that tests can
// shorten it.
var XrdCurlStatsInterval = 5 * time.Second
20+
1421
// This function should be used up until XRootD v6 is released
1522
// When XRootD v6 is released, these stats will be available over the g-stream
1623
// Until then the stats are consumed from Cache.ClientStatisticsLocation
1724
// When XRootD v6 is released, this function should be removed along with Cache.ClientStatisticsLocation
1825
func LaunchXrdCurlStatsMonitoring(ctx context.Context, egrp *errgroup.Group) {
1926
egrp.Go(func() error {
20-
ticker := time.NewTicker(time.Second * 5)
27+
ticker := time.NewTicker(XrdCurlStatsInterval)
2128
defer ticker.Stop()
2229

2330
for {
@@ -27,7 +34,10 @@ func LaunchXrdCurlStatsMonitoring(ctx context.Context, egrp *errgroup.Group) {
2734
case <-ticker.C:
2835
stats, err := os.ReadFile(param.Cache_ClientStatisticsLocation.GetString())
2936
if err != nil {
30-
log.Errorf("XrdCurlStats monitoring: failed to read stats file: %v", err)
37+
log.Tracef("XrdCurlStats monitoring: failed to read stats file: %v", err)
38+
if !errors.Is(err, os.ErrNotExist) {
39+
log.Errorf("XrdCurlStats monitoring: failed to read stats file: %v", err)
40+
}
3141
continue
3242
}
3343
err = handleXrdcurlstatsPacket(stats)

metrics/xrootd_cache_eviction.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package metrics
2121
import (
2222
"context"
2323
"encoding/json"
24+
"errors"
2425
"os"
2526
"path"
2627
"path/filepath"
@@ -344,7 +345,10 @@ func LaunchXrootdCacheEvictionMonitoring(ctx context.Context, egrp *errgroup.Gro
344345
log.Trace("Xrootd cache eviction monitoring (attempting to ingest eviction data)")
345346
stats, err := os.ReadFile(statsFile)
346347
if err != nil {
347-
log.Errorf("Xrootd cache eviction monitoring: failed to read stats file: %v", err)
348+
log.Tracef("Xrootd cache eviction monitoring: failed to read stats file: %v", err)
349+
if !errors.Is(err, os.ErrNotExist) {
350+
log.Errorf("Xrootd cache eviction monitoring: failed to read stats file: %v", err)
351+
}
348352
continue
349353
}
350354
var dirStatistics DirStatistics

0 commit comments

Comments
 (0)