Skip to content

Commit 65f24c3

Browse files
authored
Fix support for basic auth with the schema registry (#94)
* Fix support for basic auth with the schema registry. This fixes an issue with supporting basic auth when using the schema registry. The plugin currently expects `schema_registry_secret` and `schema_registry_key` to be set to authenticate with the schema registry. However, while this works during the `register` phase, the authentication type `basic.auth.credentials.source` is not set when creating the kafka consumer, leading to authentication failures when attempting to deserialize the kafka payload. This commit correctly sets `basic.auth.credentials.source` to `USER_INFO` when `schema_registry_key` and `schema_registry_secret` are set, and also enables the use of basic auth credentials embedded in the `schema_registry_url` via `username:password@url`. This commit also adds integration tests for authenticated schema registry integration.
1 parent 0465820 commit 65f24c3

12 files changed

+158
-35
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 10.7.7
2+
- Fix: Correct the settings to allow basic auth to work properly, either by setting `schema_registry_key/secret` or embedding username/password in the
3+
url [#94](https://github.com/logstash-plugins/logstash-integration-kafka/pull/94)
4+
15
## 10.7.6
26
- Test: specify development dependency version [#91](https://github.com/logstash-plugins/logstash-integration-kafka/pull/91)
37

kafka_test_setup.sh

+6-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ sleep 10
2727
echo "Downloading Confluent Platform"
2828
curl -s -o build/confluent_platform.tar.gz http://packages.confluent.io/archive/5.5/confluent-community-5.5.1-2.12.tar.gz
2929
mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1
30+
cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
31+
echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
32+
echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
33+
echo "authentication.realm=SchemaRegistry-Props" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
34+
cp spec/fixtures/jaas.config build/confluent_platform/etc/schema-registry
35+
cp spec/fixtures/pwd build/confluent_platform/etc/schema-registry
3036

3137
echo "Setting up test topics with test data"
3238
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --zookeeper localhost:2181
@@ -46,8 +52,4 @@ cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic lo
4652
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
4753
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --broker-list localhost:9092 --compression-codec lz4
4854

49-
echo "Starting SchemaRegistry"
50-
build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
51-
sleep 10
52-
5355
echo "Setup complete, running specs"

kafka_test_teardown.sh

-3
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22
# Setup Kafka and create test topics
33
set -ex
44

5-
echo "Stoppping SchemaRegistry"
6-
build/confluent_platform/bin/schema-registry-stop
7-
85
echo "Unregistering test topics"
96
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'logstash_integration_.*'
107
build/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic 'topic_avro.*'

lib/logstash/inputs/kafka.rb

+4-1
Original file line numberDiff line numberDiff line change
@@ -433,13 +433,16 @@ def create_consumer(client_id)
433433
if schema_registry_url
434434
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class)
435435
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
436-
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.to_s)
436+
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s)
437437
if schema_registry_proxy && !schema_registry_proxy.empty?
438438
props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host)
439439
props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port)
440440
end
441441
if schema_registry_key && !schema_registry_key.empty?
442+
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
442443
props.put(serdes_config::USER_INFO_CONFIG, schema_registry_key + ":" + schema_registry_secret.value)
444+
else
445+
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'URL')
443446
end
444447
end
445448
if security_protocol == "SSL"

lib/logstash/plugin_mixins/common.rb

+1-2
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ def check_for_schema_registry_connectivity_and_subjects
5353
options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
5454
end
5555
client = Manticore::Client.new(options)
56-
5756
begin
58-
response = client.get(@schema_registry_url.to_s + '/subjects').body
57+
response = client.get(@schema_registry_url.uri.to_s + '/subjects').body
5958
rescue Manticore::ManticoreException => e
6059
raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
6160
end

logstash-integration-kafka.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-kafka'
3-
s.version = '10.7.6'
3+
s.version = '10.7.7'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Integration with Kafka - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

spec/fixtures/jaas.config

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
SchemaRegistry-Props {
2+
org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
3+
file="build/confluent_platform/etc/schema-registry/pwd"
4+
debug="true";
5+
};

spec/fixtures/pwd

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
fred: OBF:1w8t1tvf1w261w8v1w1c1tvn1w8x,user,admin
2+
barney: changeme,user,developer
3+
admin:admin,admin
4+
betty: MD5:164c88b302622e17050af52c89945d44,user
5+
wilma: CRYPT:adpexzg3FUZAk,admin,sr-user

