Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature (v3): Add buffered streaming support #3131

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

grivera64
Copy link
Contributor

Description

This feature adds buffered streaming support to Fiber v3 through the new Ctx method SendStreamWriter:

func (c Ctx ) SendStreamWriter(streamWriter func(w *bufio.Writer))

c.SendStreamWriter() essentially is a wrapper for calling fasthttp's SetBodyStreamWriter method. This feature wraps this method in the same way that c.SendStream() wraps fasthttp's SetBodyStream().

With this feature, Fiber users can send shorter segments of content over persistent HTTP connections. This functionality is important for several web-based applications such as:

  • Server-Side Events (SSE)
  • Large File Downloads

For example, a simple self-contained SSE example using this new feature can be setup as the following:

homeHtml := `<!DOCTYPE html>
<html>
    <body>
        <h1>SSE Messages</h1>
        <div id="result"></div>

        <script>
            if (typeof(EventSource) !== "undefined") {
                var source = new EventSource("http://127.0.0.1:3000/sse");
                source.addEventListener("my-event", function(event) {
                    document.getElementById("result").innerHTML += event.data + "<br>";
                });
            } else {
                document.getElementById("result").innerHTML = "Sorry, your browser does not support server-sent events...";
            }
        </script>
    </body>
</html>
`
app := fiber.New()
app.Get("/", func(c fiber.Ctx) error {
    c.Set("Content-Type", fiber.MIMETextHTMLCharsetUTF8)
    return c.SendString(homeHtml)
})

app.Get("/sse", func (c fiber.Ctx) error {
    c.Set("Content-Type", "text/event-stream")
    c.Set("Cache-Control", "no-cache")
    c.Set("Connection", "keep-alive")
    c.Set("Transfer-Encoding", "chunked")

    return c.Status(fiber.StatusOK).SendStreamWriter(func (w *bufio.Writer) {
        for {
            fmt.Fprintf(w, "event: my-event\ndata: %s", time.Now().Local())
            if err := w.Flush(); err != nil {
                return
            }
            time.Sleep(time.Second)
        }
    })
})

Fixes #3127

Type of change

Please delete options that are not relevant.

  • New feature (non-breaking change which adds functionality)
  • Documentation update (changes to documentation)

CURRENT STATUS

  • Features

    • Add Ctx.SendStreamWriter() to ctx.go
  • Unit Tests

    • Add Test_Ctx_SendStream_Writer to ctx_test.go

    • Add Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go

      ⚠️ WARNING:
      This test gives a race condition warning when running with -race, but is stable and consistent
      via mutex/channels.

  • Documentation

    • Add Ctx.SendStreamWriter() docs to docs/api/ctx.md
  • Benchmarks

Checklist

Before you submit your pull request, please make sure you meet these requirements:

  • Followed the inspiration of the Express.js framework for new functionalities, making them similar in usage.
  • Conducted a self-review of the code and provided comments for complex or critical parts.
  • Updated the documentation in the /docs/ directory for Fiber's documentation.
  • Added or updated unit tests to validate the effectiveness of the changes or new features.
  • Ensured that new and existing unit tests pass locally with the changes.
  • Verified that any new dependencies are essential and have been agreed upon by the maintainers/community.
  • Aimed for optimal performance with minimal allocations in the new code.
  • Provided benchmarks for the new code to analyze and improve upon.

Create a new `*DefaultCtx` method called `SendStreamWriter()`
that maps to fasthttp's `Response.SetBodyStreamWriter()`
- Adds Test_Ctx_SendStreamWriter to ctx_test.go
- Adds Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go
    - (Work-In-Progress) This test verifies that some data is
      still sent before a client disconnects when using the method
      `c.SendStreamWriter()`.

**Note:** Running this test reports a race condition when using
the `-race` flag or running `make test`. The test uses a channel
and mutex to prevent race conditions, but still triggers a warning.
@grivera64 grivera64 requested a review from a team as a code owner September 14, 2024 00:39
@grivera64 grivera64 requested review from gaby, sixcolors, ReneWerner87 and efectn and removed request for a team September 14, 2024 00:39
Copy link
Contributor

coderabbitai bot commented Sep 14, 2024

Walkthrough

The pull request introduces a new method, SendStreamWriter, to the DefaultCtx struct and the Ctx interface within the fiber package. This method allows users to set a response body stream writer using a function that accepts a pointer to a bufio.Writer. Additionally, two new test functions are added to validate the functionality of SendStreamWriter. Documentation is updated to include usage examples for this new method, enhancing the framework's ability to handle streaming data in HTTP responses.

