Skip to content

Commit

Permalink
Used transaction disposition in python
Browse files Browse the repository at this point in the history
  • Loading branch information
astitcher committed Feb 20, 2025
1 parent d4ba607 commit 0a50479
Showing 1 changed file with 6 additions and 17 deletions.
23 changes: 6 additions & 17 deletions python/proton/_reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,7 @@
import re
import os
import queue
from typing import Any, Dict, Iterator, Optional, List, Union, Callable, TYPE_CHECKING, Tuple, Type

try:
from typing import Literal
except ImportError:
# https://www.python.org/dev/peps/pep-0560/#class-getitem
class GenericMeta(type):
def __getitem__(self, item):
pass

class Literal(metaclass=GenericMeta):
pass
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Union, TYPE_CHECKING, Tuple, Type

import time
import traceback
Expand All @@ -44,7 +33,7 @@ class Literal(metaclass=GenericMeta):
from cproton import PN_ACCEPTED, PN_EVENT_NONE

from ._data import Described, symbol, ulong
from ._delivery import Delivery, GeneralDisposition
from ._delivery import Delivery, TransactionalDisposition
from ._endpoints import Connection, Endpoint, Link, Session, Terminus
from ._events import Collector, EventType, EventBase, Event
from ._exceptions import SSLUnavailable
Expand Down Expand Up @@ -569,11 +558,11 @@ def abort(self) -> None:
self.discharge(True)

def declare(self) -> None:
self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
self._declare = self._send_ctrl(symbol('amqp:declare:list'), [None])

def discharge(self, failed: bool) -> None:
self.failed = failed
self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
self._discharge = self._send_ctrl(symbol('amqp:discharge:list'), [self.id, failed])

def _send_ctrl(self, descriptor: 'PythonAMQPData', value: 'PythonAMQPData') -> Delivery:
delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
Expand All @@ -595,7 +584,7 @@ def send(
:return: Delivery object for this message.
"""
dlv = sender.send(msg, tag=tag)
dlv.local = GeneralDisposition(0x34, [self.id])
dlv.local = TransactionalDisposition(self.id)
dlv.update()
return dlv

Expand All @@ -613,7 +602,7 @@ def accept(self, delivery: Delivery) -> None:

def update(self, delivery: Delivery, state: Optional[ulong] = None) -> None:
if state:
delivery.local = GeneralDisposition(0x34, [self.id, Described(ulong(state), [])])
delivery.local = TransactionalDisposition( self.id, state)
delivery.update()

def _release_pending(self):
Expand Down

0 comments on commit 0a50479

Please sign in to comment.