spec/integration/inputs/kafka_spec.rb

+113-24
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,16 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
206206

207207

208208
describe "schema registry connection options" do
209+
schema_registry = Manticore::Client.new
210+
before (:all) do
211+
shutdown_schema_registry
212+
startup_schema_registry(schema_registry)
213+
end
214+
215+
after(:all) do
216+
shutdown_schema_registry
217+
end
218+
209219
context "remote endpoint validation" do
210220
it "should fail if not reachable" do
211221
config = {'schema_registry_url' => 'http://localnothost:8081'}
@@ -232,8 +242,7 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
232242
end
233243

234244
after(:each) do
235-
schema_registry_client = Manticore::Client.new
236-
delete_remote_schema(schema_registry_client, SUBJECT_NAME)
245+
delete_remote_schema(schema_registry, SUBJECT_NAME)
237246
end
238247

239248
it "should correctly complete registration phase" do
@@ -264,9 +273,25 @@ def delete_remote_schema(schema_registry_client, subject_name)
264273

265274
# AdminClientConfig = org.alpache.kafka.clients.admin.AdminClientConfig
266275

276+
def startup_schema_registry(schema_registry, auth=false)
277+
system('./stop_schema_registry.sh')
278+
auth ? system('./start_auth_schema_registry.sh') : system('./start_schema_registry.sh')
279+
url = auth ? "http://barney:changeme@localhost:8081" : "http://localhost:8081"
280+
Stud.try(20.times, [Manticore::SocketException, StandardError, RSpec::Expectations::ExpectationNotMetError]) do
281+
expect(schema_registry.get(url).code).to eq(200)
282+
end
283+
end
284+
267285
describe "Schema registry API", :integration => true do
286+
schema_registry = Manticore::Client.new
287+
288+
before(:all) do
289+
startup_schema_registry(schema_registry)
290+
end
268291

269-
let(:schema_registry) { Manticore::Client.new }
292+
after(:all) do
293+
shutdown_schema_registry
294+
end
270295

271296
context 'listing subject on clean instance' do
272297
it "should return an empty set" do
@@ -292,37 +317,58 @@ def delete_remote_schema(schema_registry_client, subject_name)
292317
expect( subjects ).to be_empty
293318
end
294319
end
320+
end
321+
322+
def shutdown_schema_registry
323+
system('./stop_schema_registry.sh')
324+
end
325+
326+
describe "Deserializing with the schema registry", :integration => true do
327+
schema_registry = Manticore::Client.new
328+
329+
shared_examples 'it reads from a topic using a schema registry' do |with_auth|
330+
331+
before(:all) do
332+
shutdown_schema_registry
333+
startup_schema_registry(schema_registry, with_auth)
334+
end
335+
336+
after(:all) do
337+
shutdown_schema_registry
338+
end
295339

296-
context 'use the schema to serialize' do
297340
after(:each) do
298-
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value').code ).to be(200)
341+
expect( schema_registry.delete("#{subject_url}/#{avro_topic_name}-value").code ).to be(200)
299342
sleep 1
300-
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value?permanent=true').code ).to be(200)
343+
expect( schema_registry.delete("#{subject_url}/#{avro_topic_name}-value?permanent=true").code ).to be(200)
301344

