Skip to content

Commit 5d06d48

Browse files
committed
message-store and message-store-postgres are combined
1 parent 766fe39 commit 5d06d48

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+999
-1310
lines changed

lib/message_store.rb

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
require 'pp'
2-
require 'json'
1+
require 'pg'
32

43
require 'casing'
54
require 'identifier/uuid'
@@ -9,9 +8,12 @@
98
require 'virtual'
109
require 'async_invocation'
1110

11+
require 'log'
12+
require 'settings'
13+
1214
require 'message_store/expected_version'
13-
require 'message_store/no_stream'
1415
require 'message_store/id'
16+
require 'message_store/no_stream'
1517
require 'message_store/stream_name'
1618

1719
require 'message_store/message_data'
@@ -21,10 +23,20 @@
2123

2224
require 'message_store/log'
2325

26+
require 'message_store/settings'
27+
require 'message_store/session'
28+
29+
require 'message_store/put'
30+
require 'message_store/write'
31+
2432
require 'message_store/get'
2533
require 'message_store/get/substitute'
34+
require 'message_store/get/condition'
35+
require 'message_store/get/stream'
2636
require 'message_store/get/stream/last'
2737
require 'message_store/get/stream/last/substitute'
38+
require 'message_store/get/category'
39+
require 'message_store/get/category/correlation'
40+
require 'message_store/get/category/consumer_group'
2841
require 'message_store/read/iterator'
2942
require 'message_store/read'
30-
require 'message_store/write'

lib/message_store/controls.rb

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
require 'message_store/controls/id'
99
require 'message_store/controls/category'
1010
require 'message_store/controls/stream_name'
11-
require 'message_store/controls/read'
11+
require 'message_store/controls/position'
1212
require 'message_store/controls/message_data'
13-
require 'message_store/controls/message_data/hash'
1413
require 'message_store/controls/message_data/metadata'
15-
require 'message_store/controls/message_data/write'
14+
require 'message_store/controls/message_data/hash'
1615
require 'message_store/controls/message_data/read'
17-
require 'message_store/controls/write'
18-
require 'message_store/controls/get'
19-
require 'message_store/controls/get_last'
16+
require 'message_store/controls/message_data/write'
17+
require 'message_store/controls/put'

lib/message_store/controls/get_last.rb

Lines changed: 0 additions & 33 deletions
This file was deleted.

lib/message_store/controls/message_data/write.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,26 @@ def self.data
4848
def self.metadata
4949
MessageData::Metadata.data
5050
end
51+
52+
module List
53+
Entry = Struct.new(:stream_name, :category, :message_data)
54+
55+
def self.get(instances: nil, stream_name: nil, category: nil)
56+
instances ||= 1
57+
58+
list = []
59+
instances.times do
60+
instance_stream_name = stream_name || StreamName.example(category: category)
61+
instance_category = MessageStore::StreamName.get_category(instance_stream_name)
62+
63+
write_message = Controls::MessageData::Write.example
64+
65+
list << Entry.new(instance_stream_name, instance_category, write_message)
66+
end
67+
68+
list
69+
end
70+
end
5171
end
5272
end
5373
end
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
module MessageStore
2+
module Controls
3+
module Position
4+
def self.example
5+
1
6+
end
7+
8+
def self.max
9+
(2 ** 63) - 1
10+
end
11+
end
12+
end
13+
end

lib/message_store/controls/put.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module MessageStore
2+
module Controls
3+
module Put
4+
def self.call(instances: nil, stream_name: nil, message_data: nil, message: nil, category: nil, type: nil)
5+
instances ||= 1
6+
stream_name ||= StreamName.example(category: category)
7+
message_data ||= message
8+
9+
message_specified = !message_data.nil?
10+
11+
message_data ||= MessageData::Write.example(type: type)
12+
13+
position = nil
14+
instances.times do
15+
position = MessageStore::Put.(message_data, stream_name)
16+
17+
unless message_specified
18+
message_data.id = MessageData::Write.id
19+
end
20+
end
21+
22+
[stream_name, position]
23+
end
24+
end
25+
end
26+
end

lib/message_store/controls/read.rb

Lines changed: 0 additions & 22 deletions
This file was deleted.

lib/message_store/controls/write.rb

Lines changed: 0 additions & 23 deletions
This file was deleted.

lib/message_store/get.rb

Lines changed: 152 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,161 @@ def self.included(cls)
77
include Virtual
88
include Log::Dependency
99

