Skip to content

Commit c8433f2

Browse files
committed
Updated
1 parent 3f0e342 commit c8433f2

File tree

6 files changed

+23
-7
lines changed

6 files changed

+23
-7
lines changed

Diff for: pkg/pgqueue/opt.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
type opt struct {
1414
worker string
15+
schema string
1516
}
1617

1718
// Opt represents a function that modifies the options
@@ -39,10 +40,20 @@ func applyOpts(opts ...Opt) (*opt, error) {
3940

4041
func OptWorker(worker string) Opt {
4142
return func(o *opt) error {
42-
if worker = strings.TrimSpace(worker); worker == "" {
43+
if v = strings.TrimSpace(v); v == "" {
4344
return httpresponse.ErrBadRequest.With("empty worker name")
4445
}
45-
o.worker = worker
46+
o.worker = v
47+
return nil
48+
}
49+
}
50+
51+
func OptSchema(v string) Opt {
52+
return func(o *opt) error {
53+
if v = strings.TrimSpace(v); v == "" {
54+
return httpresponse.ErrBadRequest.With("empty schema name")
55+
}
56+
o.schema = v
4657
return nil
4758
}
4859
}

Diff for: pkg/pgqueue/pgqueue.go

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func New(ctx context.Context, conn pg.PoolConn, opt ...Opt) (*Client, error) {
3737
return nil, httpresponse.ErrInternalError.Withf("Cannot create listener")
3838
} else {
3939
self.listener = listener
40+
self.conn = conn.With("schema", schema.SchemaName).(pg.PoolConn)
4041
self.topics = []string{schema.TopicQueueInsert}
4142
}
4243

Diff for: plugin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,5 @@ type Logger interface {
8282

8383
type PG interface {
8484
Task
85-
pg.PoolConn
85+
Conn() pg.PoolConn
8686
}

Diff for: plugin/pg/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (c Config) New(ctx context.Context) (server.Task, error) {
4747
} else if err := pool.Ping(ctx); err != nil {
4848
return nil, err
4949
} else {
50-
return taskWithConn(pool), nil
50+
return taskWith(pool), nil
5151
}
5252
}
5353

Diff for: plugin/pg/task.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,25 @@ import (
1212
// TYPES
1313

1414
type pgpool struct {
15-
pg.PoolConn
15+
conn pg.PoolConn
1616
}
1717

1818
var _ server.Task = (*pgpool)(nil)
1919

2020
////////////////////////////////////////////////////////////////////////////////
2121
// LIFECYCLE
2222

23-
func taskWithConn(conn pg.PoolConn) *pgpool {
23+
func taskWith(conn pg.PoolConn) *pgpool {
2424
return &pgpool{conn}
2525
}
2626

2727
////////////////////////////////////////////////////////////////////////////////
2828
// PUBLIC METHODS
2929

30+
func (pg *pgpool) Conn() pg.PoolConn {
31+
return pg.conn
32+
}
33+
3034
func (*pgpool) Run(ctx context.Context) error {
3135
<-ctx.Done()
3236
return nil

Diff for: plugin/pgqueue/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (c Config) New(ctx context.Context) (server.Task, error) {
3434
}
3535

3636
// Create a new client
37-
client, err := pgqueue.New(ctx, c.Pool, opts...)
37+
client, err := pgqueue.New(ctx, c.Pool.Conn(), opts...)
3838
if err != nil {
3939
return nil, err
4040
}

0 commit comments

Comments
 (0)