-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathbackend.go
118 lines (100 loc) · 2.96 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/eclipse/paho.mqtt.golang"
"sync"
)
// Backend implements a MQTT pub-sub backend.
type Backend struct {
conn mqtt.Client
// Are we connecting to LoRa Server or to Mainflux
isLora bool
mutex sync.RWMutex
}
var (
loraBackend *Backend
mainfluxBackend *Backend
)
// NewBackend creates a new Backend.
func NewBackend(server, username, password string, isLora bool) (*Backend, error) {
b := Backend{}
opts := mqtt.NewClientOptions()
opts.AddBroker(server)
opts.SetUsername(username)
opts.SetPassword(password)
opts.SetOnConnectHandler(b.onConnected)
opts.SetConnectionLostHandler(b.onConnectionLost)
log.WithField("server", server).Info("backend: connecting to mqtt broker")
b.conn = mqtt.NewClient(opts)
if token := b.conn.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
b.isLora = isLora
return &b, nil
}
// Send MQTT message
func (b *Backend) SendMQTTMsg(topic string, data []byte) error {
log.WithField("topic", topic).Info("backend publishing packet: ", string(data))
if token := b.conn.Publish(topic, 0, false, data); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
// Close closes the backend.
func (b *Backend) Close() {
b.conn.Disconnect(250) // wait 250 milisec to complete pending actions
log.Info("-- DISCONNECTING\n")
}
// Subscribe to lora server messages
func (b *Backend) Sub() error {
switch b.isLora {
case true:
if s := b.conn.Subscribe("application/+/node/+/rx", 0, b.MessageHandler); s.Wait() && s.Error() != nil {
log.Info("Failed to subscribe, err: %v\n", s.Error())
return s.Error()
}
case false:
// For now we deo not SUB to Mainflux
break
}
return nil
}
// Handler for received messages from loraserver
func (b *Backend) MessageHandler(c mqtt.Client, msg mqtt.Message) {
log.WithField("topic", msg.Topic()).Info("backend: packet received")
log.Info("TOPIC: %s\n", msg.Topic())
log.Info("MSG: %s\n", msg.Payload())
switch b.isLora {
case true:
// Mainflux backend is subscribed to LoRa Network Server and recieves LoRa messages
u := LoraMessage{}
errStatus := json.Unmarshal(msg.Payload(), &u)
if errStatus != nil {
log.Errorf("\nerror: decode json failed")
log.Errorf(errStatus.Error())
return
}
fmt.Printf("\n <-- RCVD DATA: %s\n", u.Data)
data, err := base64.StdEncoding.DecodeString(u.Data)
if err != nil {
log.Errorf("\nerror: decode base64 failed")
}
topic := cfg.LORAChannel
mainfluxBackend.SendMQTTMsg(topic, data)
log.Info(" --> PUSH DATA: %s to %s\n", topic, data)
case false:
// LoRa backend is not currently subsctibed to Mainflux MQTT broker
break
}
}
func (b *Backend) onConnected(c mqtt.Client) {
defer b.mutex.RUnlock()
b.mutex.RLock()
log.Info("backend: connected to mqtt broker")
}
func (b *Backend) onConnectionLost(c mqtt.Client, reason error) {
log.Errorf("backend: mqtt connection error: %s", reason)
}