Open Policy Agent (OPA) plugin for Kafka authorization.
- Kafka 2.7.0+
- Java 11 or above
- OPA installed and running on the brokers
Download the latest OPA authorizer plugin jar from Releases (or Maven Central) and put the file (opa-authorizer-{$VERSION}.jar) somewhere Kafka recognizes it - this could be directly in Kafka's libs directory, or in a separate plugin directory pointed out to Kafka at startup, e.g.:

```
CLASSPATH=/usr/local/share/kafka/plugins/*
```
To activate the opa-kafka-plugin, add the authorizer.class.name to server.properties:

```
authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer
```
The plugin supports the following properties:

Property Key | Example | Default | Description |
---|---|---|---|
`opa.authorizer.url` | `http://opa:8181/v1/data/kafka/authz/allow` | | Name of the OPA policy to query. [required] |
`opa.authorizer.allow.on.error` | `false` | `false` | Fail-closed or fail-open if the OPA call fails. |
`opa.authorizer.cache.initial.capacity` | `5000` | `5000` | Initial decision cache size. |
`opa.authorizer.cache.maximum.size` | `50000` | `50000` | Maximum decision cache size. |
`opa.authorizer.cache.expire.after.seconds` | `3600` | `3600` | Decision cache expiry in seconds. |
`opa.authorizer.metrics.enabled` | `true` | `false` | Whether to expose JMX metrics for monitoring. |
`super.users` | `User:alice;User:bob` | | Super users which are always allowed. |
`opa.authorizer.truststore.path` | `/path/to/mytruststore.p12` | | Path to the PKCS12 truststore for HTTPS requests to OPA. |
`opa.authorizer.truststore.password` | `ichangedit` | `changeit` | Password for the truststore. |
`opa.authorizer.truststore.type` | `PKCS12`, `JKS` or whatever your JVM supports | `PKCS12` | Type of the truststore. |
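Putting the pieces together, a server.properties fragment enabling the authorizer might look as follows (the values are illustrative, taken from the example column above, not recommendations for your cluster):

```properties
# Activate the OPA authorizer plugin
authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer

# OPA policy endpoint to query for decisions
opa.authorizer.url=http://opa:8181/v1/data/kafka/authz/allow

# Fail closed if OPA cannot be reached
opa.authorizer.allow.on.error=false

# Decision cache tuning
opa.authorizer.cache.initial.capacity=5000
opa.authorizer.cache.maximum.size=50000
opa.authorizer.cache.expire.after.seconds=3600

# Expose JMX metrics
opa.authorizer.metrics.enabled=true

# Principals that bypass OPA entirely
super.users=User:alice;User:bob
```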
Example structure of the input data provided from opa-kafka-plugin to Open Policy Agent:

```json
{
    "action": {
        "logIfAllowed": true,
        "logIfDenied": true,
        "operation": "DESCRIBE",
        "resourcePattern": {
            "name": "alice-topic",
            "patternType": "LITERAL",
            "resourceType": "TOPIC",
            "unknown": false
        },
        "resourceReferenceCount": 1
    },
    "requestContext": {
        "clientAddress": "192.168.64.1",
        "clientInformation": {
            "softwareName": "unknown",
            "softwareVersion": "unknown"
        },
        "connectionId": "192.168.64.4:9092-192.168.64.1:58864-0",
        "header": {
            "data": {
                "clientId": "rdkafka",
                "correlationId": 5,
                "requestApiKey": 3,
                "requestApiVersion": 2
            },
            "headerVersion": 1
        },
        "listenerName": "SASL_PLAINTEXT",
        "principal": {
            "name": "alice-consumer",
            "principalType": "User"
        },
        "securityProtocol": "SASL_PLAINTEXT"
    }
}
```
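A policy at the queried path receives this document as `input`. As a minimal sketch (assuming the `kafka/authz/allow` path from the property table above; the rule logic here is illustrative, not the shipped sample policy), a policy allowing read-only operations for any User principal could look like:

```rego
package kafka.authz

default allow = false

# Operations considered read-only for this sketch.
read_only_ops := {"DESCRIBE", "READ"}

# Allow any User principal to perform a read-only operation.
allow {
    input.requestContext.principal.principalType == "User"
    read_only_ops[input.action.operation]
}
```

The package and rule name must match the path configured in `opa.authorizer.url`.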
The following table summarizes the supported resource types and operation names:

input.action.resourcePattern.resourceType | input.action.operation |
---|---|
CLUSTER | CLUSTER_ACTION |
CLUSTER | CREATE |
CLUSTER | DESCRIBE |
GROUP | READ |
GROUP | DESCRIBE |
TOPIC | CREATE |
TOPIC | ALTER |
TOPIC | DELETE |
TOPIC | DESCRIBE |
TOPIC | READ |
TOPIC | WRITE |
TRANSACTIONAL_ID | DESCRIBE |
TRANSACTIONAL_ID | WRITE |
These are handled by the method authorizeAction and passed to OPA with an action that identifies the accessed resource and the performed operation. patternType is always LITERAL.

Creation of a topic checks for CLUSTER + CREATE. If this is denied, it checks for TOPIC with its name + CREATE.

When doing an idempotent write to a topic, and the first request for operation=IDEMPOTENT_WRITE on resourceType=CLUSTER is denied, the method authorizeByResourceType is used to check whether the user has the right to write to any topic. If so, the idempotent write is granted by Kafka's ACL implementation. To allow for a similar check, it is mapped to OPA with patternType=PREFIXED, resourceType=TOPIC, and name="":
```json
{
    "action": {
        "logIfAllowed": true,
        "logIfDenied": true,
        "operation": "DESCRIBE",
        "resourcePattern": {
            "name": "",
            "patternType": "PREFIXED",
            "resourceType": "TOPIC",
            "unknown": false
        },
        "resourceReferenceCount": 1
    },
    ...
}
```
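One way to answer this fallback check in a policy is a dedicated rule matching the empty PREFIXED topic name. This is a sketch only: the principal name below is hypothetical, and whether a given principal should get a blanket "write to any topic" right depends entirely on your own rules.

```rego
package kafka.authz

default allow = false

# The fallback arrives as an empty PREFIXED topic name, effectively
# asking "may this principal access any topic?". Match it only for
# principals you would trust with a blanket right.
allow {
    input.action.resourcePattern.resourceType == "TOPIC"
    input.action.resourcePattern.patternType == "PREFIXED"
    input.action.resourcePattern.name == ""
    input.requestContext.principal.name == "trusted-producer"  # hypothetical
}
```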
It's likely possible to use all the resource types and operations described in the Kafka API docs:
- https://kafka.apache.org/24/javadoc/org/apache/kafka/common/acl/AclOperation.html
- https://kafka.apache.org/24/javadoc/org/apache/kafka/common/resource/ResourceType.html
Protocol | Description |
---|---|
PLAINTEXT | Un-authenticated, non-encrypted channel |
SASL_PLAINTEXT | Authenticated, non-encrypted channel |
SASL_SSL | Authenticated, SSL channel |
SSL | SSL channel |
More info:
https://kafka.apache.org/24/javadoc/org/apache/kafka/common/security/auth/SecurityProtocol.html
With the sample policy Rego you will, out of the box, get a structure where an "owner" can have one user per type (consumer, producer, mgmt). The owner and user type are separated by -.

- Username structure: `<owner>-<type>`
- Topic name structure: `<owner>-.*`
Example: User alice-consumer will be...

- allowed to consume on topic alice-topic1
- allowed to consume on topic alice-topic-test
- denied to produce on any topic
- denied to consume on topic bob-topic
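Under these conventions, the consumer part of such a policy could be sketched roughly as below (assumptions: package kafka.authz, and consuming mapped to READ on the topic; the sample policy shipped with the repository is the authoritative version):

```rego
package kafka.authz

default allow = false

# "<owner>-consumer" may READ topics named "<owner>-...".
allow {
    input.action.resourcePattern.resourceType == "TOPIC"
    input.action.operation == "READ"

    # Split "alice-consumer" into owner and type.
    parts := split(input.requestContext.principal.name, "-")
    parts[1] == "consumer"

    # Topic must carry the owner's prefix, e.g. "alice-".
    startswith(input.action.resourcePattern.name, sprintf("%s-", [parts[0]]))
}
```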
Using the Gradle wrapper: `./gradlew clean test shadowJar`

The resulting jar (with dependencies embedded) will be named opa-authorizer-{$VERSION}-all.jar and stored in build/libs.
Set the log level with `log4j.logger.org.openpolicyagent=INFO` in config/log4j.properties. Use DEBUG or TRACE for debugging.
In a busy Kafka cluster it can pay off to tweak the cache settings, since every cache miss results in a call to OPA that may produce a log entry there, especially if decision logs are turned on. If the policy isn't updated very often, it's recommended to cache aggressively to improve performance and reduce the number of log entries.
The plugin exposes some metrics that can be useful in operation.

opa.authorizer:type=authorization-result
- authorized-request-count: number of allowed requests
- unauthorized-request-count: number of denied requests

opa.authorizer:type=request-handle
- request-to-opa-count: number of HTTP requests sent to OPA to get an authorization result
- cache-hit-rate: cache hit rate (the cache miss rate is 1 - cache-hit-rate)
- cache-usage-percentage: the ratio of cache size over maximum cache capacity
For questions, discussions and announcements related to Styra products, services and open source projects, please join the Styra community on Slack!