Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pelican.yaml
oidc-client-id
oidc-client-secret
MaxMindKey
e2e_fed_tests.test
7 changes: 6 additions & 1 deletion daemon/launch_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,17 @@ func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Gro
log.Infof("Daemon %q with pid %d was killed", daemons[chosen].name, daemons[chosen].pid)
}
if waitResult := context.Cause(daemons[chosen].ctx); waitResult != nil {
metricName := strings.SplitN(launchers[chosen].Name(), ".", 2)[0]
if IsExpectedRestart() {
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(metricName), metrics.StatusShuttingDown, "XRootD restart in progress")
log.Infof("Daemon %q exited during expected restart: %v", daemons[chosen].name, waitResult)
return nil
}
if !daemons[chosen].expiry.IsZero() {
return nil
} else if errors.Is(waitResult, context.Canceled) {
return nil
}
metricName := strings.SplitN(launchers[chosen].Name(), ".", 2)[0]
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(metricName), metrics.StatusCritical,
launchers[chosen].Name()+" process failed unexpectedly")
err = errors.Wrapf(waitResult, "%s process failed unexpectedly", launchers[chosen].Name())
Expand Down
34 changes: 34 additions & 0 deletions daemon/restart_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package daemon

import "sync/atomic"

// expectedRestart is the package-level flag behind SetExpectedRestart and
// IsExpectedRestart. Its zero value is false, i.e. no restart is expected.
var expectedRestart atomic.Bool

// SetExpectedRestart records whether an XRootD restart is currently in
// progress. The value is stored atomically, so it may be called from any
// goroutine.
func SetExpectedRestart(inProgress bool) {
	expectedRestart.Store(inProgress)
}

// IsExpectedRestart returns the value most recently passed to
// SetExpectedRestart (false if it was never called). A true result means
// daemon shutdowns should be treated as intentional because a restart is
// underway.
func IsExpectedRestart() bool {
	restarting := expectedRestart.Load()
	return restarting
}
220 changes: 220 additions & 0 deletions e2e_fed_tests/restart_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//go:build !windows

/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package fed_tests

import (
_ "embed"
"fmt"
"os"
"path/filepath"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pelicanplatform/pelican/client"
"github.com/pelicanplatform/pelican/fed_test_utils"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/test_utils"
"github.com/pelicanplatform/pelican/xrootd"
)

// waitForComponentStatus polls metrics.GetComponentStatus for component every
// 100ms until the reported status equals desired.String(), and fails the test
// through require if that has not happened within timeout. A poll on which
// GetComponentStatus returns an error is treated as "not there yet".
func waitForComponentStatus(t *testing.T, component metrics.HealthStatusComponent, desired metrics.HealthStatusEnum, timeout time.Duration) {
	t.Helper()

	reached := func() bool {
		current, err := metrics.GetComponentStatus(component)
		return err == nil && current == desired.String()
	}
	require.Eventually(t, reached, timeout, 100*time.Millisecond, "component %s did not reach status %s", component, desired)
}

// waitForComponentStatusMatch polls metrics.GetComponentStatus for component
// every 100ms until the reported status equals the String() form of any entry
// in desired, and fails the test through require if none matched within
// timeout. A poll on which GetComponentStatus returns an error is treated as
// "no match yet".
func waitForComponentStatusMatch(t *testing.T, component metrics.HealthStatusComponent, desired []metrics.HealthStatusEnum, timeout time.Duration) {
	t.Helper()

	matchesAny := func() bool {
		current, err := metrics.GetComponentStatus(component)
		if err != nil {
			return false
		}
		for i := range desired {
			if desired[i].String() == current {
				return true
			}
		}
		return false
	}
	require.Eventually(t, matchesAny, timeout, 100*time.Millisecond, "component %s did not reach expected statuses", component)
}

