Skip to content

Commit f019f62

Browse files
jsvdmashhurs
andauthored
Backport bulk count and uncompressed size headers to 11.x (+ ci/test fixes) (#1219)
Adds two new headers to each bulk request: * "X-Elastic-Event-Count": number of actions / documents in that bulk request * "X-Elastic-Uncompressed-Request-Length": size in bytes of the request body before compression X-Elastic-Uncompressed-Request-Length is equal to Content-Length when compression is disabled. Backport fixes: * Tolerate the elasticsearch-ruby v8 client in integration tests. (#1208) * elasticsearch-ruby client got updated to v8 in LS core. This plugin uses it in integration tests. This change tolerates both elasticsearch-ruby v7 and v8 client versions. * Fix the ILM spec issue where method was removed, restored internally. Co-authored-by: Mashhur <[email protected]>
1 parent 0261f27 commit f019f62

28 files changed

+305
-198
lines changed

.ci/docker-compose.override.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '3'
2-
31
services:
42

53
logstash:

.ci/logstash-run.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ else
1313
fi
1414

1515
# CentOS 7 using curl defaults does not enable TLSv1.3
16-
CURL_OPTS="-k --tlsv1.2 --tls-max 1.3"
16+
CURL_OPTS="-k -u admin:elastic --tlsv1.2 --tls-max 1.3"
1717

1818
wait_for_es() {
1919
count=120
@@ -22,7 +22,7 @@ wait_for_es() {
2222
[[ $count -eq 0 ]] && exit 1
2323
sleep 1
2424
done
25-
echo $(curl $CURL_OPTS -vi $ES_URL | jq -r .version.number)
25+
echo $(curl $CURL_OPTS $ES_URL | jq -r .version.number)
2626
}
2727

2828
if [[ "$INTEGRATION" != "true" ]]; then

.travis.yml

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,23 @@ import:
44
jobs:
55
include:
66
- stage: "Integration Tests"
7-
env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current
7+
env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current
8+
- env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.previous
9+
- env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current
10+
- env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current
11+
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current
812
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.previous
913
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current
1014
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.next
11-
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.future
15+
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.next
16+
- env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=main
1217
- stage: "Secure Integration Tests"
13-
env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current SNAPSHOT=true
14-
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current
15-
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current ES_SSL_KEY_INVALID=true
16-
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3
18+
env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current
19+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current
20+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current ES_SSL_KEY_INVALID=true
21+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3
22+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current
23+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current ES_SSL_KEY_INVALID=true
24+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3
25+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info SNAPSHOT=true ELASTIC_STACK_VERSION=8.next
26+
- env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info SNAPSHOT=true ELASTIC_STACK_VERSION=9.next

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
## 11.22.13
2+
- Add headers reporting uncompressed size and doc count for bulk requests [#1217](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1217)
3+
14
## 11.22.12
25
- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
6+
37
## 11.22.11
48
- Remove irrelevant log warning about elastic stack version [#1202](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1202)
59

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch;
2121
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
2222
# made sense. We picked one on the lowish side to not use too much heap.
2323
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
24-
24+
EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze
25+
UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze
2526

2627
class HttpClient
2728
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
@@ -143,7 +144,11 @@ def bulk(actions)
143144
:payload_size => stream_writer.pos,
144145
:content_length => body_stream.size,
145146
:batch_offset => (index + 1 - batch_actions.size))
146-
bulk_responses << bulk_send(body_stream, batch_actions)
147+
headers = {
148+
EVENT_COUNT_HEADER => batch_actions.size.to_s,
149+
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
150+
}
151+
bulk_responses << bulk_send(body_stream, batch_actions, headers)
147152
body_stream.truncate(0) && body_stream.seek(0)
148153
stream_writer = gzip_writer(body_stream) if compression_level?
149154
batch_actions.clear
@@ -159,7 +164,14 @@ def bulk(actions)
159164
:payload_size => stream_writer.pos,
160165
:content_length => body_stream.size,
161166
:batch_offset => (actions.size - batch_actions.size))
162-
bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
167+
168+
if body_stream.size > 0
169+
headers = {
170+
EVENT_COUNT_HEADER => batch_actions.size.to_s,
171+
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
172+
}
173+
bulk_responses << bulk_send(body_stream, batch_actions, headers)
174+
end
163175

164176
body_stream.close unless compression_level?
165177
join_bulk_responses(bulk_responses)
@@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses)
179191
}
180192
end
181193

182-
def bulk_send(body_stream, batch_actions)
183-
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
194+
def bulk_send(body_stream, batch_actions, headers = {})
195+
params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }
184196

