Skip to content

Commit a8765d1

Browse files
committed
Initial code
1 parent cb6f1c6 commit a8765d1

8 files changed

+424
-0
lines changed

RabbitMQ/Common.cls

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
Class RabbitMQ.Common Extends %RegisteredObject [ Abstract ]
2+
{
3+
4+
/// This is the ID name of the set of credentials values (Username, Password) to be used to access the HTTP server
5+
/// Property Credentials As %String [ InitialExpression = "None" ];
6+
Property Host As %String [ InitialExpression = "localhost" ];
7+
8+
Property Port As %Integer [ InitialExpression = -1 ];
9+
10+
Property VirtualHost As %String [ InitialExpression = "/" ];
11+
12+
Property Queue As %String;
13+
14+
/// Config Name of the Java Gateway service controlling the Java Gateway server this Operation will use.
15+
Property JGService As %String;
16+
17+
/// Gateway connection
18+
Property JGW As %Net.Remote.Gateway;
19+
20+
/// API object
21+
Property API As isc.rabbitmq.API;
22+
23+
/// Encoding to convert message body. Leave empty to get/send as is.
24+
Property Encoding As %String;
25+
26+
/// CLASSPATH containing the files required to be passed as an argument when starting the JVM.
27+
/// The user should typically provide here the files containing the classes used via the Java Gateway.
28+
/// We assume that the user has properly quoted the classpath and supplied the correct separators for the platform
29+
/// in case of multiple files. <br>
30+
/// See property AdditionalPaths in that class.
31+
Property ClassPath As %String(MAXLEN = 32000);
32+
33+
/// These are the production settings for this object
34+
Parameter SETTINGS = "Host:Basic,Port:Basic,VirtualHost:Basic,Queue:Basic,Credentials:Basic:credentialsSelector,JGService:Basic:selector?context={Ens.ContextSearch/ProductionItems?targets=0&productionName=@productionId},ClassPath:Basic,Encoding:Basic";
35+
36+
/// Connect to running JGW
37+
Method Connect() As %Status
38+
{
39+
// connect to current namespace, use 2 second timeout
40+
Set sc = $$$OK
41+
Set timeout = 5
42+
Set classPath = ##class(%ListOfDataTypes).%New()
43+
Do classPath.Insert(..ClassPath)
44+
45+
// get a connection handle and connect
46+
Set gateway = ##class(%Net.Remote.Gateway).%New()
47+
Set host = ##class(Ens.Director).GetHostSettingValue(..JGService, "Address")
48+
Set port = ##class(Ens.Director).GetHostSettingValue(..JGService, "Port")
49+
Set sc = gateway.%Connect(host, port, $namespace, timeout, classPath)
50+
51+
If $$$ISOK(sc) {
52+
Set ..JGW = gateway
53+
}
54+
Quit sc
55+
}
56+
57+
Method ConnectToRabbitMQ() As %Status
58+
{
59+
Set sc = $$$OK
60+
61+
If ..%CredentialsObj.Username'="" {
62+
Set user = ..%CredentialsObj.Username
63+
Set pass = ..%CredentialsObj.Password
64+
} Else {
65+
Set user = "guest"
66+
Set pass = "guest"
67+
}
68+
69+
Try {
70+
Set ..API = ##class(isc.rabbitmq.API).%New(..JGW, ..Host, ..Port, user, pass, ..VirtualHost, ..Queue)
71+
} Catch ex {
72+
Set sc = $$$ADDSC(ex.AsStatus(),$g(%objlasterror))
73+
}
74+
75+
Quit sc
76+
}
77+
78+
}
79+

