Skip to content

Commit

Permalink
Merge pull request #23 from n3wscott/history
Browse files Browse the repository at this point in the history
Adding history to the listen command to fetch back events
  • Loading branch information
n3wscott committed Mar 3, 2022
2 parents aad1e08 + fbdf101 commit c59d2d6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/commands/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func addListener(topLevel *cobra.Command) {
po := &options.PortOptions{}
pa := &options.PathOptions{}
to := &options.TeeOptions{}
ho := &options.HistoryOptions{}
vo := &options.VerboseOptions{}
listen := &cobra.Command{
Use: "listen",
Expand All @@ -43,6 +44,7 @@ func addListener(topLevel *cobra.Command) {
Port: po.Port,
Path: pa.Path,
Tee: to.URL,
History: ho.Length,
Verbose: vo.Verbose,
}

Expand All @@ -55,6 +57,7 @@ func addListener(topLevel *cobra.Command) {
po.AddFlags(listen)
pa.AddFlags(listen)
vo.AddFlags(listen)
ho.AddFlags(listen)
to.AddFlags(listen)

topLevel.AddCommand(listen)
Expand Down
18 changes: 18 additions & 0 deletions pkg/commands/options/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2022 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package options

import "github.com/spf13/cobra"

// HistoryOptions
type HistoryOptions struct {
Length int
}

func (o *HistoryOptions) AddFlags(cmd *cobra.Command) {
cmd.Flags().IntVar(&o.Length, "history", 50,
"How many past events to store.")
}
71 changes: 69 additions & 2 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,66 @@ package listener

import (
"fmt"
"github.com/cloudevents/conformance/pkg/event"
cfhttp "github.com/cloudevents/conformance/pkg/http"
"net/http"
"net/url"
"os"
"sync"

"github.com/cloudevents/conformance/pkg/event"
cfhttp "github.com/cloudevents/conformance/pkg/http"
)

type Listener struct {
Port int
Path string
Verbose bool
Tee *url.URL

History int
ring *ringBuffer
}

type ringBuffer struct {
count int
out chan event.Event
mux sync.Mutex
}

func (r *ringBuffer) Add(ce event.Event) {
r.mux.Lock()
defer r.mux.Unlock()

select {
// If we can, write to out.
case r.out <- ce:
r.count++
// Done.
default:
// If we got blocked, read one from out and write the new event.
<-r.out
r.out <- ce
}
}

func (r *ringBuffer) All() []event.Event {
all := make([]event.Event, 0, r.count)
for r.count > 0 {
r.mux.Lock()

ce := <-r.out
all = append(all, ce)
r.count--

r.mux.Unlock()
}
return all
}

func (l *Listener) Do() error {
addr := fmt.Sprintf(":%d", l.Port) // TODO: do this listen thing for port 0

l.ring = &ringBuffer{out: make(chan event.Event, l.History)}

_, _ = fmt.Fprintf(os.Stderr, "listening on %s\n", addr)
if err := http.ListenAndServe(addr, l); err != nil {
return err
Expand All @@ -36,13 +79,22 @@ func (l *Listener) ServeHTTP(w http.ResponseWriter, req *http.Request) {
_, _ = fmt.Fprintf(os.Stderr, "incoming request from %s\n", req.URL.String())
}

if req.Method == http.MethodGet && req.URL.String() == "/history" {
l.ServeHistory(w, req)
return
}

ce, err := cfhttp.RequestToEvent(req)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error converting reqest to event: %s\n", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}

if l.History > 0 && ce != nil {
l.ring.Add(*ce)
}

yaml, err := event.ToYaml(*ce)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error converting event to yaml: %s\n", err.Error())
Expand All @@ -67,3 +119,18 @@ func (l *Listener) ServeHTTP(w http.ResponseWriter, req *http.Request) {

w.WriteHeader(http.StatusOK)
}

func (l *Listener) ServeHistory(w http.ResponseWriter, _ *http.Request) {
for i, ce := range l.ring.All() {
yaml, err := event.ToYaml(ce)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error converting event to yaml: %s\n", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
}
if i > 0 {
_, _ = w.Write([]byte("---\n"))
}
_, _ = w.Write(yaml)
}
}

0 comments on commit c59d2d6

Please sign in to comment.