Skip to content

Commit 5fbc6d1

Browse files
authored
Add support for variables, "+" and "#" (#95)
1 parent 959a5f0 commit 5fbc6d1

File tree

3 files changed

+39
-9
lines changed

3 files changed

+39
-9
lines changed

pkg/mqtt/client.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,13 @@ func (c *client) IsConnected() bool {
7171
return c.client.IsConnectionOpen()
7272
}
7373

74-
func (c *client) HandleMessage(_ paho.Client, msg paho.Message) {
74+
func (c *client) HandleMessage(topic string, payload []byte) {
7575
message := Message{
7676
Timestamp: time.Now(),
77-
Value: msg.Payload(),
77+
Value: payload,
7878
}
79-
c.topics.AddMessage(msg.Topic(), message)
79+
80+
c.topics.AddMessage(topic, message)
8081
}
8182

8283
func (c *client) GetTopic(reqPath string) (*Topic, bool) {
@@ -104,9 +105,16 @@ func (c *client) Subscribe(reqPath string) *Topic {
104105
return t
105106
}
106107

107-
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path)
108-
if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil {
109-
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error())
108+
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath)
109+
110+
topic := resolveTopic(t.Path)
111+
112+
if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) {
113+
// by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic
114+
// and don't need to regex it against + and #.
115+
c.HandleMessage(topicPath, []byte(m.Payload()))
116+
}); token.Wait() && token.Error() != nil {
117+
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error())
110118
}
111119
c.topics.Store(t)
112120
return t
@@ -126,7 +134,9 @@ func (c *client) Unsubscribe(reqPath string) {
126134
}
127135

128136
log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)
129-
if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil {
137+
138+
topic := resolveTopic(t.Path)
139+
if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
130140
log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error())
131141
}
132142
}

pkg/mqtt/topic.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mqtt
22

33
import (
44
"path"
5+
"strings"
56
"sync"
67
"time"
78

@@ -97,3 +98,10 @@ func (tm *TopicMap) Store(t *Topic) {
9798
func (tm *TopicMap) Delete(key string) {
9899
tm.Map.Delete(key)
99100
}
101+
102+
// replace all __PLUS__ with + and one __HASH__ with #
103+
// Question: Why does grafana not allow + and # in query?
104+
func resolveTopic(topic string) string {
105+
resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+")
106+
return strings.Replace(resolvedTopic, "__HASH__", "#", -1)
107+
}

src/datasource.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,21 @@
1-
import { DataSourceInstanceSettings } from '@grafana/data';
2-
import { DataSourceWithBackend } from '@grafana/runtime';
1+
import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data';
2+
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
33
import { MqttDataSourceOptions, MqttQuery } from './types';
44

55
export class DataSource extends DataSourceWithBackend<MqttQuery, MqttDataSourceOptions> {
66
constructor(instanceSettings: DataSourceInstanceSettings<MqttDataSourceOptions>) {
77
super(instanceSettings);
88
}
9+
10+
applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record<string, any> {
11+
let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars);
12+
resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__');
13+
resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__');
14+
const resolvedQuery: MqttQuery = {
15+
...query,
16+
topic: resolvedTopic,
17+
};
18+
19+
return resolvedQuery;
20+
}
921
}

0 commit comments

Comments
 (0)