Skip to content

Commit

Permalink
General message engineering in Producer (#559)
Browse files Browse the repository at this point in the history
* General message engineering in Producer

* remove user

* finish retry & isolation

* add comments&exception handler

* add license

* use snake case naming

* fix name & finish telemetry rebuild

* fix style issues

* finish retry&isolation test

* init delay&fifo message

* finish delay & fifo & transaction message & its tests
  • Loading branch information
yanchaomei authored Jul 25, 2023
1 parent 7a73ec9 commit 60115cf
Show file tree
Hide file tree
Showing 14 changed files with 1,270 additions and 58 deletions.
306 changes: 279 additions & 27 deletions python/rocketmq/client.py

Large diffs are not rendered by default.

30 changes: 27 additions & 3 deletions python/rocketmq/client_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,49 @@


class ClientConfig:
"""Client configuration class which holds the settings for a client.
The settings include endpoint configurations, session credential provider and SSL settings.
An instance of this class is used to setup the client with necessary configurations.
"""

def __init__(
self,
endpoints: Endpoints,
session_credentials_provider: SessionCredentialsProvider,
ssl_enabled: bool,
):
#: The endpoints for the client to connect to.
self.__endpoints = endpoints

#: The session credentials provider to authenticate the client.
self.__session_credentials_provider = session_credentials_provider

#: A flag indicating if SSL is enabled for the client.
self.__ssl_enabled = ssl_enabled

#: The request timeout for the client in seconds.
self.request_timeout = 10

@property
def session_credentials_provider(self):
def session_credentials_provider(self) -> SessionCredentialsProvider:
"""The session credentials provider for the client.
:return: the session credentials provider
"""
return self.__session_credentials_provider

@property
def endpoints(self):
def endpoints(self) -> Endpoints:
"""The endpoints for the client to connect to.
:return: the endpoints
"""
return self.__endpoints

@property
def ssl_enabled(self):
def ssl_enabled(self) -> bool:
"""A flag indicating if SSL is enabled for the client.
:return: True if SSL is enabled, False otherwise
"""
return self.__ssl_enabled
19 changes: 18 additions & 1 deletion python/rocketmq/client_id_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,36 @@


class ClientIdEncoder:
"""This class generates a unique client ID for each client based on
hostname, process id, index and the monotonic clock time.
"""

#: The current index for client id generation.
__INDEX = 0

#: The lock used for thread-safe incrementing of the index.
__INDEX_LOCK = threading.Lock()

#: The separator used in the client id string.
__CLIENT_ID_SEPARATOR = "@"

@staticmethod
def __get_and_increment_sequence():
def __get_and_increment_sequence() -> int:
"""Increment and return the current index in a thread-safe manner.
:return: the current index after incrementing it.
"""
with ClientIdEncoder.__INDEX_LOCK:
temp = ClientIdEncoder.__INDEX
ClientIdEncoder.__INDEX += 1
return temp

@staticmethod
def generate() -> str:
"""Generate a unique client ID.
:return: the generated client id
"""
index = ClientIdEncoder.__get_and_increment_sequence()
return (
socket.gethostname()
Expand Down
104 changes: 104 additions & 0 deletions python/rocketmq/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import List

from protocol.definition_pb2 import Broker as ProtoBroker
from protocol.definition_pb2 import Encoding as ProtoEncoding
from protocol.definition_pb2 import MessageQueue as ProtoMessageQueue
from protocol.definition_pb2 import MessageType as ProtoMessageType
from protocol.definition_pb2 import Permission as ProtoPermission
Expand All @@ -25,20 +26,55 @@
from rocketmq.rpc_client import Endpoints


class Encoding(Enum):
"""Enumeration of supported encoding types."""
IDENTITY = 0
GZIP = 1


class EncodingHelper:
"""Helper class for converting encoding types to protobuf."""

@staticmethod
def to_protobuf(mq_encoding):
"""Convert encoding type to protobuf.
:param mq_encoding: The encoding to be converted.
:return: The corresponding protobuf encoding.
"""
if mq_encoding == Encoding.IDENTITY:
return ProtoEncoding.IDENTITY
elif mq_encoding == Encoding.GZIP:
return ProtoEncoding.GZIP


class Broker:
"""Represent a broker entity."""

def __init__(self, broker):
self.name = broker.name
self.id = broker.id
self.endpoints = Endpoints(broker.endpoints)

def to_protobuf(self):
"""Convert the broker to its protobuf representation.
:return: The protobuf representation of the broker.
"""
return ProtoBroker(
Name=self.name, Id=self.id, Endpoints=self.endpoints.to_protobuf()
)


class Resource:
"""Represent a resource entity."""

def __init__(self, name=None, resource=None):
"""Initialize a resource.
:param name: The name of the resource.
:param resource: The resource object.
"""
if resource is not None:
self.namespace = resource.ResourceNamespace
self.name = resource.Name
Expand All @@ -47,22 +83,34 @@ def __init__(self, name=None, resource=None):
self.name = name

def to_protobuf(self):
"""Convert the resource to its protobuf representation.
:return: The protobuf representation of the resource.
"""
return ProtoResource(ResourceNamespace=self.namespace, Name=self.name)

def __str__(self):
return f"{self.namespace}.{self.name}" if self.namespace else self.name


class Permission(Enum):
"""Enumeration of supported permission types."""
NONE = 0
READ = 1
WRITE = 2
READ_WRITE = 3


class PermissionHelper:
"""Helper class for converting permission types to protobuf and vice versa."""

@staticmethod
def from_protobuf(permission):
"""Convert protobuf permission to Permission enum.
:param permission: The protobuf permission to be converted.
:return: The corresponding Permission enum.
"""
if permission == ProtoPermission.READ:
return Permission.READ
elif permission == ProtoPermission.WRITE:
Expand All @@ -76,6 +124,11 @@ def from_protobuf(permission):

@staticmethod
def to_protobuf(permission):
"""Convert Permission enum to protobuf permission.
:param permission: The Permission enum to be converted.
:return: The corresponding protobuf permission.
"""
if permission == Permission.READ:
return ProtoPermission.READ
elif permission == Permission.WRITE:
Expand All @@ -87,29 +140,47 @@ def to_protobuf(permission):

@staticmethod
def is_writable(permission):
"""Check if the permission is writable.
:param permission: The Permission enum to be checked.
:return: True if the permission is writable, False otherwise.
"""
if permission in [Permission.WRITE, Permission.READ_WRITE]:
return True
else:
return False

@staticmethod
def is_readable(permission):
"""Check if the permission is readable.
:param permission: The Permission enum to be checked.
:return: True if the permission is readable, False otherwise.
"""
if permission in [Permission.READ, Permission.READ_WRITE]:
return True
else:
return False


class MessageType(Enum):
"""Enumeration of supported message types."""
NORMAL = 0
FIFO = 1
DELAY = 2
TRANSACTION = 3


class MessageTypeHelper:
"""Helper class for converting message types to protobuf and vice versa."""

@staticmethod
def from_protobuf(message_type):
"""Convert protobuf message type to MessageType enum.
:param message_type: The protobuf message type to be converted.
:return: The corresponding MessageType enum.
"""
if message_type == ProtoMessageType.NORMAL:
return MessageType.NORMAL
elif message_type == ProtoMessageType.FIFO:
Expand All @@ -123,6 +194,11 @@ def from_protobuf(message_type):

@staticmethod
def to_protobuf(message_type):
"""Convert MessageType enum to protobuf message type.
:param message_type: The MessageType enum to be converted.
:return: The corresponding protobuf message type.
"""
if message_type == MessageType.NORMAL:
return ProtoMessageType.NORMAL
elif message_type == MessageType.FIFO:
Expand All @@ -136,7 +212,13 @@ def to_protobuf(message_type):


class MessageQueue:
"""A class that encapsulates a message queue entity."""

def __init__(self, message_queue):
"""Initialize a MessageQueue instance.
:param message_queue: The initial message queue to be encapsulated.
"""
self._topic_resource = Resource(message_queue.topic)
self.queue_id = message_queue.id
self.permission = PermissionHelper.from_protobuf(message_queue.permission)
Expand All @@ -148,12 +230,24 @@ def __init__(self, message_queue):

@property
def topic(self):
"""The topic resource name.
:return: The name of the topic resource.
"""
return self._topic_resource.name

def __str__(self):
"""Get a string representation of the MessageQueue instance.
:return: A string that represents the MessageQueue instance.
"""
return f"{self.broker.name}.{self._topic_resource}.{self.queue_id}"

def to_protobuf(self):
"""Convert the MessageQueue instance to protobuf message queue.
:return: A protobuf message queue that represents the MessageQueue instance.
"""
message_types = [
MessageTypeHelper.to_protobuf(mt) for mt in self.accept_message_types
]
Expand All @@ -167,12 +261,22 @@ def to_protobuf(self):


class TopicRouteData:
"""A class that encapsulates a list of message queues."""

def __init__(self, message_queues: List[definition_pb2.MessageQueue]):
"""Initialize a TopicRouteData instance.
:param message_queues: The initial list of message queues to be encapsulated.
"""
message_queue_list = []
for mq in message_queues:
message_queue_list.append(MessageQueue(mq))
self.__message_queue_list = message_queue_list

@property
def message_queues(self) -> List[MessageQueue]:
"""The list of MessageQueue instances.
:return: The list of MessageQueue instances that the TopicRouteData instance encapsulates.
"""
return self.__message_queue_list
Loading

0 comments on commit 60115cf

Please sign in to comment.