// waitForComponentStatusNotOK polls metrics.GetComponentStatus for component
// every 50ms until the reported status differs from metrics.StatusOK, and
// fails the test through require if it is still OK when timeout elapses. A
// poll on which GetComponentStatus returns an error does not count as having
// left the OK state.
func waitForComponentStatusNotOK(t *testing.T, component metrics.HealthStatusComponent, timeout time.Duration) {
	t.Helper()

	leftOK := func() bool {
		current, err := metrics.GetComponentStatus(component)
		return err == nil && current != metrics.StatusOK.String()
	}
	require.Eventually(t, leftOK, timeout, 50*time.Millisecond, "component %s never left OK state", component)
}

// TestXRootDRestart checks that restarting the XRootD daemons is visible in
// the component health status, produces a different set of PIDs, and leaves a
// previously uploaded object downloadable with unchanged content.
func TestXRootDRestart(t *testing.T) {
	t.Cleanup(test_utils.SetupTestLogging(t))
	server_utils.ResetTestState()
	defer server_utils.ResetTestState()

	// Stand up a federation with an origin and a cache.
	fed := fed_test_utils.NewFedTest(t, bothPubNamespaces)

	// Seed a local file that will be pushed through the federation.
	workDir := t.TempDir()
	srcPath := filepath.Join(workDir, "test.txt")
	payload := "Hello from Pelican restart test"
	require.NoError(t, os.WriteFile(srcPath, []byte(payload), 0644))

	// Upload before the restart.
	remoteURL := fmt.Sprintf("pelican://%s:%d/first/namespace/restart/test.txt", param.Server_Hostname.GetString(), param.Server_WebPort.GetInt())
	putResults, err := client.DoPut(fed.Ctx, srcPath, remoteURL, false, client.WithTokenLocation(fed.Token))
	require.NoError(t, err)
	require.NotEmpty(t, putResults)
	assert.Greater(t, putResults[0].TransferredBytes, int64(0))

	// Baseline download: the object must be readable before the restart.
	preRestartPath := filepath.Join(workDir, "download_before.txt")
	preRestartResults, err := client.DoGet(fed.Ctx, remoteURL, preRestartPath, false, client.WithTokenLocation(fed.Token))
	require.NoError(t, err)
	require.NotEmpty(t, preRestartResults)
	assert.Greater(t, preRestartResults[0].TransferredBytes, int64(0))

	preRestartBytes, err := os.ReadFile(preRestartPath)
	require.NoError(t, err)
	assert.Equal(t, payload, string(preRestartBytes))

	// The restart is driven directly through xrootd.RestartXrootd using the
	// PIDs recorded by the fed test.
	pidsBefore := fed.Pids
	require.NotEmpty(t, pidsBefore, "No PIDs found for XRootD processes")

	waitForComponentStatus(t, metrics.OriginCache_XRootD, metrics.StatusOK, 10*time.Second)

	var (
		pidsAfter  []int
		restartErr error
	)
	restartFinished := make(chan struct{})

	// Run the restart in the background so the health status can be observed
	// while it is in flight; the results are read only after the channel closes.
	go func() {
		pidsAfter, restartErr = xrootd.RestartXrootd(fed.Ctx, pidsBefore)
		close(restartFinished)
	}()

	waitForComponentStatusNotOK(t, metrics.OriginCache_XRootD, 5*time.Second)
	transitional := []metrics.HealthStatusEnum{metrics.StatusShuttingDown, metrics.StatusCritical}
	waitForComponentStatusMatch(t, metrics.OriginCache_XRootD, transitional, 5*time.Second)

	<-restartFinished
	require.NoError(t, restartErr)
	require.NotEmpty(t, pidsAfter)
	require.NotEqual(t, pidsBefore, pidsAfter, "PIDs should be different after restart")

	// Hand the new PIDs back to the fed test.
	fed.Pids = pidsAfter

	waitForComponentStatus(t, metrics.OriginCache_XRootD, metrics.StatusOK, 10*time.Second)

	// The object uploaded earlier must still be retrievable after the restart.
	postRestartPath := filepath.Join(workDir, "download_after.txt")
	postRestartResults, err := client.DoGet(fed.Ctx, remoteURL, postRestartPath, false, client.WithTokenLocation(fed.Token))
	require.NoError(t, err)
	require.NotEmpty(t, postRestartResults)
	assert.Greater(t, postRestartResults[0].TransferredBytes, int64(0))

	postRestartBytes, err := os.ReadFile(postRestartPath)
	require.NoError(t, err)
	assert.Equal(t, payload, string(postRestartBytes))

	// Old processes must be gone: probing them with signal 0 should fail.
	for _, pid := range pidsBefore {
		proc, findErr := os.FindProcess(pid)
		if findErr != nil {
			continue
		}
		assert.Error(t, proc.Signal(syscall.Signal(0)), "Old PID %d should not be running after restart", pid)
	}

	// New processes must be alive: probing them with signal 0 should succeed.
	for _, pid := range pidsAfter {
		proc, findErr := os.FindProcess(pid)
		require.NoError(t, findErr)
		require.NoError(t, proc.Signal(syscall.Signal(0)), "New PID %d should be running after restart", pid)
	}
}