185197
begin
186198
response = @pool.post(@bulk_path, params, body_stream.string)

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.22.12'
3+
s.version = '11.22.13'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/es_spec_helper.rb

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
require_relative './spec_helper'
22

33
require 'elasticsearch'
4-
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
5-
require_relative "support/elasticsearch/api/actions/get_alias"
6-
require_relative "support/elasticsearch/api/actions/put_alias"
7-
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
8-
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
94

105
require 'json'
116
require 'cabin'
127

8+
# remove this condition and support package once plugin starts consuming elasticsearch-ruby v8 client
9+
# in elasticsearch-ruby v7, ILM APIs were in a separate xpack gem, now directly available
10+
unless elastic_ruby_v8_client_available?
11+
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
12+
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
13+
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
14+
end
15+
1316
module ESHelper
1417
def get_host_port
1518
if ENV["INTEGRATION"] == "true"
@@ -20,8 +23,12 @@ def get_host_port
2023
end
2124

2225
def get_client
23-
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
24-
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
26+
if elastic_ruby_v8_client_available?
27+
Elasticsearch::Client.new(:hosts => [get_host_port])
28+
else
29+
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
30+
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
31+
end
2532
end
2633
end
2734

@@ -128,31 +135,36 @@ def get_cluster_settings(client)
128135
end
129136

130137
def get_policy(client, policy_name)
131-
client.get_ilm_policy(name: policy_name)
138+
if elastic_ruby_v8_client_available?
139+
client.index_lifecycle_management.get_lifecycle(policy: policy_name)
140+
else
141+
client.get_ilm_policy(name: policy_name)
142+
end
132143
end
133144

134145
def put_policy(client, policy_name, policy)
135-
client.put_ilm_policy({:name => policy_name, :body=> policy})
136-
end
137-
138-
def put_alias(client, the_alias, index)
139-
body = {
140-
"aliases" => {
141-
index => {
142-
"is_write_index"=> true
143-
}
144-
}
145-
}
146-
client.put_alias({name: the_alias, body: body})
146+
if elastic_ruby_v8_client_available?
147+
client.index_lifecycle_management.put_lifecycle({:policy => policy_name, :body=> policy})
148+
else
149+
client.put_ilm_policy({:name => policy_name, :body=> policy})
150+
end
147151
end
148152

149153
def clean_ilm(client)
150-
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
154+
if elastic_ruby_v8_client_available?
155+
client.index_lifecycle_management.get_lifecycle.each_key { |key| client.index_lifecycle_management.delete_lifecycle(policy: key) if key =~ /logstash-policy/ }
156+
else
157+
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
158+
end
151159
end
152160

153161
def supports_ilm?(client)
154162
begin
155-
client.get_ilm_policy
163+
if elastic_ruby_v8_client_available?
164+
client.index_lifecycle_management.get_lifecycle
165+
else
166+
client.get_ilm_policy
167+
end
156168
true
157169
rescue
158170
false
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2024-06-25T21:50:58+01:00
1+
2025-07-22T11:15:03+01:00

