Skip to content

Commit

Permalink
[FLINK-341082][config] Refactor callers that use deprecated get/setXX…
Browse files Browse the repository at this point in the history
…X in python module
  • Loading branch information
1996fanrui committed Sep 14, 2024
1 parent 4782217 commit 2f9cd9f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
26 changes: 13 additions & 13 deletions flink-python/pyflink/datastream/connectors/tests/test_pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,26 @@ def test_pulsar_source(self):

configuration = get_field_value(pulsar_source.get_java_function(), "sourceConfiguration")
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.client.serviceUrl')
.string_type()
.no_default_value()._j_config_option), 'pulsar://localhost:6650')
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.admin.adminUrl')
.string_type()
.no_default_value()._j_config_option), 'http://localhost:8080')
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.consumer.subscriptionName')
.string_type()
.no_default_value()._j_config_option), 'ff')
test_option = ConfigOptions.key(TEST_OPTION_NAME).boolean_type().no_default_value()
self.assertEqual(
configuration.getBoolean(
configuration.get(
test_option._j_config_option), True)
self.assertEqual(
configuration.getLong(
configuration.get(
ConfigOptions.key('pulsar.source.autoCommitCursorInterval')
.long_type()
.no_default_value()._j_config_option), 1000)
Expand Down Expand Up @@ -107,10 +107,10 @@ def test_source_deprecated_method(self):
.build()
configuration = get_field_value(pulsar_source.get_java_function(), "sourceConfiguration")
self.assertEqual(
configuration.getBoolean(
configuration.get(
test_option._j_config_option), True)
self.assertEqual(
configuration.getLong(
configuration.get(
ConfigOptions.key('pulsar.source.autoCommitCursorInterval')
.long_type()
.no_default_value()._j_config_option), 1000)
Expand Down Expand Up @@ -200,17 +200,17 @@ def test_pulsar_sink(self):
self.assertEqual('pulsar sink: Writer', plan['nodes'][1]['type'])
configuration = get_field_value(pulsar_sink.get_java_function(), "sinkConfiguration")
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.client.serviceUrl')
.string_type()
.no_default_value()._j_config_option), 'pulsar://localhost:6650')
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.admin.adminUrl')
.string_type()
.no_default_value()._j_config_option), 'http://localhost:8080')
self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.producer.producerName')
.string_type()
.no_default_value()._j_config_option), 'fo - %s')
Expand All @@ -225,7 +225,7 @@ def test_pulsar_sink(self):
'org.apache.flink.api.common.serialization.SimpleStringSchema'))

self.assertEqual(
configuration.getString(
configuration.get(
ConfigOptions.key('pulsar.sink.deliveryGuarantee')
.string_type()
.no_default_value()._j_config_option), 'at-least-once')
Expand All @@ -242,10 +242,10 @@ def test_pulsar_sink(self):

test_option = ConfigOptions.key(TEST_OPTION_NAME).boolean_type().no_default_value()
self.assertEqual(
configuration.getBoolean(
configuration.get(
test_option._j_config_option), True)
self.assertEqual(
configuration.getLong(
configuration.get(
ConfigOptions.key('pulsar.producer.batchingMaxMessages')
.long_type()
.no_default_value()._j_config_option), 100)
Expand Down
6 changes: 3 additions & 3 deletions flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ def reduce(self, func: Union[Callable, ReduceFunction]) -> 'DataStream':
gateway = get_gateway()
j_conf = get_j_env_configuration(self._j_data_stream.getExecutionEnvironment())
python_execution_mode = (
j_conf.getString(
j_conf.get(
gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))

class ReduceProcessKeyedProcessFunctionAdapter(KeyedProcessFunction):
Expand Down Expand Up @@ -2783,7 +2783,7 @@ def _get_one_input_stream_operator(data_stream: DataStream,
j_output_type_info = output_type_info.get_java_type_info()
j_conf = get_j_env_configuration(data_stream._j_data_stream.getExecutionEnvironment())
python_execution_mode = (
j_conf.getString(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))
j_conf.get(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))

from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.PROCESS: # type: ignore
Expand Down Expand Up @@ -2864,7 +2864,7 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
j_conf = get_j_env_configuration(
connected_streams.stream1._j_data_stream.getExecutionEnvironment())
python_execution_mode = (
j_conf.getString(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))
j_conf.get(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))

from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.CO_PROCESS: # type: ignore
Expand Down

0 comments on commit 2f9cd9f

Please sign in to comment.