// TestXRootDRestartConcurrent verifies that two overlapping calls to
// xrootd.RestartXrootd are serialized: one call is expected to succeed while
// the other is rejected with an error containing "already in progress".
func TestXRootDRestartConcurrent(t *testing.T) {
	t.Cleanup(test_utils.SetupTestLogging(t))
	server_utils.ResetTestState()
	defer server_utils.ResetTestState()

	// Create a federation
	ft := fed_test_utils.NewFedTest(t, bothPubNamespaces)

	oldPids := ft.Pids
	require.NotEmpty(t, oldPids, "No PIDs found for XRootD processes")

	// Buffered for both senders so neither goroutine can block on its send.
	done := make(chan error, 2)

	go func() {
		_, err := xrootd.RestartXrootd(ft.Ctx, oldPids)
		done <- err
	}()

	// Small delay to let the first restart get under way before the second
	// one is attempted (presumably long enough for it to take the restart
	// lock — this is timing-based, not a guarantee).
	time.Sleep(10 * time.Millisecond)

	go func() {
		_, err := xrootd.RestartXrootd(ft.Ctx, oldPids)
		done <- err
	}()

	// Collect results in completion order; err1/err2 do not identify which
	// goroutine produced them.
	err1 := <-done
	err2 := <-done

	// One should succeed, one should fail with "already in progress"
	switch {
	case err1 == nil:
		require.Error(t, err2)
		assert.Contains(t, err2.Error(), "already in progress")
	case err2 == nil:
		// err1 is known to be non-nil here because the first case did not match.
		assert.Contains(t, err1.Error(), "already in progress")
	default:
		// Report both errors so the failure can be diagnosed from the test log.
		t.Fatalf("Both restart attempts failed, at least one should have succeeded: first error: %v; second error: %v", err1, err2)
	}
}
7 changes: 6 additions & 1 deletion launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
}

log.Info("Launching cache")
launchers, err := xrootd.ConfigureLaunchers(false, configPath, false, true)
useCMSD := false
privileged := false
launchers, err := xrootd.ConfigureLaunchers(privileged, configPath, useCMSD, true)
if err != nil {
return nil, err
}
Expand All @@ -162,6 +164,9 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
log.Infoln("Cache startup complete on port", port)
}

// Store restart information before launching
xrootd.StoreRestartInfo(launchers, egrp, portStartCallback, true, useCMSD, privileged)

pids, err := xrootd.LaunchDaemons(ctx, launchers, egrp, portStartCallback)
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,
}

privileged := param.Origin_Multiuser.GetBool()
launchers, err := xrootd.ConfigureLaunchers(privileged, configPath, param.Origin_EnableCmsd.GetBool(), false)
useCMSD := param.Origin_EnableCmsd.GetBool()
launchers, err := xrootd.ConfigureLaunchers(privileged, configPath, useCMSD, false)
if err != nil {
return nil, err
}
Expand All @@ -147,6 +148,9 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,
log.Infoln("Origin startup complete on port", port)
}

// Store restart information before launching
xrootd.StoreRestartInfo(launchers, egrp, portStartCallback, false, useCMSD, privileged)

pids, err := xrootd.LaunchDaemons(ctx, launchers, egrp, portStartCallback)
if err != nil {
return nil, err
Expand Down
Loading
Loading