Changes

Files Change Summary
ctx.go, ctx_interface_gen.go Added SendStreamWriter(func(*bufio.Writer)) error method to DefaultCtx and Ctx interface.
ctx_test.go Introduced tests Test_Ctx_SendStreamWriter and Test_Ctx_SendStreamWriter_Interrupted for the new method.
docs/api/ctx.md Updated documentation to include the SendStreamWriter method with usage examples.

Assessment against linked issues

Objective Addressed Explanation
Add buffered streaming support (Proposal #3127)
Ensure API stability without breaking changes (Proposal #3127)
Provide examples for Server-Side Events (SSE) in Fiber (Proposal #3127)

Poem

In the land of code where rabbits play,
A stream of data flows all day.
With SendStreamWriter, swift and bright,
We send our messages, a joyful sight!
Hopping through bytes, we dance and cheer,
For streaming support is finally here! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    -- I pushed a fix in commit <commit_id>, please review it.
    -- Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    -- @coderabbitai generate unit testing code for this file.
    -- @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    -- @coderabbitai generate interesting stats about this repository and render them as a table.
    -- @coderabbitai read src/utils.ts and generate unit testing code.
    -- @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    -- @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@gaby
Copy link
Member

gaby commented Sep 14, 2024

@grivera64 Try this:

func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
    t.Parallel()
    app := New()
    c := app.AcquireCtx(&fasthttp.RequestCtx{})

    var mutex sync.Mutex
    var wg sync.WaitGroup // WaitGroup to synchronize goroutines
    startChan := make(chan bool)

    interruptStreamWriter := func() {
        defer wg.Done() // Notify WaitGroup when done
        <-startChan
        time.Sleep(5 * time.Millisecond)
        mutex.Lock()
        c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
        mutex.Unlock()
    }

    wg.Add(1) // Increment WaitGroup counter before starting goroutine
    err := c.SendStreamWriter(func(w *bufio.Writer) {
        go interruptStreamWriter()

        startChan <- true
        for lineNum := 1; lineNum <= 5; lineNum++ {
            mutex.Lock()
            fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
            mutex.Unlock()

            if err := w.Flush(); err != nil {
                if lineNum < 3 {
                    t.Errorf("unexpected error: %s", err)
                }
                return
            }

            time.Sleep(1500 * time.Microsecond)
        }
    })
    require.NoError(t, err)

    wg.Wait() // Wait for the interruptStreamWriter to finish

    // Protect access to the response body with the mutex
    mutex.Lock()
    defer mutex.Unlock()
    require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
}

@ReneWerner87 ReneWerner87 added this to the v3 milestone Sep 15, 2024
@grivera64
Copy link
Contributor Author

grivera64 commented Sep 16, 2024

@gaby Thanks for the recommendation! Adding the Wait Group does remove the race error, but now I am getting an empty response body. I think this may be due to one of the following:

  • Using c.Response().CloseBodyStream() may be freeing the response body internally, making c.Context().Body() no longer valid after waiting long enough. This seems strange, but is the most probable, as it is a race condition.
    • The fix for this would be to find a different method to initiate a client disconnect.
  • The Wait Group isn't waiting long enough to allow Flush() to work. This is least probable, since I also tried increasing the counter to ensure that both interruptStreamWriter and SendStreamWriter finished executing, and the body returned is still empty:
// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
	t.Parallel()
	app := New()
	c := app.AcquireCtx(&fasthttp.RequestCtx{})

	var mutex sync.Mutex
	var wg sync.WaitGroup
	startChan := make(chan bool)

	interruptStreamWriter := func() {
		wg.Add(1)
		defer wg.Done()
		<-startChan
		time.Sleep(5 * time.Millisecond)
		mutex.Lock()
		c.Response().CloseBodyStream() //nolint:errcheck // It is fine to ignore the error
		mutex.Unlock()
	}

	wg.Add(1)
	err := c.SendStreamWriter(func(w *bufio.Writer) {
		go interruptStreamWriter()
		defer wg.Done()

		startChan <- true
		for lineNum := 1; lineNum <= 5; lineNum++ {
			mutex.Lock()
			fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
			mutex.Unlock()

			if err := w.Flush(); err != nil {
				if lineNum < 3 {
					t.Errorf("unexpected error: %s", err)
				}
				return
			}

			time.Sleep(1500 * time.Microsecond)
		}
	})
	require.NoError(t, err)

	// Wait for StreamWriter and the goroutine to finish
	wg.Wait()
	mutex.Lock()
	require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(c.Response().Body()))
	mutex.Unlock()
}