302345
Stud.try(3.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
303346
wait(10).for do
304-
subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
347+
subjects = JSON.parse schema_registry.get(subject_url).body
305348
subjects.empty?
306349
end.to be_truthy
307350
end
308351
end
309352

310-
let(:group_id_1) {rand(36**8).to_s(36)}
311-
312-
let(:avro_topic_name) { "topic_avro" }
313-
314-
let(:plain_config) do
315-
{ 'schema_registry_url' => 'http://localhost:8081',
316-
'topics' => [avro_topic_name],
317-
'codec' => 'plain',
318-
'group_id' => group_id_1,
319-
'auto_offset_reset' => 'earliest' }
353+
let(:base_config) do
354+
{
355+
'topics' => [avro_topic_name],
356+
'codec' => 'plain',
357+
'group_id' => group_id_1,
358+
'auto_offset_reset' => 'earliest'
359+
}
320360
end
321361

322-
def delete_topic_if_exists(topic_name)
362+
let(:group_id_1) {rand(36**8).to_s(36)}
363+
364+
def delete_topic_if_exists(topic_name, user = nil, password = nil)
323365
props = java.util.Properties.new
324366
props.put(Java::org.apache.kafka.clients.admin.AdminClientConfig::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
325-
367+
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
368+
unless user.nil?
369+
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
370+
props.put(serdes_config::USER_INFO_CONFIG, "#{user}:#{password}")
371+
end
326372
admin_client = org.apache.kafka.clients.admin.AdminClient.create(props)
327373
topics_list = admin_client.listTopics().names().get()
328374
if topics_list.contains(topic_name)
@@ -331,14 +377,18 @@ def delete_topic_if_exists(topic_name)
331377
end
332378
end
333379

334-
def write_some_data_to(topic_name)
380+
def write_some_data_to(topic_name, user = nil, password = nil)
335381
props = java.util.Properties.new
336382
config = org.apache.kafka.clients.producer.ProducerConfig
337383

338384
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
339385
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
340386

341387
props.put(config::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
388+
unless user.nil?
389+
props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
390+
props.put(serdes_config::USER_INFO_CONFIG, "#{user}:#{password}")
391+
end
342392
props.put(config::KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.java_class)
343393
props.put(config::VALUE_SERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroSerializer.java_class)
344394

@@ -360,11 +410,11 @@ def write_some_data_to(topic_name)
360410
end
361411

362412
it "stored a new schema using Avro Kafka serdes" do
363-
delete_topic_if_exists avro_topic_name
364-
write_some_data_to avro_topic_name
413+
auth ? delete_topic_if_exists(avro_topic_name, user, password) : delete_topic_if_exists(avro_topic_name)
414+
auth ? write_some_data_to(avro_topic_name, user, password) : write_some_data_to(avro_topic_name)
365415

366-
subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
367-
expect( subjects ).to contain_exactly("topic_avro-value")
416+
subjects = JSON.parse schema_registry.get(subject_url).body
417+
expect( subjects ).to contain_exactly("#{avro_topic_name}-value")
368418

369419
num_events = 1
370420
queue = consume_messages(plain_config, timeout: 30, event_count: num_events)
@@ -375,4 +425,43 @@ def write_some_data_to(topic_name)
375425
expect( elem.get("map_field")["inner_field"] ).to eq("inner value")
376426
end
377427
end
428+
429+
context 'with an unauthed schema registry' do
430+
let(:auth) { false }
431+
let(:avro_topic_name) { "topic_avro" }
432+
let(:subject_url) { "http://localhost:8081/subjects" }
433+
let(:plain_config) { base_config.merge!({'schema_registry_url' => "http://localhost:8081"}) }
434+
435+
it_behaves_like 'it reads from a topic using a schema registry', false
436+
end
437+
438+
context 'with an authed schema registry' do
439+
let(:auth) { true }
440+
let(:user) { "barney" }
441+
let(:password) { "changeme" }
442+
let(:avro_topic_name) { "topic_avro_auth" }
443+
let(:subject_url) { "http://#{user}:#{password}@localhost:8081/subjects" }
444+
445+
context 'using schema_registry_key' do
446+
let(:plain_config) do
447+
base_config.merge!({
448+
'schema_registry_url' => "http://localhost:8081",
449+
'schema_registry_key' => user,
450+
'schema_registry_secret' => password
451+
})
452+
end
453+
454+
it_behaves_like 'it reads from a topic using a schema registry', true
455+
end
456+
457+
context 'using schema_registry_url' do
458+
let(:plain_config) do
459+
base_config.merge!({
460+
'schema_registry_url' => "http://#{user}:#{password}@localhost:8081"
461+
})
462+
end
463+
464+
it_behaves_like 'it reads from a topic using a schema registry', true
465+
end
466+
end
378467
end

start_auth_schema_registry.sh

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
# Setup Kafka and create test topics
3+
set -ex
4+
5+
echo "Starting authed SchemaRegistry"
6+
SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=build/confluent_platform/etc/schema-registry/jaas.config build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/authed-schema-registry.properties > /dev/null 2>&1 &

start_schema_registry.sh

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
# Setup Kafka and create test topics
3+
set -ex
4+
5+
echo "Starting SchemaRegistry"
6+
build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &

stop_schema_registry.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/bin/bash
2+
# Setup Kafka and create test topics
3+
set -ex
4+
5+
echo "Stoppping SchemaRegistry"
6+
build/confluent_platform/bin/schema-registry-stop
7+
sleep 5

0 commit comments

Comments
 (0)