Skip to content

Commit ba1e4e0

Browse files
committed
Fixing executions.
1 parent 5bb54e7 commit ba1e4e0

10 files changed

Lines changed: 114 additions & 79 deletions

File tree

cmd/server/main.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"os"
99
"os/signal"
10+
"syscall"
1011
"time"
1112

1213
"github.com/bruli/raspberryWaterSystem/internal/app"
@@ -41,7 +42,9 @@ func main() {
4142
}
4243

4344
func run() error {
44-
ctx := context.Background()
45+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
46+
defer stop()
47+
4548
log := buildLog()
4649
conf, err := config.New()
4750
if err != nil {
@@ -66,7 +69,6 @@ func run() error {
6669
tracer := otel.Tracer(serviceName)
6770

6871
eventsCh := make(chan tracing.Event, 5)
69-
defer close(eventsCh)
7072

7173
logCHMdw := cqs.NewCommandHndErrorMiddleware(log, tracer)
7274
eventsCHMdw := app.NewEventMiddleware(eventsCh, tracer)
@@ -164,11 +166,7 @@ func run() error {
164166
return err
165167
}
166168

167-
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
168-
defer stop()
169-
170-
errCh := make(chan error)
171-
defer close(errCh)
169+
errCh := make(chan error, 2)
172170

173171
go updateStatusWorker(ctx, qhBus, chBus, log)
174172
go eventsWorker(ctx, eventsCh, eventBus, log, tracer)
@@ -186,10 +184,14 @@ func run() error {
186184
errCh <- err
187185
}
188186
}()
189-
if err := <-errCh; err != nil {
187+
188+
select {
189+
case err := <-errCh:
190190
return err
191+
case <-ctx.Done():
192+
log.Info("shutdown signal received")
193+
return nil
191194
}
192-
return nil
193195
}
194196

195197
func terraceWeatherCron(ctx context.Context, cronJob *cron.Cron, repo *disk.EventsRepository, ch cqs.QueryHandler, log *slog.Logger, tracer trace.Tracer) {
@@ -349,6 +351,7 @@ func buildLog() *slog.Logger {
349351
func executionInTimeWorker(ctx context.Context, qh cqs.QueryHandler, ch cqs.CommandHandler, logger *slog.Logger, tracer trace.Tracer) {
350352
logger.InfoContext(ctx, "[WORKER] Execution in time: started")
351353
ticker := time.NewTicker(time.Minute)
354+
defer ticker.Stop()
352355
for {
353356
select {
354357
case <-ctx.Done():
@@ -374,7 +377,11 @@ func eventsWorker(ctx context.Context, ch <-chan tracing.Event, evBus cqs.EventB
374377
case <-ctx.Done():
375378
logger.InfoContext(ctx, "[WORKER] Events: context done")
376379
return
377-
case event := <-ch:
380+
case event, ok := <-ch:
381+
if !ok {
382+
logger.InfoContext(ctx, "[WORKER] Events: channel closed")
383+
return
384+
}
378385
parentCtx := trace.ContextWithSpanContext(ctx, event.SpanContext)
379386
ctx, span := tracer.Start(parentCtx, "eventDispatch")
380387
span.SetAttributes(attribute.String("event-name", event.Event.EventName()))
@@ -391,6 +398,7 @@ func eventsWorker(ctx context.Context, ch <-chan tracing.Event, evBus cqs.EventB
391398
func updateStatusWorker(ctx context.Context, qh cqs.QueryHandler, ch cqs.CommandHandler, logger *slog.Logger) {
392399
logger.InfoContext(ctx, "[WORKER] Update status: started")
393400
ticker := time.NewTicker(5 * time.Minute)
401+
defer ticker.Stop()
394402
for {
395403
select {
396404
case <-ctx.Done():

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ require (
7777
mvdan.cc/gofumpt v0.8.0 // indirect
7878
)
7979

80-
require github.com/stretchr/testify v1.11.1
80+
require (
81+
github.com/stretchr/testify v1.11.1
82+
github.com/warthog618/go-gpiocdev v0.9.1
83+
)
8184

8285
tool (
8386
github.com/matryer/moq

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ github.com/stianeikeland/go-rpio/v4 v4.6.0 h1:eAJgtw3jTtvn/CqwbC82ntcS+dtzUTgo5q
102102
github.com/stianeikeland/go-rpio/v4 v4.6.0/go.mod h1:A3GvHxC1Om5zaId+HqB3HKqx4K/AqeckxB7qRjxMK7o=
103103
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
104104
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
105+
github.com/warthog618/go-gpiocdev v0.9.1 h1:pwHPaqjJfhCipIQl78V+O3l9OKHivdRDdmgXYbmhuCI=
106+
github.com/warthog618/go-gpiocdev v0.9.1/go.mod h1:dN3e3t/S2aSNC+hgigGE/dBW8jE1ONk9bDSEYfoPyl8=
105107
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
106108
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
107109
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=

internal/domain/zone/relay.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package zone
33
import "errors"
44

55
const (
6-
MasterRelayID = iota + 1
7-
FertilizerPumpRelayID
8-
BigRelayID
9-
SmallRelayID
10-
AirRelayID
6+
OneRelayID = iota + 1
7+
TwoRelayID
8+
ThreeRelayID
9+
FourRelayID
1110
FertilizerPumpID
11+
AirRelayID
1212
CleanValvuleID
1313
FertilizerValvuleID
1414
)
@@ -22,14 +22,14 @@ func (i RelayID) Int() int {
2222
var ErrUnknownRelay = errors.New("unknown relay")
2323

2424
var enabledRelays = map[RelayID]string{
25-
MasterRelayID: "master",
26-
FertilizerPumpRelayID: "fertilizerPump",
27-
BigRelayID: "big",
28-
SmallRelayID: "small",
29-
AirRelayID: "air",
30-
CleanValvuleID: "clean",
31-
FertilizerPumpID: "22",
32-
FertilizerValvuleID: "fertilizerValvule",
25+
OneRelayID: "18",
26+
TwoRelayID: "17",
27+
ThreeRelayID: "23",
28+
FourRelayID: "22",
29+
AirRelayID: "15",
30+
CleanValvuleID: "14",
31+
FertilizerPumpID: "24",
32+
FertilizerValvuleID: "27",
3333
}
3434

3535
type Relay struct {

internal/domain/zone/relay_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestParseRelay(t *testing.T) {
2020
{
2121
name: "with an unknown id, then it return a unknown valid relay id",
2222
id: 1,
23-
expectedRelay: zone.MasterRelayID,
23+
expectedRelay: zone.OneRelayID,
2424
},
2525
}
2626
for _, tt := range tests {

internal/domain/zone/zone_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
func TestNew(t *testing.T) {
12-
rel1, err := zone.ParseRelay(zone.MasterRelayID)
12+
rel1, err := zone.ParseRelay(zone.OneRelayID)
1313
require.NoError(t, err)
1414
relays := []zone.Relay{
1515
rel1,

internal/infra/gpio/pins_executor.go

Lines changed: 64 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,63 +5,90 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/warthog618/go-gpiocdev"
89
"go.opentelemetry.io/otel/codes"
910
"go.opentelemetry.io/otel/trace"
10-
"periph.io/x/conn/v3/gpio"
1111
)
1212

13+
const gpioChip = "gpiochip0"
14+
1315
type PinsExecutor struct {
14-
relays map[string]gpio.PinIO
16+
relays map[string]int
1517
tracer trace.Tracer
1618
}
1719

18-
func NewPinsExecutor(tracer trace.Tracer) *PinsExecutor {
19-
return &PinsExecutor{relays: relays, tracer: tracer}
20-
}
21-
2220
func (p *PinsExecutor) Execute(ctx context.Context, seconds uint, pins []string) error {
21+
if err := ctx.Err(); err != nil {
22+
return err
23+
}
24+
25+
ctx, span := p.tracer.Start(ctx, "PinsExecutor.Execute")
26+
defer span.End()
27+
28+
activatedPins := make([]*gpiocdev.Line, 0, len(pins))
29+
30+
defer func() {
31+
for _, line := range activatedPins {
32+
_ = line.SetValue(1)
33+
_ = line.Close()
34+
}
35+
}()
36+
37+
for _, pinNumber := range pins {
38+
line, err := p.activatePin(pinNumber)
39+
if err != nil {
40+
span.RecordError(err)
41+
span.SetStatus(codes.Error, err.Error())
42+
return err
43+
}
44+
45+
activatedPins = append(activatedPins, line)
46+
}
47+
48+
timer := time.NewTimer(time.Duration(seconds) * time.Second)
49+
defer timer.Stop()
50+
2351
select {
2452
case <-ctx.Done():
53+
span.RecordError(ctx.Err())
54+
span.SetStatus(codes.Error, ctx.Err().Error())
2555
return ctx.Err()
26-
default:
27-
_, span := p.tracer.Start(ctx, "PinsExecutor.Execute")
28-
defer span.End()
29-
activatedPins := make([]gpio.PinIO, len(pins))
30-
for i, piNumber := range pins {
31-
activatePin, err := p.activatePin(piNumber)
32-
if err != nil {
33-
span.RecordError(err)
34-
span.SetStatus(codes.Error, err.Error())
35-
return err
36-
}
37-
activatedPins[i] = activatePin
38-
}
39-
time.Sleep(time.Duration(seconds) * time.Second)
40-
for _, act := range activatedPins {
41-
if err := p.deActivatePin(act); err != nil {
42-
span.RecordError(err)
43-
span.SetStatus(codes.Error, err.Error())
44-
return err
45-
}
46-
}
47-
span.SetStatus(codes.Ok, "pins executed")
48-
return nil
56+
57+
case <-timer.C:
4958
}
59+
60+
span.SetStatus(codes.Ok, "pins executed")
61+
return nil
5062
}
5163

52-
func (p *PinsExecutor) activatePin(piNumber string) (gpio.PinIO, error) {
53-
pi, ok := p.relays[piNumber]
64+
func (p *PinsExecutor) activatePin(pinNumber string) (*gpiocdev.Line, error) {
65+
lineNumber, ok := p.relays[pinNumber]
5466
if !ok {
55-
return nil, InvalidPinToExecuteError{pinNumber: piNumber}
67+
return nil, InvalidPinToExecuteError{pinNumber: pinNumber}
5668
}
57-
if err := pi.Out(gpio.Low); err != nil {
58-
return nil, err
69+
70+
line, err := gpiocdev.RequestLine(
71+
gpioChip,
72+
lineNumber,
73+
gpiocdev.AsOutput(1),
74+
)
75+
if err != nil {
76+
return nil, fmt.Errorf("failed to request GPIO %s: %w", pinNumber, err)
77+
}
78+
79+
if err := line.SetValue(0); err != nil {
80+
_ = line.Close()
81+
return nil, fmt.Errorf("failed to activate GPIO %s: %w", pinNumber, err)
5982
}
60-
return pi, nil
83+
84+
return line, nil
6185
}
6286

63-
func (p *PinsExecutor) deActivatePin(pi gpio.PinIO) error {
64-
return pi.Out(gpio.High)
87+
func NewPinsExecutor(tracer trace.Tracer) *PinsExecutor {
88+
return &PinsExecutor{
89+
relays: relays,
90+
tracer: tracer,
91+
}
6592
}
6693

6794
type InvalidPinToExecuteError struct {

internal/infra/gpio/relays.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
package gpio
22

3-
import (
4-
"periph.io/x/conn/v3/gpio"
5-
"periph.io/x/host/v3/rpi"
6-
)
7-
8-
var relays = map[string]gpio.PinIO{
9-
"master": rpi.P1_12,
10-
"fertilizerPump": rpi.P1_11,
11-
"big": rpi.P1_16,
12-
"small": rpi.P1_18,
13-
"air": rpi.P1_10,
14-
"clean": rpi.P1_8,
15-
"22": rpi.P1_15,
16-
"fertilizerValvule": rpi.P1_13,
3+
var relays = map[string]int{
4+
"18": 18,
5+
"17": 17,
6+
"23": 23,
7+
"24": 24,
8+
"15": 15,
9+
"14": 14,
10+
"22": 22,
11+
"27": 27,
1712
}

internal/infra/listener/execute_pins_on_exexecute_fertilizer_zone.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (e ExecutePinsOnExecuteFertilizerZone) fertilizerValvule(ctx context.Contex
7979
}
8080

8181
func (e ExecutePinsOnExecuteFertilizerZone) fertilizerPump(ctx context.Context, event *zone.FertilizerZoneExecuted) error {
82-
seconds := event.CleanValvuleSeconds + event.FertilizerPumpSeconds
82+
seconds := event.FertilizerPumpSeconds
8383
e.log.DebugContext(ctx, "execute fertilizer pump", slog.Uint64("seconds", uint64(seconds)))
8484
defer e.log.DebugContext(ctx, "fertilizer pump stopped")
8585
return e.executePin(ctx, seconds, []string{event.FertilizerPumpRelayPin})

tests/functional/programs_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func TestPrograms(t *testing.T) {
6262
then it returns ok`, func(t *testing.T) {
6363
resp, err = buildRequestAndSend(ctx, nil, authorizationHeader(), http2.MethodDelete, "/programs/daily/12:45", cl)
6464
require.NoError(t, err)
65-
},
65+
},
6666
)
6767
})
6868
t.Run(`Given a create weekly program endpoint,

0 commit comments

Comments
 (0)