Skip to content

Commit 6d4b15b

Browse files
committed
Updates for pgqueue
1 parent 15db9c7 commit 6d4b15b

File tree

8 files changed

+235
-67
lines changed

8 files changed

+235
-67
lines changed

pkg/pgqueue/opt.go

+48-3
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
package pgqueue
22

33
import (
4+
"fmt"
5+
"os"
46
"strings"
57

68
// Packages
7-
"github.com/mutablelogic/go-server/pkg/httpresponse"
9+
"github.com/djthorpe/go-pg"
10+
httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
11+
schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
12+
types "github.com/mutablelogic/go-server/pkg/types"
813
)
914

1015
////////////////////////////////////////////////////////////////////////////////
1116
// TYPES
1217

1318
type opt struct {
14-
worker string
19+
pg.OffsetLimit
20+
worker string
21+
namespace string
1522
}
1623

1724
// Opt represents a function that modifies the options
@@ -23,6 +30,14 @@ type Opt func(*opt) error
2330
func applyOpts(opts ...Opt) (*opt, error) {
2431
var o opt
2532

33+
// Set the defaults
34+
o.namespace = schema.DefaultNamespace
35+
if hostname, err := os.Hostname(); err != nil {
36+
return nil, httpresponse.ErrInternalError.With(err)
37+
} else {
38+
o.worker = fmt.Sprint(hostname, ".", os.Getpid())
39+
}
40+
2641
// Apply the options
2742
for _, fn := range opts {
2843
if err := fn(&o); err != nil {
@@ -37,12 +52,42 @@ func applyOpts(opts ...Opt) (*opt, error) {
3752
////////////////////////////////////////////////////////////////////////////////
3853
// PUBLIC METHODS
3954

55+
// Set offset for the queue list
56+
func OptOffset(offset uint64) Opt {
57+
return func(o *opt) error {
58+
o.Offset = offset
59+
return nil
60+
}
61+
}
62+
63+
// Set limit for the queue list
64+
func OptLimit(limit uint64) Opt {
65+
return func(o *opt) error {
66+
o.Limit = types.Uint64Ptr(limit)
67+
return nil
68+
}
69+
}
70+
71+
// Set the worker name when a task is locked for work
4072
func OptWorker(v string) Opt {
4173
return func(o *opt) error {
4274
if v = strings.TrimSpace(v); v == "" {
4375
return httpresponse.ErrBadRequest.With("empty worker name")
76+
} else {
77+
o.worker = v
78+
}
79+
return nil
80+
}
81+
}
82+
83+
// Set the namespace for the tickers and queues
84+
func OptNamespace(v string) Opt {
85+
return func(o *opt) error {
86+
if v = strings.TrimSpace(v); !types.IsIdentifier(v) {
87+
return httpresponse.ErrBadRequest.With("invalid namespacename ")
88+
} else {
89+
o.namespace = v
4490
}
45-
o.worker = v
4691
return nil
4792
}
4893
}

pkg/pgqueue/pgqueue.go

+50-12
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"os"
8-
"strings"
97

108
// Packages
119
pg "github.com/djthorpe/go-pg"
@@ -31,26 +29,22 @@ func New(ctx context.Context, conn pg.PoolConn, opt ...Opt) (*Client, error) {
3129
opts, err := applyOpts(opt...)
3230
if err != nil {
3331
return nil, err
32+
} else {
33+
self.worker = opts.worker
3434
}
3535

3636
// Create a listener
3737
if listener := pg.NewListener(conn); listener == nil {
3838
return nil, httpresponse.ErrInternalError.Withf("Cannot create listener")
3939
} else {
4040
self.listener = listener
41-
self.conn = conn.With("schema", schema.SchemaName).(pg.PoolConn)
41+
self.conn = conn.With(
42+
"schema", schema.SchemaName,
43+
"ns", opts.namespace,
44+
).(pg.PoolConn)
4245
self.topics = []string{schema.TopicQueueInsert}
4346
}
4447

45-
// Set worker name
46-
if worker := strings.TrimSpace(opts.worker); worker != "" {
47-
self.worker = worker
48-
} else if hostname, err := os.Hostname(); err != nil {
49-
return nil, httpresponse.ErrInternalError.With(err)
50-
} else {
51-
self.worker = fmt.Sprint(hostname, ".", os.Getpid())
52-
}
53-
5448
// If the schema does not exist, then bootstrap it
5549
if err := self.conn.Tx(ctx, func(conn pg.Conn) error {
5650
if exists, err := pg.SchemaExists(ctx, conn, schema.SchemaName); err != nil {
@@ -116,3 +110,47 @@ func (client *Client) UpdateQueue(ctx context.Context, name string, meta schema.
116110
}
117111
return &queue, nil
118112
}
113+
114+
// ListQueues returns all queues as a list
115+
func (client *Client) ListQueues(ctx context.Context, opt ...Opt) (*schema.QueueList, error) {
116+
var list schema.QueueList
117+
118+
// Apply options
119+
opts, err := applyOpts(opt...)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
// Perform list
125+
list.Body = make([]schema.Queue, 0, 10)
126+
if err := client.conn.List(ctx, &list, schema.QueueListRequest{
127+
OffsetLimit: opts.OffsetLimit,
128+
}); err != nil {
129+
return nil, err
130+
}
131+
return &list, nil
132+
}
133+
134+
// CreateTicker creates a new ticker, and returns it.
135+
func (client *Client) CreateTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error) {
136+
var ticker schema.Ticker
137+
if err := client.conn.Tx(ctx, func(conn pg.Conn) error {
138+
return client.conn.Insert(ctx, &ticker, meta)
139+
}); err != nil {
140+
return nil, err
141+
}
142+
fmt.Println("Created ticker:", ticker)
143+
return &ticker, nil
144+
}
145+
146+
// GetTicker returns a ticker with the given name.
147+
func (client *Client) GetTicker(ctx context.Context, name string) (*schema.Ticker, error) {
148+
var ticker schema.Ticker
149+
if err := client.conn.Get(ctx, &ticker, schema.TickerName(name)); err != nil {
150+
if errors.Is(err, pg.ErrNotFound) {
151+
return nil, httpresponse.ErrNotFound.Withf("Ticker %q not found", name)
152+
}
153+
return nil, err
154+
}
155+
return &ticker, nil
156+
}

pkg/pgqueue/pgqueue_test.go

+83-3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ func Test_Queue_001(t *testing.T) {
2727
conn := conn.Begin(t)
2828
defer conn.Close()
2929

30-
// Ping the database
31-
assert.NoError(conn.Ping(context.Background()))
32-
3330
// Create pgqueue
3431
client, err := pgqueue.New(context.TODO(), conn.PoolConn, pgqueue.OptWorker(t.Name()))
3532
assert.NoError(err)
@@ -162,4 +159,87 @@ func Test_Queue_001(t *testing.T) {
162159
assert.Error(err)
163160
assert.ErrorIs(err, httpresponse.ErrNotFound)
164161
})
162+
163+
// List queue
164+
t.Run("ListQueue_1", func(t *testing.T) {
165+
_, err := client.CreateQueue(context.TODO(), schema.Queue{
166+
Queue: "queue_name_9",
167+
})
168+
assert.NoError(err)
169+
170+
list, err := client.ListQueues(context.TODO())
171+
assert.NoError(err)
172+
assert.NotNil(list)
173+
assert.NotNil(list.Body)
174+
assert.GreaterOrEqual(len(list.Body), 1)
175+
assert.Equal(len(list.Body), int(list.Count))
176+
})
177+
178+
// List queue
179+
t.Run("ListQueue_2", func(t *testing.T) {
180+
_, err := client.CreateQueue(context.TODO(), schema.Queue{
181+
Queue: "queue_name_10",
182+
})
183+
assert.NoError(err)
184+
185+
list, err := client.ListQueues(context.TODO(), pgqueue.OptLimit(0))
186+
assert.NoError(err)
187+
assert.NotNil(list)
188+
assert.NotNil(list.Body)
189+
assert.GreaterOrEqual(int(list.Count), 1)
190+
assert.Equal(len(list.Body), 0)
191+
})
192+
}
193+
194+
func Test_Ticker_001(t *testing.T) {
195+
assert := assert.New(t)
196+
conn := conn.Begin(t)
197+
defer conn.Close()
198+
199+
// Create pgqueue
200+
client, err := pgqueue.New(context.TODO(), conn.PoolConn, pgqueue.OptWorker(t.Name()))
201+
assert.NoError(err)
202+
assert.NotNil(client)
203+
204+
// Create ticker which is disabled (NULL interval)
205+
t.Run("CreateTicker_1", func(t *testing.T) {
206+
ticker, err := client.CreateTicker(context.TODO(), schema.TickerMeta{
207+
Ticker: "ticker_1",
208+
})
209+
assert.NoError(err)
210+
assert.NotNil(ticker)
211+
assert.NotNil(ticker.Ticker)
212+
assert.Equal("ticker_1", ticker.Ticker)
213+
assert.Nil(ticker.Interval)
214+
})
215+
216+
// Create ticker with 1 minute interval
217+
t.Run("CreateTicker_2", func(t *testing.T) {
218+
ticker, err := client.CreateTicker(context.TODO(), schema.TickerMeta{
219+
Ticker: "ticker_2",
220+
Interval: types.DurationPtr(1 * time.Minute),
221+
})
222+
assert.NoError(err)
223+
assert.NotNil(ticker)
224+
assert.NotNil(ticker.Ticker)
225+
assert.Equal("ticker_2", ticker.Ticker)
226+
assert.Equal(1*time.Minute, types.PtrDuration(ticker.Interval))
227+
})
228+
229+
// Get ticker
230+
t.Run("GetTicker_1", func(t *testing.T) {
231+
ticker, err := client.CreateTicker(context.TODO(), schema.TickerMeta{
232+
Ticker: "ticker_3",
233+
Interval: types.DurationPtr(1 * time.Minute),
234+
})
235+
assert.NoError(err)
236+
assert.NotNil(ticker)
237+
238+
ticker2, err := client.GetTicker(context.TODO(), ticker.TickerName(ticker.Ticker))
239+
assert.NoError(err)
240+
assert.NotNil(ticker2)
241+
assert.NotNil(ticker2.Ticker)
242+
assert.Equal(ticker.Ticker, ticker2.Ticker)
243+
assert.Equal(ticker.Interval, ticker2.Interval)
244+
})
165245
}

pkg/pgqueue/schema/queue.go

+19-9
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ func (q QueueCleanRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
160160
}
161161

162162
func (l QueueListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
163-
// Bind parameters
164163
bind.Set("where", "")
165164
l.OffsetLimit.Bind(bind, QueueListLimit)
166165

@@ -277,21 +276,24 @@ func bootstrapQueue(ctx context.Context, conn pg.Conn) error {
277276
const (
278277
queueCreateTable = `
279278
CREATE TABLE IF NOT EXISTS ${"schema"}.queue (
280-
-- queue name
281-
"queue" TEXT PRIMARY KEY,
279+
-- namespace and queue name
280+
"ns" TEXT NOT NULL,
281+
"queue" TEXT NOT NULL,
282282
-- time-to-live for queue messages
283283
"ttl" INTERVAL DEFAULT INTERVAL '1 hour',
284284
-- number of retries before failing
285285
"retries" INTEGER NOT NULL DEFAULT 3 CHECK ("retries" >= 0),
286286
-- delay between retries in seconds
287-
"retry_delay" INTERVAL NOT NULL DEFAULT INTERVAL '2 minute'
287+
"retry_delay" INTERVAL NOT NULL DEFAULT INTERVAL '2 minute',
288+
-- primary key
289+
PRIMARY KEY ("ns", "queue")
288290
)
289291
`
290292
queueInsert = `
291293
INSERT INTO ${"schema"}.queue (
292-
queue, ttl, retries, retry_delay
294+
ns, queue, ttl, retries, retry_delay
293295
) VALUES (
294-
@queue, DEFAULT, DEFAULT, DEFAULT
296+
@ns, @queue, DEFAULT, DEFAULT, DEFAULT
295297
) RETURNING
296298
queue, ttl, retries, retry_delay
297299
`
@@ -302,29 +304,37 @@ const (
302304
${"schema"}.queue
303305
WHERE
304306
queue = @id
307+
AND
308+
ns = @ns
305309
`
306310
queuePatch = `
307311
UPDATE ${"schema"}.queue SET
308312
${patch}
309313
WHERE
310314
queue = @id
315+
AND
316+
ns = @ns
311317
RETURNING
312318
queue, ttl, retries, retry_delay
313319
`
314320
queueDelete = `
315321
DELETE FROM ${"schema"}.queue WHERE
316322
queue = @id
323+
AND
324+
ns = @ns
317325
RETURNING
318326
queue, ttl, retries, retry_delay
319327
`
320328
queueList = `
321329
SELECT
322330
queue, ttl, retries, retry_delay
323331
FROM
324-
${"schema"}.queue ${where}
332+
${"schema"}.queue
333+
WHERE
334+
ns = @ns ${where}
325335
`
326336
queueClean = `
327-
SELECT * FROM ${"schema"}.queue_clean(@id)
337+
SELECT * FROM ${"schema"}.queue_clean(@ns, @id)
328338
`
329339
queueStatsView = `
330340
CREATE OR REPLACE VIEW ${"schema"}."queue_status" AS
@@ -342,6 +352,6 @@ const (
342352
1, 2
343353
`
344354
queueStats = `
345-
SELECT "queue", "status", "count" FROM ${"schema"}.queue_status ${where}
355+
SELECT "queue", "status", "count" FROM ${"schema"}.queue_status WHERE "ns" = @ns ${where}
346356
`
347357
)

pkg/pgqueue/schema/schema.go

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
const (
1515
SchemaName = "pgqueue"
16+
DefaultNamespace = "default"
1617
DefaultPrefix = "/queue/v1"
1718
TopicQueueInsert = "queue_insert"
1819
QueueListLimit = 100

0 commit comments

Comments
 (0)