I will file an issue on valyala/fasthttp to ask how to mock client disconnections for this test. If it's not possible, we could remove this test case, as most of the other tests do not test for client disconnection issues.

@grivera64
Copy link
Contributor Author

grivera64 commented Sep 17, 2024

Hey all, after reading a bit more on Fiber and Fasthttp documentation, I believe that the race condition is due to fasthttp.(*Response) not being concurrent-safe.

I tried using a modified version of app's Test() method, that closes an underlying net.Conn connection (I modified testConn too to make Close() block further writes but allow reads). This seemed to do the same as c.Response().CloseBodyStream(), but still gave the race condition issue.

This is the new race condition warning:

WARNING: DATA RACE
Read at 0x00c00016c130 by goroutine 9:
  github.com/gofiber/fiber/v3.(*testConn).Write()
      /fiber/helpers.go:628 +0x6b
  bufio.(*Writer).Flush()
      /usr/local/go/src/bufio/bufio.go:639 +0xee
  github.com/valyala/fasthttp.writeChunk()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:2250 +0x10b
  github.com/valyala/fasthttp.writeBodyChunked()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:2170 +0xce
  github.com/valyala/fasthttp.(*Response).writeBodyStream()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:2066 +0x338
  github.com/valyala/fasthttp.(*Response).Write()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:1967 +0x2c4
  github.com/valyala/fasthttp.writeResponse()
      /go/pkg/mod/github.com/valyala/[email protected]/server.go:2589 +0xb8
  github.com/valyala/fasthttp.(*Server).serveConn()
      /go/pkg/mod/github.com/valyala/[email protected]/server.go:2432 +0x1ead
  github.com/valyala/fasthttp.(*Server).ServeConn()
      /go/pkg/mod/github.com/valyala/[email protected]/server.go:2042 +0x154
  github.com/gofiber/fiber/v3.(*App).TestWithInterrupt.func1()
      /fiber/app.go:975 +0xde
...

This was the old warning when directly using c.SendStreamWriter() without a server:

github.com/valyala/fasthttp.(*Response).SetBodyStream()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:249 +0x4f
  github.com/valyala/fasthttp.(*Response).SetBodyStreamWriter()
      /go/pkg/mod/github.com/valyala/[email protected]/http.go:292 +0x64
...

I believe fixing this race warning in the cleanest way possible would require an upstream PR to fasthttp (most likely adding a mutex for fasthttp.(*Response) related methods).

Based on Fiber's current codebase, there doesn't seem to be other interrupt tests (while other tests ignore output when the response times out). If this isn't something we should be testing for, we could just remove the interrupt test and keep the remaining tests.

What are your thoughts on this? Please let me know if you want to see the modified app.Test() code.

Edit: I will still try to work with the modified app.Test() code to see if I can get the race error to go away, since it still does look promising despite the above.

@grivera64
Copy link
Contributor Author

After editing app.Test() a bit, I was able to get a working test without the race warning. I had to create an app method called TestWithInterrupt():

// TestWithInterrupt is used for internal debugging by passing a *http.Request with an interruptAfter duration.
func (app *App) TestWithInterrupt(req *http.Request, interruptAfter time.Duration) (*http.Response, error)

With this change, I was able to use that in the test case as written below as a fix:

func Test_Ctx_SendStreamWriter_Interrupted_New(t *testing.T) {
	t.Parallel()
	app := New(Config{StreamRequestBody: true})
	app.Get("/", func(c Ctx) error {
		return c.SendStreamWriter(func(w *bufio.Writer) {
			for lineNum := 1; lineNum <= 5; lineNum++ {
				time.Sleep(time.Duration(lineNum) * time.Millisecond)

				fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
				if err := w.Flush(); err != nil {
					if lineNum <= 3 {
						t.Errorf("unexpected error: %s", err)
					}
					return
				}
			}
		})
	})
	resp, err := app.TestWithInterrupt(httptest.NewRequest(MethodGet, "/", nil), 8*time.Millisecond)
	require.NoError(t, err, "app.TestWithInterrupt(req)")
	body, err := io.ReadAll(resp.Body)
	require.NotNil(t, err)
	require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
}

Would you all like me to write an issue/PR for adding app.TestWithInterrupt() then use the function here after the merge?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

📝 [Proposal]: Add buffered streaming support
3 participants