10+
prepend Call
11+
prepend BatchSize
12+
13+
dependency :session, Session
14+
1015
abstract :call
1116
abstract :stream_name
12-
abstract :batch_size
17+
abstract :sql_command
18+
abstract :parameters
19+
abstract :parameter_values
1320
abstract :last_position
21+
abstract :log_text
22+
23+
virtual :specialize_error
24+
virtual :assure
25+
end
26+
end
27+
28+
module BatchSize
29+
def batch_size
30+
@batch_size ||= Defaults.batch_size
31+
end
32+
end
33+
34+
def self.build(stream_name, **args)
35+
cls = specialization(stream_name)
36+
cls.build(stream_name, **args)
37+
end
38+
39+
def self.configure(receiver, stream_name, **args)
40+
attr_name = args.delete(:attr_name)
41+
attr_name ||= :get
42+
43+
instance = build(stream_name, **args)
44+
receiver.public_send("#{attr_name}=", instance)
45+
end
46+
47+
## Is there a path to removing this? - Aaron, Sun Jan 15 2023
48+
def configure(session: nil)
49+
Session.configure(self, session: session)
50+
end
51+
52+
def self.call(stream_name, **args)
53+
position = args.delete(:position)
54+
instance = build(stream_name, **args)
55+
instance.(position)
56+
end
57+
58+
module Call
59+
def call(position=nil, stream_name: nil)
60+
position ||= self.class::Defaults.position
61+
62+
stream_name ||= self.stream_name
63+
64+
assure
65+
66+
logger.trace(tag: :get) { "Getting message data (#{log_text(stream_name, position)})" }
67+
68+
result = get_result(stream_name, position)
69+
70+
message_data = convert(result)
71+
72+
logger.info(tag: :get) { "Finished getting message data (Count: #{message_data.length}, #{log_text(stream_name, position)})" }
73+
logger.info(tags: [:data, :message_data]) { message_data.pretty_inspect }
74+
75+
message_data
76+
end
77+
end
78+
79+
def get_result(stream_name, position)
80+
logger.trace(tag: :get) { "Getting result (#{log_text(stream_name, position)})" }
81+
82+
parameter_values = parameter_values(stream_name, position)
83+
84+
begin
85+
result = session.execute(sql_command, parameter_values)
86+
rescue PG::RaiseException => e
87+
raise_error(e)
88+
end
89+
90+
logger.debug(tag: :get) { "Finished getting result (Count: #{result.ntuples}, #{log_text(stream_name, position)})" }
91+
92+
result
93+
end
94+
95+
def convert(result)
96+
logger.trace(tag: :get) { "Converting result to message data (Result Count: #{result.ntuples})" }
97+
98+
message_data = result.map do |record|
99+
Get.message_data(record)
100+
end
101+
102+
logger.debug(tag: :get) { "Converted result to message data (Message Data Count: #{message_data.length})" }
103+
104+
message_data
105+
end
106+
107+
def self.message_data(record)
108+
record['data'] = Get::Deserialize.data(record['data'])
109+
record['metadata'] = Get::Deserialize.metadata(record['metadata'])
110+
record['time'] = Get::Time.utc_coerced(record['time'])
111+
112+
MessageData::Read.build(record)
113+
end
114+
115+
def raise_error(pg_error)
116+
error_message = Get.error_message(pg_error)
117+
118+
error = Condition.error(error_message)
119+
120+
if error.nil?
121+
error = specialize_error(error_message)
122+
end
123+
124+
if not error.nil?
125+
logger.error { error_message }
126+
raise error
127+
end
128+
129+
raise pg_error
130+
end
131+
132+
def self.error_message(pg_error)
133+
pg_error.message.gsub('ERROR:', '').strip
134+
end
135+
136+
def self.specialization(stream_name)
137+
if StreamName.category?(stream_name)
138+
Category
139+
else
140+
Stream
141+
end
142+
end
143+
144+
module Deserialize
145+
def self.data(serialized_data)
146+
return nil if serialized_data.nil?
147+
Transform::Read.(serialized_data, :json, MessageData::Hash)
148+
end
149+
150+
def self.metadata(serialized_metadata)
151+
return nil if serialized_metadata.nil?
152+
Transform::Read.(serialized_metadata, :json, MessageData::Hash)
153+
end
154+
end
155+
156+
module Time
157+
def self.utc_coerced(local_time)
158+
Clock::UTC.coerce(local_time)
159+
end
160+
end
161+
162+
module Defaults
163+
def self.batch_size
164+
1000
14165
end
15166
end
16167
end

0 commit comments

Comments
 (0)