RabbitMQ/InboundAdapter.cls

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
Class RabbitMQ.InboundAdapter Extends (Ens.InboundAdapter, RabbitMQ.Common)
2+
{
3+
4+
/// Stream class to store message body. Leave empty to use strings.
5+
Property BodyClass As %Dictionary.CacheClassname;
6+
7+
Parameter SETTINGS = "BodyClass:Basic";
8+
9+
/// Establish gateway connectionand init java API
10+
Method OnInit() As %Status
11+
{
12+
Set sc = $$$OK
13+
Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting")
14+
Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService))
15+
Set sc = ..Connect()
16+
Quit:$$$ISERR(sc)
17+
Set sc = ..ConnectToRabbitMQ()
18+
Quit sc
19+
}
20+
21+
/// Close connection
22+
Method OnTearDown() As %Status
23+
{
24+
Do ..API.close()
25+
Quit $$$OK
26+
}
27+
28+
/// default InboundAdapter behavior: always call ProcessInput on CallInterval
29+
Method OnTask() As %Status
30+
{
31+
Set sc = $$$OK
32+
33+
Set messageCount = 1
34+
35+
While messageCount > 0 {
36+
#Dim messageList As %ListOfDataTypes
37+
38+
If ..BodyClass = "" {
39+
Set messageList = ..API.readMessageString()
40+
} Else {
41+
Set tempStream = ..GetTempStream()
42+
Set messageList = ..API.readMessageStream(.tempStream)
43+
}
44+
45+
Set messageLength = messageList.GetAt(1)
46+
Set messageCount = messageList.GetAt(2)
47+
48+
If messageLength>0 {
49+
#Dim message As RabbitMQ.Message
50+
Set message = ..ListToMessage(messageList)
51+
If ..BodyClass = "" {
52+
Set message.Body = ..DecodeMessageBody(messageList.GetAt(16))
53+
} Else {
54+
Set message.Body = $classmethod(..BodyClass, "%New")
55+
Do message.Body.Write(..DecodeMessageBody(tempStream.Read(messageLength)))
56+
Do message.Body.Rewind()
57+
}
58+
Set sc = ..BusinessHost.ProcessInput(message)
59+
} Else {
60+
CONTINUE
61+
}
62+
Quit:$$$ISERR(sc)
63+
}
64+
Set ..BusinessHost.%WaitForNextCallInterval=1
65+
Quit sc
66+
}
67+
68+
ClassMethod ListToMessage(list As %ListOfDataTypes) As RabbitMQ.Message
69+
{
70+
Set message = ##class(RabbitMQ.Message).%New()
71+
72+
Set message.ContentType = list.GetAt(3)
73+
Set message.ContentEncoding = list.GetAt(4)
74+
Set message.CorrelationId = list.GetAt(5)
75+
Set message.ReplyTo = list.GetAt(6)
76+
Set message.Expiration = list.GetAt(7)
77+
Set message.MessageId = list.GetAt(8)
78+
Set message.Type = list.GetAt(9)
79+
Set message.UserId = list.GetAt(10)
80+
Set message.AppId = list.GetAt(11)
81+
Set message.ClusterId = list.GetAt(12)
82+
Set message.DeliveryMode = list.GetAt(13)
83+
Set message.Priority = list.GetAt(14)
84+
Set message.Timestamp = list.GetAt(15)
85+
86+
Quit message
87+
}
88+
89+
Method DecodeMessageBody(body As %String) As %String
90+
{
91+
If ..Encoding '= "" {
92+
If $isObject(body) {
93+
// TODO streams
94+
} Else {
95+
Set body = $zcvt(body, "O", ..Encoding)
96+
}
97+
}
98+
Quit body
99+
}
100+
101+
ClassMethod GetTempStream() As %GlobalBinaryStream
102+
{
103+
Set stream=##class(%GlobalBinaryStream).%New()
104+
// TODO - work around that
105+
// we need to 'reserve' a number of bytes since we are passing the stream
106+
// by reference (Java's equivalent is byte[] ba = new byte[max];)
107+
For i=1:1:32000 Do stream.Write("0")
108+
Quit stream
109+
}
110+
111+
}
112+

RabbitMQ/Message.cls

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
Class RabbitMQ.Message Extends %Persistent
2+
{
3+
4+
Property ContentType As %String;
5+
6+
Property ContentEncoding As %String;
7+
8+
Property CorrelationId As %String;
9+
10+
Property ReplyTo As %String;
11+
12+
Property Expiration As %String;
13+
14+
Property MessageId As %String;
15+
16+
Property Type As %String;
17+
18+
Property UserId As %String;
19+
20+
Property AppId As %String;
21+
22+
Property ClusterId As %String;
23+
24+
Property DeliveryMode As %String;
25+
26+
Property Priority As %String;
27+
28+
Property Timestamp As %String;
29+
30+
/// Could be either string or stream
31+
Property Body As %String;
32+
33+
Storage Default
34+
{
35+
<Data name="MessageDefaultData">
36+
<Value name="1">
37+
<Value>%%CLASSNAME</Value>
38+
</Value>
39+
<Value name="2">
40+
<Value>ContentType</Value>
41+
</Value>
42+
<Value name="3">
43+
<Value>ContentEncoding</Value>
44+
</Value>
45+
<Value name="4">
46+
<Value>CorrelationId</Value>
47+
</Value>
48+
<Value name="5">
49+
<Value>ReplyTo</Value>
50+
</Value>
51+
<Value name="6">
52+
<Value>Expiration</Value>
53+
</Value>
54+
<Value name="7">
55+
<Value>MessageId</Value>
56+
</Value>
57+
<Value name="8">
58+
<Value>Type</Value>
59+
</Value>
60+
<Value name="9">
61+
<Value>UserId</Value>
62+
</Value>
63+
<Value name="10">
64+
<Value>AppId</Value>
65+
</Value>
66+
<Value name="11">
67+
<Value>ClusterId</Value>
68+
</Value>
69+
<Value name="12">
70+
<Value>DeliveryMode</Value>
71+
</Value>
72+
<Value name="13">
73+
<Value>Priority</Value>
74+
</Value>
75+
<Value name="14">
76+
<Value>Timestamp</Value>
77+
</Value>
78+
<Value name="15">
79+
<Value>Body</Value>
80+
</Value>
81+
</Data>
82+
<DataLocation>^RabbitMQ.MessageD</DataLocation>
83+
<DefaultData>MessageDefaultData</DefaultData>
84+
<IdLocation>^RabbitMQ.MessageD</IdLocation>
85+
<IndexLocation>^RabbitMQ.MessageI</IndexLocation>
86+
<StreamLocation>^RabbitMQ.MessageS</StreamLocation>
87+
<Type>%Library.CacheStorage</Type>
88+
}
89+
90+
}
91+

