From 12c7c7ec214d120dd821a5595652871279924239 Mon Sep 17 00:00:00 2001
From: nick <nickjobszh@gmail.com>
Date: Wed, 30 Dec 2020 19:56:10 +0100
Subject: [PATCH 1/2] save work

---
 configuration.c | 17 +++++++++--------
 php_kafka_int.h |  8 +++++++-
 topic.c         | 20 ++++++++++++++++----
 topic.stub.php  |  4 ++--
 topic_arginfo.h |  4 +++-
 5 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/configuration.c b/configuration.c
index f8e88cb..328cafd 100644
--- a/configuration.c
+++ b/configuration.c
@@ -125,29 +125,30 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi
     zval_ptr_dtor(&args[2]);
 }
 
-static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
+void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
 {
     kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
-    zval args[2];
-
-    if (!opaque) {
-        return;
-    }
+    zval args[3];
 
-    if (!cbs->dr_msg) {
+    if (!opaque || !cbs->dr_msg) {
         return;
     }
 
     ZVAL_NULL(&args[0]);
     ZVAL_NULL(&args[1]);
+    ZVAL_NULL(&args[2]);
 
     ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
     kafka_message_new(&args[1], msg);
+    if (NULL != msg->_private) {
+        ZVAL_ZVAL(&args[2], msg->_private, 1, 0);
+    }
 
-    kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args);
+    kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 3, args);
 
     zval_ptr_dtor(&args[0]);
     zval_ptr_dtor(&args[1]);
+    zval_ptr_dtor(&args[2]);
 }
 
 static int kafka_conf_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
diff --git a/php_kafka_int.h b/php_kafka_int.h
index b00eecf..46a45d6 100644
--- a/php_kafka_int.h
+++ b/php_kafka_int.h
@@ -61,13 +61,16 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
 
 #else // PHP 7
 
+#define IS_MIXED 16
+
 #define Z_KAFKA_OBJ zval
 
 #define Z_KAFKA_PROP_OBJ(object) object
 
 #define kafka_get_debug_object(type, object) get_object(object)
 
-#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) ZEND_ARG_INFO(pass_by_ref, name)
+#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) \
+    ZEND_ARG_INFO(pass_by_ref, name)
 
 #define Z_PARAM_ARRAY_HT_OR_NULL(dest) \
 	Z_PARAM_ARRAY_HT_EX(dest, 1, 0)
@@ -81,6 +84,9 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
 #define Z_PARAM_STRING_OR_NULL(dest, dest_len) \
 	Z_PARAM_STRING_EX(dest, dest_len, 1, 0)
 
+#define Z_PARAM_ZVAL_OR_NULL(dest) \
+	Z_PARAM_ZVAL_EX(dest, 1, 0)
+
 #endif
 
 #ifdef PHP_WIN32
diff --git a/topic.c b/topic.c
index cfc2d5d..fc44bd5 100644
--- a/topic.c
+++ b/topic.c
@@ -80,13 +80,15 @@ ZEND_METHOD(Kafka_ProducerTopic, produce)
     int ret;
     rd_kafka_resp_err_t err;
     kafka_topic_object *intern;
+    zval *opaque = NULL;
 
-    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 4)
+    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 5)
         Z_PARAM_LONG(partition)
         Z_PARAM_LONG(msgflags)
         Z_PARAM_OPTIONAL
         Z_PARAM_STRING_OR_NULL(payload, payload_len)
         Z_PARAM_STRING_OR_NULL(key, key_len)
+        Z_PARAM_ZVAL_OR_NULL(opaque)
     ZEND_PARSE_PARAMETERS_END();
 
     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
@@ -99,9 +101,13 @@ ZEND_METHOD(Kafka_ProducerTopic, produce)
         return;
     }
 
+    if (NULL != opaque) {
+        Z_ADDREF_P(opaque);
+    }
+
     intern = get_kafka_topic_object(getThis());
 
-    ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL);
+    ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, opaque);
 
     if (ret == -1) {
         err = rd_kafka_last_error();
@@ -127,12 +133,12 @@ ZEND_METHOD(Kafka_ProducerTopic, producev)
     HashTable *headersParam = NULL;
     HashPosition headersParamPos;
     char *header_key;
-    zval *header_value;
+    zval *header_value, *opaque = NULL;
     rd_kafka_headers_t *headers;
     zend_long timestamp_ms = 0;
     zend_bool timestamp_ms_is_null = 0;
 
-    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 6)
+    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 7)
         Z_PARAM_LONG(partition)
         Z_PARAM_LONG(msgflags)
         Z_PARAM_OPTIONAL
@@ -140,6 +146,7 @@ ZEND_METHOD(Kafka_ProducerTopic, producev)
         Z_PARAM_STRING_OR_NULL(key, key_len)
         Z_PARAM_ARRAY_HT_OR_NULL(headersParam)
         Z_PARAM_LONG_OR_NULL(timestamp_ms, timestamp_ms_is_null)
