3939import javax .ws .rs .core .Response ;
4040import kafka .security .authorizer .AclAuthorizer ;
4141import org .apache .kafka .clients .admin .AdminClientConfig ;
42- import org .apache .kafka .common .security .auth .KafkaPrincipal ;
42+ import org .apache .kafka .common .acl .AccessControlEntry ;
43+ import org .apache .kafka .common .acl .AclBinding ;
44+ import org .apache .kafka .common .acl .AclOperation ;
45+ import org .apache .kafka .common .acl .AclPermissionType ;
46+ import org .apache .kafka .common .resource .PatternType ;
47+ import org .apache .kafka .common .resource .ResourcePattern ;
48+ import org .apache .kafka .common .resource .ResourceType ;
4349import org .apache .kafka .common .security .auth .SecurityProtocol ;
4450import org .apache .kafka .common .serialization .ByteArrayDeserializer ;
4551import org .junit .jupiter .api .AfterEach ;
@@ -151,18 +157,36 @@ public void testConsumerRequest(String quorum) {
151157 // test without acls
152158 verifySubscribeToTopic (true );
153159 // add acls
154- SecureTestUtils . setConsumerAcls (zkConnect , TOPIC_NAME , USERNAME , CONSUMER_GROUP );
160+ setConsumerAcls ();
155161 verifySubscribeToTopic (false );
156162 }
157163
164+ private void setConsumerAcls () {
165+ AclBinding topicAcl =
166+ new AclBinding (
167+ new ResourcePattern (ResourceType .TOPIC , TOPIC_NAME , PatternType .LITERAL ),
168+ new AccessControlEntry (
169+ "User:" + USERNAME , "*" , AclOperation .READ , AclPermissionType .ALLOW ));
170+ AclBinding groupAcl =
171+ new AclBinding (
172+ new ResourcePattern (ResourceType .GROUP , CONSUMER_GROUP , PatternType .LITERAL ),
173+ new AccessControlEntry (
174+ "User:" + USERNAME , "*" , AclOperation .READ , AclPermissionType .ALLOW ));
175+ try {
176+ createAcls (Arrays .asList (topicAcl , groupAcl ), adminProperties ());
177+ } catch (Exception e ) {
178+ throw new RuntimeException (e );
179+ }
180+ }
181+
158182 @ ParameterizedTest (name = TEST_WITH_PARAMETERIZED_QUORUM_NAME )
159183 @ ValueSource (strings = {"zk" })
160184 public void testProducerAuthorization (String quorum ) {
161185 BinaryTopicProduceRequest request = BinaryTopicProduceRequest .create (topicRecords );
162186 // test without any acls
163187 testProduceToAuthorizationError (TOPIC_NAME , request );
164188 // add acls
165- SecureTestUtils . setProduceAcls (zkConnect , TOPIC_NAME , USERNAME );
189+ setProduceAcls ();
166190 testProduceToTopic (
167191 TOPIC_NAME ,
168192 request ,
@@ -173,6 +197,19 @@ public void testProducerAuthorization(String quorum) {
173197 request .toProduceRequest ().getRecords ());
174198 }
175199
200+ private void setProduceAcls () {
201+ AclBinding topicAcl =
202+ new AclBinding (
203+ new ResourcePattern (ResourceType .TOPIC , TOPIC_NAME , PatternType .LITERAL ),
204+ new AccessControlEntry (
205+ "User:" + USERNAME , "*" , AclOperation .WRITE , AclPermissionType .ALLOW ));
206+ try {
207+ createAcls (Arrays .asList (topicAcl ), adminProperties ());
208+ } catch (Exception e ) {
209+ throw new RuntimeException (e );
210+ }
211+ }
212+
176213 private void verifySubscribeToTopic (boolean expectFailure ) {
177214 Response createResponse = createConsumerInstance (CONSUMER_GROUP );
178215 assertOKResponse (createResponse , Versions .KAFKA_V2_JSON );
@@ -224,7 +261,30 @@ protected SecurityProtocol getBrokerSecurityProtocol() {
224261 @ Override
225262 protected void setupAcls () {
226263 // to allow plaintext consumer
227- SecureTestUtils .setConsumerAcls (zkConnect , TOPIC_NAME , KafkaPrincipal .ANONYMOUS .getName (), "*" );
264+ AclBinding topicAcl =
265+ new AclBinding (
266+ new ResourcePattern (ResourceType .TOPIC , TOPIC_NAME , PatternType .LITERAL ),
267+ new AccessControlEntry (
268+ "User:ANONYMOUS" , "*" , AclOperation .READ , AclPermissionType .ALLOW ));
269+ AclBinding groupAcl =
270+ new AclBinding (
271+ new ResourcePattern (ResourceType .GROUP , "*" , PatternType .LITERAL ),
272+ new AccessControlEntry (
273+ "User:ANONYMOUS" , "*" , AclOperation .READ , AclPermissionType .ALLOW ));
274+ try {
275+ createAcls (Arrays .asList (topicAcl , groupAcl ), adminProperties ());
276+ } catch (Exception e ) {
277+ throw new RuntimeException (e );
278+ }
279+ }
280+
281+ private Properties adminProperties () {
282+ Properties adminProperties = new Properties ();
283+ adminProperties .put (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , brokerList );
284+ adminProperties .put ("security.protocol" , "SASL_PLAINTEXT" );
285+ adminProperties .setProperty ("sasl.mechanism" , "PLAIN" );
286+ adminProperties .put ("sasl.jaas.config" , createPlainLoginModule ("admin" , "admin-secret" ));
287+ return adminProperties ;
228288 }
229289
230290 @ AfterEach
0 commit comments