RabbitMQ/Operation.cls

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
Class RabbitMQ.Operation Extends Ens.BusinessOperation
2+
{
3+
4+
Parameter ADAPTER = "RabbitMQ.OutboundAdapter";
5+
6+
Property Adapter As RabbitMQ.OutboundAdapter;
7+
8+
Method OnMessage(request As Ens.StringRequest, response As Ens.Response) As %Status
9+
{
10+
#Dim sc As %Status = $$$OK
11+
Set response = ##class(Ens.Response).%New()
12+
quit ..Adapter.SendMessageToQueue("Hello", request.StringValue)
13+
}
14+
15+
}
16+

RabbitMQ/OutboundAdapter.cls

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
Class RabbitMQ.OutboundAdapter Extends (Ens.OutboundAdapter, RabbitMQ.Common)
2+
{
3+
4+
Method OnInit() As %Status
5+
{
6+
Set sc = $$$OK
7+
Quit:..JGService="" $$$ERROR($$$GeneralError,"Specify JGService setting")
8+
Quit:'##class(Ens.Director).IsItemEnabled(..JGService) $$$ERROR($$$GeneralError, $$$FormatText("Java Gateway Service: '%1' is down",..JGService))
9+
Set sc = ..Connect()
10+
Quit:$$$ISERR(sc) sc
11+
Set sc = ..ConnectToRabbitMQ()
12+
Quit sc
13+
}
14+
15+
/// Close connection
16+
Method OnTearDown() As %Status
17+
{
18+
Do ..API.close()
19+
Quit $$$OK
20+
}
21+
22+
/// Send message. message can be a string or stream.
23+
Method SendMessage(message As %Stream.Object) As %Status
24+
{
25+
Set sc = $$$OK
26+
Set stream = ##class(%Library.GlobalBinaryStream).%New()
27+
28+
If $isObject(message) {
29+
While 'message.AtEnd {
30+
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
31+
}
32+
} Else {
33+
Do stream.Write(..EncodeMessageBody(message))
34+
}
35+
36+
Try {
37+
Do ..API.sendMessage(stream)
38+
} Catch ex {
39+
Set sc = ex.AsStatus()
40+
}
41+
Quit sc
42+
}
43+
44+
/// Send message. message can be a string or stream.
45+
Method SendMessageToQueue(queue As %String, message As %Stream.Object) As %Status
46+
{
47+
Set sc = $$$OK
48+
Set stream = ##class(%Library.GlobalBinaryStream).%New()
49+
50+
If $isObject(message) {
51+
While 'message.AtEnd {
52+
Do stream.Write(..EncodeMessageBody(message.Read($$$MaxStringLength)))
53+
}
54+
} Else {
55+
Do stream.Write(..EncodeMessageBody(message))
56+
}
57+
58+
Try {
59+
Do ..API.sendMessageToQueue(queue, stream)
60+
} Catch ex {
61+
Set sc = ex.AsStatus()
62+
}
63+
Quit sc
64+
}
65+
66+
Method EncodeMessageBody(body As %String) As %String
67+
{
68+
If ..Encoding '= "" {
69+
If $isObject(body) {
70+
// TODO streams
71+
} Else {
72+
Set body = $zcvt(body, "O", ..Encoding)
73+
}
74+
}
75+
Quit body
76+
}
77+
78+
}
79+

0 commit comments

Comments
 (0)