spec/fixtures/test_certs/ca.crt

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,32 @@
11
-----BEGIN CERTIFICATE-----
2-
MIIFDDCCAvQCAQEwDQYJKoZIhvcNAQELBQAwTDELMAkGA1UEBhMCUFQxCzAJBgNV
3-
BAgMAk5BMQ8wDQYDVQQHDAZMaXNib24xDjAMBgNVBAoMBU15TGFiMQ8wDQYDVQQD
4-
DAZSb290Q0EwHhcNMjQwNjI1MjA1MDU4WhcNMjUwNjI1MjA1MDU4WjBMMQswCQYD
5-
VQQGEwJQVDELMAkGA1UECAwCTkExDzANBgNVBAcMBkxpc2JvbjEOMAwGA1UECgwF
6-
TXlMYWIxDzANBgNVBAMMBlJvb3RDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCC
7-
AgoCggIBAMtTMqAWuH17b9XqPa5L3HNqgnZ958+gvcOt7Q/sOEvcDQJgkzZ+Gywh
8-
5er5JF2iomYOHiD5JncYr4YmRQKuYfD6B1WI5FuQthD/OlA1/RHqtbY27J33SaO6
9-
6ro8gal7vjHrXKQkefVYRwdfO6DqqbhV6L4sMiy8FzQ55TMpoM35cWuvoAMxvSQq
10-
GZ4pYYKnfNSGhzHvssfNS1xu/Lwb7Vju4jPhp+43BkGwEimI5km7jNC1nwjiHtxD
11-
sY/s93AKa/vLktXKUK5nA3jjJOhAbRTVnbOAgxFt0YbX98xW/aUqscgBUVs9J/My
12-
TRMwVKJ7Vsmth1PdJQksUASuzESlSPl09dMjTQ+MXzJDt0JvX8SIJPmbBng78MSa
13-
CUhpOZiii1l2mBfPWejx20I/SMCUNmzbwm2w9JD50Jv2iX4l4ge4H1CIK1/orW1p
14-
dY9xPL0uKYm6ADsDC0B8sGgNMBXeB6aLojY1/ITwmmfpfk9c/yWPfC7stHgCYRAv
15-
5MfGAsmv0/ya5VrWQGBJkFiYy1pon6nxUjCbgn0RABojRoGdhhY3QDipgwmSgFZx
16-
r064RFr1bt/Ml3MJmPf535mSwPdk/j/zw4IZTvlmwKW3FyMDhwYL/zX7J0c6MzMP
17-
LEdi73Qjzmr3ENIrir4O86wNz81YRfYkg9ZX8yKJK9LBAUrYCjJ3AgMBAAEwDQYJ
18-
KoZIhvcNAQELBQADggIBABym9LMyS9W9lvpcH4OK1YLfBPJwrhZ+4keiriY4zWOo
19-
pB+v2Q35neMMXSlTDpeIwPdMkqsh8VZprOWURF80JGvpJ6fBfi05rCDWp/ol1ZKi
20-
snCA+dE2zDK7Z3+F0MbakT5oBi5WgkXSvRvlJEJ/gBD7WC1wq0kxCMK+M5w2RPAT
21-
nnV/iozNBkwExxyJA7BpS6F/v0XjwK7fm5Kpql7zKlh5piZ2IVU0B60Sqskcb2mU
22-
90+1r9T06ekIW/Iz1jd5RWYziu0nbmDeKeKvGAICNU+evYXW+/5kKecMLuEvDCgS
23-
ssbt/Hb510uLHhxfhN4SbvBl2zADsLC+2arf2ATIwD8ZXDDs04ayBsejV0ZwVrTZ
24-
ExKqAys+B3tuIHGRqL5VukdmH6g6oJziYueohPBCOuSOzDd0FhppF4uXZS8DReSg
25-
KieO2ZYfiA1gVRiY6jPx+r7J9I5kSS1gwr/e3zHJHa79ijMB1SSIswQUmgSMkwGh
26-
sNyDNI9ZxgJan3v7kVargMt2LiNcXvVyTzPSYSXcY7SoebfpMprVIG7vZ9TZf+Uu
27-
FQeOfxdLFuGTnpFrYmvOD3OIKfODlY5t+TNICg7A3eTUXeJPcdBBnuVCiQU6TCB5
28-
H+69K5w54Q6a70sHZU1IWsGT8XtbUizPNQky+LAFsE/5oUnCqtypeEu4srcZK53x
2+
MIIFdTCCA12gAwIBAgIUDITbsLT9hKser0ZzBZsxqgaZdWswDQYJKoZIhvcNAQEL
3+
BQAwSjELMAkGA1UEBhMCUFQxCzAJBgNVBAgMAk5BMQ8wDQYDVQQHDAZMaXNib24x
4+
DjAMBgNVBAoMBU15TGFiMQ0wCwYDVQQDDARyb290MB4XDTI1MDcyMjEwMTUwM1oX
5+
DTM1MDcyMDEwMTUwM1owSjELMAkGA1UEBhMCUFQxCzAJBgNVBAgMAk5BMQ8wDQYD
6+
VQQHDAZMaXNib24xDjAMBgNVBAoMBU15TGFiMQ0wCwYDVQQDDARyb290MIICIjAN
7+
BgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAy1MyoBa4fXtv1eo9rkvcc2qCdn3n
8+
z6C9w63tD+w4S9wNAmCTNn4bLCHl6vkkXaKiZg4eIPkmdxivhiZFAq5h8PoHVYjk
9+
W5C2EP86UDX9Eeq1tjbsnfdJo7rqujyBqXu+MetcpCR59VhHB187oOqpuFXoviwy
10+
LLwXNDnlMymgzflxa6+gAzG9JCoZnilhgqd81IaHMe+yx81LXG78vBvtWO7iM+Gn
11+
7jcGQbASKYjmSbuM0LWfCOIe3EOxj+z3cApr+8uS1cpQrmcDeOMk6EBtFNWds4CD
12+
EW3Rhtf3zFb9pSqxyAFRWz0n8zJNEzBUontWya2HU90lCSxQBK7MRKVI+XT10yNN
13+
D4xfMkO3Qm9fxIgk+ZsGeDvwxJoJSGk5mKKLWXaYF89Z6PHbQj9IwJQ2bNvCbbD0
14+
kPnQm/aJfiXiB7gfUIgrX+itbWl1j3E8vS4piboAOwMLQHywaA0wFd4HpouiNjX8
15+
hPCaZ+l+T1z/JY98Luy0eAJhEC/kx8YCya/T/JrlWtZAYEmQWJjLWmifqfFSMJuC
16+
fREAGiNGgZ2GFjdAOKmDCZKAVnGvTrhEWvVu38yXcwmY9/nfmZLA92T+P/PDghlO
17+
+WbApbcXIwOHBgv/NfsnRzozMw8sR2LvdCPOavcQ0iuKvg7zrA3PzVhF9iSD1lfz
18+
Iokr0sEBStgKMncCAwEAAaNTMFEwHQYDVR0OBBYEFKFadJx46upif1BrhYZ0iu8o
19+
2z8rMB8GA1UdIwQYMBaAFKFadJx46upif1BrhYZ0iu8o2z8rMA8GA1UdEwEB/wQF
20+
MAMBAf8wDQYJKoZIhvcNAQELBQADggIBAJi4FwYJz/RotoUpfrLZFf69RoI01Fje
21+
8ITt8SR1Dx/1GTPEuqVVfx0EYtOoH6Gg3FwgSQ9GHRDIa1vkHY5S+FUSOW3pCoZE
22+
/kaLu9bmFxn+GntghvQEor+LzODuZKLXupaGcu1tA4fzyuI4jglVD2sGZtLk//CT
23+
Hd4tOWXo5k1Fj0jMnJq+2Htr8yBeSAO5ZNsvtAjOUU6pfDEwL9bgRzlKKFQQMUYo
24+
6x1FvRDRXWjpzB/H+OSqOaoNLEB9FfEl8I7nn6uTenr5WxjPAOpwjZl9ObB/95xM
25+
p91abKbLQLev5I8npM9G3C/n01l3IzRs7DNHqGJTZO7frGhicD7/jNa+tkSioeJ2
26+
fIMqgDOvQE+gMxs19zw1tsI3+kqX7+ptTkU4Lan5V5ZKGfU8xtcVIlyRk5/yDUI5
27+
1dfQVubs6z07s6De2qa92LFz9l8sT6QuVer+c/wPPhBdMwbzcHyUJIBjFaBpxH86
28+
F7Mr5Zr/+qcbHglAHow1lBqdZzimqGd1koqFRat/pFUFh0iqktMmpl+ZUCjyoQEX
29+
93j8aMU2UQjYM8NJDE2aRculo9OEoqERYFM2m3nHvrtE7iZgddryLNH7ZmC1EquX
30+
MhZJ26GuZ2U4b9dAX858WTv0q1EF5S8KObMlxMU7IDk+cWlSD+puWliwfUKoTR/4
31+
JErSfjCSaRqh
2932
-----END CERTIFICATE-----
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
8b23238088af65cbae6ee9c23821068d896ec1dad081e2a1035ff70866943247
1+
d403930d5296f1515aadd3f730757e7719188b63a276687a3475128b746e4340

0 commit comments

Comments
 (0)