@ctwww

@ctwww ctwww commented Dec 30, 2025

Description

Fixes a resource leak in pkg/apiclient/grpcproxy.go where TCP connections
were not being released when gRPC requests failed.

Changes

  • Close resp.Body when HTTP status code is not OK
  • Close resp.Body when gRPC status code is not OK

Problem

When executeRequest returned early due to errors (non-200 HTTP status or
non-OK gRPC status), the response body was not closed. This prevented the
underlying TCP connection from being returned to the connection pool,
causing connection leaks and potential OOM under high load with failed
requests.

Solution

Add utilio.Close(resp.Body) in both error return paths, following the
same pattern used in the success path. This ensures proper resource cleanup
regardless of the request outcome.
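The pattern described above can be sketched in isolation. This is a minimal illustration, not the code from pkg/apiclient/grpcproxy.go: `fetch`, `trackingTransport`, `trackedBody`, and `demo` are hypothetical names, and plain `resp.Body.Close()` stands in for `utilio.Close`. It assumes the gRPC status arrives in the `Grpc-Status` response header.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// trackedBody counts Close calls on a response body.
type trackedBody struct {
	io.ReadCloser
	closed *int
}

func (b *trackedBody) Close() error {
	*b.closed++
	return b.ReadCloser.Close()
}

// trackingTransport wraps every response body so closes can be counted.
// It is only used sequentially here, so plain ints are enough.
type trackingTransport struct{ opened, closed int }

func (t *trackingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := http.DefaultTransport.RoundTrip(req)
	if err != nil {
		return nil, err
	}
	t.opened++
	resp.Body = &trackedBody{ReadCloser: resp.Body, closed: &t.closed}
	return resp, nil
}

// fetch is a hypothetical stand-in for executeRequest: it closes the body
// on both error return paths and hands it to the caller only on success.
func fetch(client *http.Client, url string) (*http.Response, error) {
	resp, err := client.Post(url, "application/grpc-web+proto", strings.NewReader("test"))
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		_ = resp.Body.Close() // error path 1: non-200 HTTP status
		return nil, fmt.Errorf("request failed with status code %d", resp.StatusCode)
	}
	if status := resp.Header.Get("Grpc-Status"); status != "" && status != "0" {
		_ = resp.Body.Close() // error path 2: non-OK gRPC status
		return nil, fmt.Errorf("grpc status %s: %s", status, resp.Header.Get("Grpc-Message"))
	}
	return resp, nil // success: the caller owns and closes the body
}

// demo triggers both error paths once and reports how many bodies
// were opened and how many were closed.
func demo() (opened, closed int) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/grpc-error" {
			w.Header().Set("Grpc-Status", "7") // PermissionDenied, sent with HTTP 200
			w.Header().Set("Grpc-Message", "permission denied")
			return
		}
		w.WriteHeader(http.StatusInternalServerError)
	}))
	defer srv.Close()

	tr := &trackingTransport{}
	client := &http.Client{Transport: tr}
	for _, path := range []string{"/http-error", "/grpc-error"} {
		if _, err := fetch(client, srv.URL+path); err == nil {
			panic("expected an error for " + path)
		}
	}
	return tr.opened, tr.closed
}

func main() {
	opened, closed := demo()
	fmt.Printf("opened=%d closed=%d\n", opened, closed)
}
```

Removing either `Close` call in `fetch` leaves the closed count below the opened count, which is the leak this PR addresses.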

Testing

  • Verified that response bodies are properly closed in error scenarios
  • Tested with large-scale app queries including non-existent apps
  • Confirmed TCP connections are properly released without accumulation

Related Issue

Fixes #25823

Checklist:

  • Either (a) I've created an enhancement proposal and discussed it with the community, (b) this is a bug fix, or (c) this does not need to be in the release notes.
  • The title of the PR states what changed and the related issues number (used for the release note).
  • The title of the PR conforms to the Title of the PR
  • I've included "Closes [ISSUE #]" or "Fixes [ISSUE #]" in the description to automatically close the associated issue.
  • I've updated both the CLI and UI to expose my feature, or I plan to submit a second PR with them.
  • Does this PR require documentation updates?
  • I've updated documentation as required by this PR.
  • I have signed off all my commits as required by DCO
  • I have written unit and/or e2e tests for my change. PRs without these are unlikely to be merged.
  • My build is green (troubleshooting builds).
  • My new feature complies with the feature status guidelines.
  • I have added a brief description of why this PR is necessary and/or what this PR solves.
  • Optional. My organization is added to USERS.md.
  • Optional. For bug fixes, I've indicated what older releases this fix should be cherry-picked into (this may or may not happen depending on risk/complexity).

@ctwww ctwww requested a review from a team as a code owner December 30, 2025 09:40
@bunnyshell

bunnyshell bot commented Dec 30, 2025

🔴 Preview Environment stopped on Bunnyshell

See: Environment Details | Pipeline Logs

Available commands (reply to this comment):

  • 🔵 /bns:start to start the environment
  • 🚀 /bns:deploy to redeploy the environment
  • /bns:delete to remove the environment

When executeRequest returned early due to errors (non-200 HTTP status or
non-OK gRPC status), the response body was not closed. This prevented
the underlying TCP connection from being returned to the connection pool,
causing connection leaks and potential OOM under high load with failed
requests.

This fix ensures resp.Body is properly closed in all error return paths,
following Go's net/http best practices.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Signed-off-by: chentiewen <[email protected]>
@ctwww ctwww force-pushed the fix/execute-request-closeio branch from da5ef6a to 6697840 Compare December 30, 2025 09:44
@blakepettersson
Member

Hey @ctwww this looks reasonable to me. Is there any chance we can get some tests for this somehow?

@todaywasawesome
Contributor

go func() {
	<-stream.Context().Done()
	utilio.Close(resp.Body)
}()
defer utilio.Close(resp.Body)
c.httpClient.CloseIdleConnections()
Aren't these supposed to handle errors? I thought the lines you're changing returned nil to get back to this.

@ctwww
Author

ctwww commented Dec 31, 2025

go func() {
	<-stream.Context().Done()
	utilio.Close(resp.Body)
}()
defer utilio.Close(resp.Body)
c.httpClient.CloseIdleConnections()

Aren't these supposed to handle errors? I thought the lines you're changing returned nil to get back to this.

Yes. When executeRequest() returns an error, the caller returns before reaching this code, so this goroutine is never started.
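The control flow behind this answer can be shown with a small model. It is a sketch only: `conn`, `execute`, `proxy`, and `leaks` are hypothetical names that stand in for the response body, executeRequest, and its caller, and the `closeOnError` flag is an illustrative switch between the behavior before and after this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// conn models a response body that must be closed exactly once.
type conn struct{ closed bool }

// execute models executeRequest. On failure it returns a nil response,
// so the caller has no handle left to close. closeOnError switches
// between the old behavior (false) and the fixed behavior (true).
func execute(c *conn, fail, closeOnError bool) (*conn, error) {
	if fail {
		if closeOnError {
			c.closed = true
		}
		return nil, errors.New("request failed")
	}
	return c, nil
}

// proxy models the caller: cleanup is registered only after the error check.
func proxy(c *conn, fail, closeOnError bool) error {
	resp, err := execute(c, fail, closeOnError)
	if err != nil {
		return err // early return: the defer below is never registered
	}
	defer func() { resp.closed = true }()
	return nil
}

// leaks reports whether the connection is still open after proxy returns.
func leaks(fail, closeOnError bool) bool {
	c := &conn{}
	_ = proxy(c, fail, closeOnError)
	return !c.closed
}

func main() {
	fmt.Println("error path without close leaks:", leaks(true, false))
	fmt.Println("error path with close leaks:   ", leaks(true, true))
	fmt.Println("success path leaks:            ", leaks(false, false))
}
```

Because the cleanup in the caller only exists on the success path, the error paths have to close the body themselves before returning.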

@ctwww
Author

ctwww commented Dec 31, 2025

Hey @ctwww this looks reasonable to me. Is there any chance we can get some tests for this somehow?

@blakepettersson
Thanks for the acknowledgment! I’ve written a test script, but since it’s quite long, I’ve uploaded it to https://github.com/ctwww/argocd-test-scrips/blob/main/fetch/main.go for your review and execution.

Run the script and then run netstat -antl; you will see many unreleased TCP connections.

Let me know if you need any adjustments or additional test cases!

Contributor

@ppapapetrou76 ppapapetrou76 left a comment

Good catch @ctwww

@blakepettersson
Member

@ctwww that script looks good. Could it be converted to an e2e test that does what the script does?

@reggie-k reggie-k added this to the v3.4 milestone Jan 5, 2026
@rumstead
Member

rumstead commented Jan 5, 2026

LGTM. I would also like to see some testing where possible.

Contributor

@olivergondza olivergondza left a comment

Nice detective work chasing this down. LGTM.

@blakepettersson
Member

blakepettersson commented Jan 11, 2026

How about adding this as a test?

// grpcproxy_test.go
package apiclient

import (
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
)

func TestExecuteRequest_ClosesBodyOnHTTPError(t *testing.T) {
	bodyClosed := &atomic.Bool{}

	// Create a test server that returns HTTP 500 error
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte("internal server error"))
	}))
	defer server.Close()

	// Create client with custom httpClient that tracks body closure
	originalTransport := http.DefaultTransport
	customTransport := &testTransport{
		base:       originalTransport,
		bodyClosed: bodyClosed,
	}

	c := &client{
		ServerAddr: server.URL[7:], // Remove "http://"
		PlainText:  true,
		httpClient: &http.Client{
			Transport: customTransport,
		},
		GRPCWebRootPath: "",
	}

	// Execute request that should fail with HTTP 500
	ctx := context.Background()
	md := metadata.New(map[string]string{})
	_, err := c.executeRequest(ctx, "/test.Service/Method", []byte("test"), md)

	// Verify error was returned
	require.Error(t, err)
	assert.Contains(t, err.Error(), "failed with status code 500")

	// Give a small delay to ensure Close() was called
	time.Sleep(10 * time.Millisecond)

	// Verify body was closed to prevent connection leak
	assert.True(t, bodyClosed.Load(), "response body should be closed on HTTP error to prevent connection leak")
}

func TestExecuteRequest_ClosesBodyOnGRPCError(t *testing.T) {
	bodyClosed := &atomic.Bool{}

	// Create a test server that returns HTTP 200 but with gRPC error status
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Grpc-Status", "3") // codes.InvalidArgument
		w.Header().Set("Grpc-Message", "invalid argument")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("grpc error"))
	}))
	defer server.Close()

	// Create client with custom httpClient that tracks body closure
	originalTransport := http.DefaultTransport
	customTransport := &testTransport{
		base:       originalTransport,
		bodyClosed: bodyClosed,
	}

	c := &client{
		ServerAddr: server.URL[7:], // Remove "http://"
		PlainText:  true,
		httpClient: &http.Client{
			Transport: customTransport,
		},
		GRPCWebRootPath: "",
	}

	// Execute request that should fail with gRPC error
	ctx := context.Background()
	md := metadata.New(map[string]string{})
	_, err := c.executeRequest(ctx, "/test.Service/Method", []byte("test"), md)

	// Verify gRPC error was returned
	require.Error(t, err)
	assert.Contains(t, err.Error(), "invalid argument")

	// Give a small delay to ensure Close() was called
	time.Sleep(10 * time.Millisecond)

	// Verify body was closed to prevent connection leak
	assert.True(t, bodyClosed.Load(), "response body should be closed on gRPC error to prevent connection leak")
}

func TestExecuteRequest_ConcurrentErrorRequests_NoConnectionLeak(t *testing.T) {
	// This test simulates the scenario from the test script:
	// Multiple concurrent requests that fail should all close their response bodies

	var totalRequests atomic.Int32
	var closedBodies atomic.Int32

	// Create a test server that always returns errors
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Alternate between HTTP errors and gRPC errors. Use the value returned
		// by Add so each concurrent handler sees its own request number.
		n := totalRequests.Add(1)
		if n%2 == 0 {
			w.WriteHeader(http.StatusBadRequest)
		} else {
			w.Header().Set("Grpc-Status", strconv.Itoa(int(codes.PermissionDenied)))
			w.Header().Set("Grpc-Message", "permission denied")
			w.WriteHeader(http.StatusOK)
		}
		w.Write([]byte("error response"))
	}))
	defer server.Close()

	// Create client with custom transport that tracks closures
	customTransport := &testTransport{
		base:       http.DefaultTransport,
		bodyClosed: &atomic.Bool{},
		onClose: func() {
			closedBodies.Add(1)
		},
	}

	c := &client{
		ServerAddr: server.URL[7:],
		PlainText:  true,
		httpClient: &http.Client{
			Transport: customTransport,
		},
		GRPCWebRootPath: "",
	}

	// Simulate concurrent requests like in the test script
	concurrency := 10
	iterations := 5

	var wg sync.WaitGroup
	for iter := 0; iter < iterations; iter++ {
		for i := 0; i < concurrency; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				ctx := context.Background()
				md := metadata.New(map[string]string{})
				_, err := c.executeRequest(ctx, "/application.ApplicationService/ManagedResources", []byte("test"), md)
				// We expect errors
				assert.Error(t, err)
			}()
		}
		wg.Wait()
	}

	// Give time for all Close() calls to complete
	time.Sleep(100 * time.Millisecond)

	// Verify all response bodies were closed
	expectedTotal := int32(concurrency * iterations)
	assert.Equal(t, expectedTotal, totalRequests.Load(), "all requests should have been made")
	assert.Equal(t, expectedTotal, closedBodies.Load(), "all response bodies should be closed to prevent connection leaks")
}

func TestExecuteRequest_SuccessDoesNotCloseBodyPrematurely(t *testing.T) {
	// Verify that successful requests do NOT close the body in executeRequest
	// (caller is responsible for closing in success case)

	bodyClosed := &atomic.Bool{}

	// Create a test server that returns success
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Grpc-Status", "0") // codes.OK
		w.WriteHeader(http.StatusOK)
		w.Write(toFrame([]byte("success")))
	}))
	defer server.Close()

	customTransport := &testTransport{
		base:       http.DefaultTransport,
		bodyClosed: bodyClosed,
	}

	c := &client{
		ServerAddr: server.URL[7:],
		PlainText:  true,
		httpClient: &http.Client{
			Transport: customTransport,
		},
		GRPCWebRootPath: "",
	}

	// Execute successful request
	ctx := context.Background()
	md := metadata.New(map[string]string{})
	resp, err := c.executeRequest(ctx, "/test.Service/Method", []byte("test"), md)

	// Verify success
	require.NoError(t, err)
	require.NotNil(t, resp)
	defer resp.Body.Close()

	// Verify body was NOT closed by executeRequest (caller's responsibility)
	time.Sleep(10 * time.Millisecond)
	assert.False(t, bodyClosed.Load(), "response body should NOT be closed by executeRequest on success - caller is responsible")
}

// testTransport wraps http.RoundTripper to track body closures
type testTransport struct {
	base       http.RoundTripper
	bodyClosed *atomic.Bool
	onClose    func() // Optional callback for each close
}

func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := t.base.RoundTrip(req)
	if err != nil {
		return nil, err
	}

	// Wrap the response body to track Close() calls
	resp.Body = &closeTracker{
		ReadCloser: resp.Body,
		closed:     t.bodyClosed,
		onClose:    t.onClose,
	}

	return resp, nil
}

type closeTracker struct {
	io.ReadCloser
	closed  *atomic.Bool
	onClose func()
}

func (c *closeTracker) Close() error {
	c.closed.Store(true)
	if c.onClose != nil {
		c.onClose()
	}
	return c.ReadCloser.Close()
}

Development

Successfully merging this pull request may close these issues.

[Bug]Fix resource leak: TCP connections not released when gRPC requests fail

7 participants