+        Z_PARAM_ZVAL_OR_NULL(opaque)
     ZEND_PARSE_PARAMETERS_END();
 
     if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) {
@@ -152,6 +159,10 @@ ZEND_METHOD(Kafka_ProducerTopic, producev)
         return;
     }
 
+    if (NULL != opaque) {
+        Z_ADDREF_P(opaque);
+    }
+
     if (timestamp_ms_is_null == 1) {
         timestamp_ms = 0;
     }
@@ -191,6 +202,7 @@ ZEND_METHOD(Kafka_ProducerTopic, producev)
             RD_KAFKA_V_KEY(key, key_len),
             RD_KAFKA_V_TIMESTAMP(timestamp_ms),
             RD_KAFKA_V_HEADERS(headers),
+            RD_KAFKA_V_OPAQUE(opaque),
             RD_KAFKA_V_END
     );
 
diff --git a/topic.stub.php b/topic.stub.php
index bcb90e6..ef137cf 100644
--- a/topic.stub.php
+++ b/topic.stub.php
@@ -18,7 +18,7 @@ class ProducerTopic extends Topic
 {
     private function __construct() {}
 
-    public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null): void {}
+    public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, mixed $opaque = null): void {}
 
-    public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null): void {}
+    public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null, mixed $opaque = null): void {}
 }
diff --git a/topic_arginfo.h b/topic_arginfo.h
index 751b307..016e9b7 100644
--- a/topic_arginfo.h
+++ b/topic_arginfo.h
@@ -1,5 +1,5 @@
 /* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 0c99d0aedca801c7ce5244af6f91e9c9af3685cb */
+ * Stub hash: 42cd23c50573fc0f32161ddf8bc6650b7f4a3657 */
 
 ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_Topic_getName, 0, 0, IS_STRING, 0)
 ZEND_END_ARG_INFO()
@@ -14,6 +14,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_produc
 	ZEND_ARG_TYPE_INFO(0, msgFlags, IS_LONG, 0)
 	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, payload, IS_STRING, 1, "null")
 	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null")
+	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null")
 ZEND_END_ARG_INFO()
 
 ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_producev, 0, 2, IS_VOID, 0)
@@ -23,6 +24,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Kafka_ProducerTopic_produc
 	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null")
 	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, headers, IS_ARRAY, 1, "null")
 	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timestampMs, IS_LONG, 1, "null")
+	ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null")
 ZEND_END_ARG_INFO()
 
 

From d2bc62f198fa800575316941a78b0ad29b8b8f70 Mon Sep 17 00:00:00 2001
From: nick <nickjobszh@gmail.com>
Date: Sat, 24 Apr 2021 20:41:48 +0200
Subject: [PATCH 2/2] add test

---
 tests/produce_with_opaque.phpt | 61 ++++++++++++++++++++++++++++++++++
 1 file changed, 61 insertions(+)
 create mode 100644 tests/produce_with_opaque.phpt

diff --git a/tests/produce_with_opaque.phpt b/tests/produce_with_opaque.phpt
new file mode 100644
index 0000000..fffa753
--- /dev/null
+++ b/tests/produce_with_opaque.phpt
@@ -0,0 +1,61 @@
+--TEST--
+Produce, consume
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+$conf = new SimpleKafkaClient\Configuration();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+$conf->setDrMsgCb(function (SimpleKafkaClient\Producer $kafka, SimpleKafkaClient\Message $message, $opaque) {
+    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) {
+        $errorStr = rd_kafka_err2str($message->err);
+
+        echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL;
+    } else {
+        if (false === is_string($opaque)) {
+            $opaque = 'opaque was already freed';
+        }
+
+        echo sprintf('Message opaque: %s', $opaque) . PHP_EOL;
+    }
+});
+
+$producer = new SimpleKafkaClient\Producer($conf);
+$topic = $producer->getTopicHandle('pure-php-test-topic');
+$amountTestMessages = 10;
+
+for ($i = 0; $i < $amountTestMessages; ++$i) {
+    $topic->producev(
+        RD_KAFKA_PARTITION_UA,
+        RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
+        sprintf('test message-%d',$i),
+        sprintf('test-key-%d', $i),
+        [
+            'some' => sprintf('header value %d', $i)
+        ],
+        null,
+        "opaque $i"
+    );
+
+    $producer->poll(0);
+}
+
+$result = $producer->flush(20000);
+if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
+    echo 'Was not able to shutdown within 20s. Messages might be lost!' . PHP_EOL;
+}
+--EXPECT--
+Message key test-key-0 and opaque: opaque 0
+Message key test-key-1 and opaque: opaque 1
+Message key test-key-2 and opaque: opaque 2
+Message key test-key-3 and opaque: opaque 3
+Message key test-key-4 and opaque: opaque 4
+Message key test-key-5 and opaque: opaque 5
+Message key test-key-6 and opaque: opaque 6
+Message key test-key-7 and opaque: opaque 7
+Message key test-key-8 and opaque: opaque 8
+Message key test-key-9 and opaque: opaque 9