|
10 | 10 |
|
11 | 11 |
|
12 | 12 | class CustomizeProcessorCallback(ClientCallback): |
13 | | - """ |
14 | | - A client callback that dynamically inserts a dummy processor to the list of inbound processors. |
15 | | - """ |
| 13 | + """Insert a dummy processor into inbound and outbound pipelines before they run.""" |
| 14 | + |
| 15 | + def _insert_dummy(self, client, processor_pipeline, direction: str) -> None: |
| 16 | + """Prepend the dummy processor if a processor pipeline is available.""" |
| 17 | + if processor_pipeline is None or not hasattr(processor_pipeline, "processors"): |
| 18 | + logging.warning( |
| 19 | + "[%s] No %s processor pipeline available; skip customization.", |
| 20 | + client, |
| 21 | + direction, |
| 22 | + ) |
| 23 | + return |
16 | 24 |
|
17 | | - def on_inbound_process(self, client, inbound_processor): |
18 | | - """ |
19 | | - Insert a dummy processor to the list of inbound processors. |
20 | | - """ |
21 | 25 | logging.info( |
22 | | - "[%s] Current list of inbound processors: %s.", |
| 26 | + "[%s] Current list of %s processors: %s.", |
23 | 27 | client, |
24 | | - inbound_processor.processors, |
| 28 | + direction, |
| 29 | + processor_pipeline.processors, |
25 | 30 | ) |
26 | 31 | customized_processor = DummyProcessor( |
27 | 32 | client_id=client.client_id, |
28 | 33 | current_round=client.current_round, |
29 | 34 | name="DummyProcessor", |
30 | 35 | ) |
31 | | - inbound_processor.processors.insert(0, customized_processor) |
| 36 | + processor_pipeline.processors.insert(0, customized_processor) |
32 | 37 |
|
33 | 38 | logging.info( |
34 | | - "[%s] List of inbound processors after modification: %s.", |
| 39 | + "[%s] List of %s processors after modification: %s.", |
35 | 40 | client, |
36 | | - inbound_processor.processors, |
| 41 | + direction, |
| 42 | + processor_pipeline.processors, |
37 | 43 | ) |
38 | 44 |
|
39 | | - def on_outbound_process(self, client, outbound_processor): |
| 45 | + def on_inbound_received(self, client, inbound_processor): |
40 | 46 | """ |
41 | | - Insert a dummy processor to the list of outbound processors. |
| 47 | + Insert a dummy processor before inbound processors start to run. |
42 | 48 | """ |
43 | | - logging.info( |
44 | | - "[%s] Current list of outbound processors: %s.", |
45 | | - client, |
46 | | - outbound_processor.processors, |
47 | | - ) |
48 | | - customized_processor = DummyProcessor( |
49 | | - client_id=client.client_id, |
50 | | - current_round=client.current_round, |
51 | | - name="DummyProcessor", |
52 | | - ) |
53 | | - outbound_processor.processors.insert(0, customized_processor) |
| 49 | + self._insert_dummy(client, inbound_processor, "inbound") |
54 | 50 |
|
55 | | - logging.info( |
56 | | - "[%s] List of outbound processors after modification: %s.", |
57 | | - client, |
58 | | - outbound_processor.processors, |
59 | | - ) |
| 51 | + def on_outbound_ready(self, client, report, outbound_processor): |
| 52 | + """ |
| 53 | + Insert a dummy processor before outbound processors start to run. |
| 54 | + """ |
| 55 | + self._insert_dummy(client, outbound_processor, "outbound") |
0 commit comments