Skip to content

Commit

Permalink
Merge pull request #868 from roboflow/opcua-block-updates
Browse files Browse the repository at this point in the history
Updated the OPC UA Writer Block Descriptions and Examples
  • Loading branch information
PawelPeczek-Roboflow authored Dec 13, 2024
2 parents 4acd85b + ad866bb commit d24d6f6
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 74 deletions.
181 changes: 108 additions & 73 deletions inference/enterprise/workflows/enterprise_blocks/sinks/opc_writer/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,44 @@

BLOCK_TYPE = "roboflow_enterprise/opc_writer_sink@v1"
LONG_DESCRIPTION = """
The **OPC Writer** block enables sending a data from Workflow into OPC server
by setting value of OPC object under OPC namespace.
The **OPC UA Writer** block enables you to write data to a variable on an OPC UA server, leveraging the
[asyncua](https://github.com/FreeOpcUa/opcua-asyncio) library for seamless communication.
This block is making use of [asyncua](https://github.com/FreeOpcUa/opcua-asyncio) in order to
perform communication with OPC servers.
### Supported Data Types
This block supports writing the following data types to OPC UA server variables:
- Numbers (integers, floats)
- Booleans
- Strings
Block will attempt to send:
* numbers (integers, floats)
* booleans
* strings
Type of sent data must match type of OPC object.
**Note:** The data type you send must match the expected type of the target OPC UA variable.
### Cooldown
The block accepts `cooldown_seconds` (which **defaults to `5` seconds**) to prevent unintended bursts of
traffic sent to OPC server. Please adjust it according to your needs, setting `0` indicate no cooldown.
During cooldown period, consecutive runs of the step will cause `throttling_status` output to be set `True`
and no data will be sent.
### Async execution
Configure the `fire_and_forget` property. Set it to True if you want the data to be sent in the background,
allowing the Workflow to proceed without waiting on data to be sent. In this case you will not be able to rely on
`error_status` output which will always be set to `False`, so we **recommend setting the `fire_and_forget=False` for
debugging purposes**.
### Disabling notifications based on runtime parameter
Sometimes it would be convenient to manually disable the **OPC Writer** block. This can be achieved by
setting `disable_sink` flag to hold reference to Workflow input. With such setup, caller cat disable the sink
by sending agreed input parameter.
!!! warning "Cooldown limitations"
Current implementation of cooldown is limited to video processing - using this block in context of a
Workflow that is run behind HTTP service (Roboflow Hosted API, Dedicated Deployment or self-hosted
`inference` server) will have no effect with regards to cooldown timer.
To prevent excessive traffic to the OPC UA server, the block includes a `cooldown_seconds` parameter,
which defaults to **5 seconds**. During the cooldown period:
- Consecutive executions of the block will set the `throttling_status` output to `True`.
- No data will be sent to the server.
You can customize the `cooldown_seconds` parameter based on your needs. Setting it to `0` disables
the cooldown entirely.
### Asynchronous Execution
The block provides a `fire_and_forget` property for asynchronous execution:
- **When `fire_and_forget=True`**: The block sends data in the background, allowing the Workflow to
proceed immediately. However, the `error_status` output will always be set to `False`, so we do not
recommend this mode for debugging.
- **When `fire_and_forget=False`**: The block waits for confirmation before proceeding, ensuring errors
are captured in the `error_status` output.
### Disabling the Block Dynamically
You can disable the **OPC UA Writer** block during execution by linking the `disable_sink` parameter
to a Workflow input. By providing a specific input value, you can dynamically prevent the block from
executing.
### Cooldown Limitations
!!! warning "Cooldown Limitations"
The cooldown feature is optimized for workflows involving video processing.
- In other contexts, such as Workflows triggered by HTTP services (e.g., Roboflow Hosted API,
Dedicated Deployment, or self-hosted `Inference` server), the cooldown timer will not be applied effectively.
"""

QUERY_PARAMS_KIND = [
Expand Down Expand Up @@ -103,76 +100,89 @@
class BlockManifest(WorkflowBlockManifest):
model_config = ConfigDict(
json_schema_extra={
"name": "OPC Writer Sink",
"name": "OPC UA Writer Sink",
"version": "v1",
"short_description": "Pushes data to OPC server, this block is making use of [asyncua](https://github.com/FreeOpcUa/opcua-asyncio)",
"short_description": "Writes data to an OPC UA server using the [asyncua](https://github.com/FreeOpcUa/opcua-asyncio) library for communication.",
"long_description": LONG_DESCRIPTION,
"license": "Roboflow Enterprise License",
"block_type": "sink",
}
)
type: Literal[BLOCK_TYPE]
url: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="URL of OPC server where data should be pushed to",
examples=["$inputs.opc_url", "opc.tcp://localhost:4840/freeopcua/server/"],
description="URL of the OPC UA server to which data will be written.",
examples=["opc.tcp://localhost:4840/freeopcua/server/", "$inputs.opc_url"],
)
namespace: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="OPC namespace",
examples=["$inputs.opc_namespace", "http://examples.freeopcua.github.io"],
description="The OPC UA namespace URI or index used to locate objects and variables.",
examples=["http://examples.freeopcua.github.io", "2", "$inputs.opc_namespace"],
)
user_name: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field(
default=None,
description="Optional user name to be used for authentication when connecting to OPC server",
examples=["$inputs.opc_user_name", "John"],
description="Optional username for authentication when connecting to the OPC UA server.",
examples=["John", "$inputs.opc_user_name"],
)
password: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field(
default=None,
description="Optional password to be used for authentication when connecting to OPC server",
examples=["$inputs.opc_password", "secret"],
description="Optional password for authentication when connecting to the OPC UA server.",
examples=["secret", "$inputs.opc_password"],
)
object_name: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="Name of object to be searched in namespace",
examples=["$inputs.opc_object_name", "Line1"],
description="The name of the target object in the namespace to search for.",
examples=["Line1", "$inputs.opc_object_name"],
)
variable_name: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="Name of variable to be set under found object",
description="The name of the variable within the target object to be updated.",
examples=[
"$inputs.opc_variable_name",
"InspectionSuccess",
"$inputs.opc_variable_name",
],
)
value: Union[
Selector(kind=[BOOLEAN_KIND, FLOAT_KIND, INTEGER_KIND, STRING_KIND]),
Union[bool, float, int, str],
str,
bool,
float,
int,
] = Field(
description="value to be written into variable",
examples=["$other_block.result", "running"],
description="The value to be written to the target variable on the OPC UA server.",
examples=["running", "$other_block.result"],
)
value_type: Union[
Selector(kind=[STRING_KIND]),
Literal["Boolean", "Float", "Integer", "String"],
] = Field(
default="String",
description="The type of the value to be written to the target variable on the OPC UA server.",
examples=["Boolean", "Float", "Integer", "String"],
json_schema_extra={
"always_visible": True,
},
)
timeout: Union[int, Selector(kind=[INTEGER_KIND])] = Field(
default=2,
description="Number of seconds to wait for OPC server to respond",
examples=["$inputs.timeout", 10],
description="The number of seconds to wait for a response from the OPC UA server before timing out.",
examples=[10, "$inputs.timeout"],
)
fire_and_forget: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field(
default=True,
description="Boolean flag dictating if sink is supposed to be executed in the background, "
"not waiting on status of registration before end of workflow run. Use `True` if best-effort "
"registration is needed, use `False` while debugging and if error handling is needed",
examples=["$inputs.fire_and_forget", True],
description="Determines if the block should execute asynchronously. Set to `True` for best-effort updates "
"where the Workflow continues without waiting for confirmation. Use `False` during debugging or when error "
"handling is required.",
examples=[True, "$inputs.fire_and_forget"],
)
disable_sink: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field(
default=False,
description="boolean flag that can be also reference to input - to arbitrarily disable "
"data collection for specific request",
description="Boolean flag to disable the block for specific requests. Can be dynamically linked to a Workflow input.",
examples=[False, "$inputs.disable_opc_writers"],
)
cooldown_seconds: Union[int, Selector(kind=[INTEGER_KIND])] = Field(
default=5,
description="Number of seconds to wait until next value update can be sent",
description="The minimum number of seconds to wait between consecutive updates to the OPC UA server.",
json_schema_extra={
"always_visible": True,
},
examples=["$inputs.cooldown_seconds", 10],
examples=[10, "$inputs.cooldown_seconds"],
)

@classmethod
Expand Down Expand Up @@ -216,11 +226,12 @@ def run(
password: Optional[str],
object_name: str,
variable_name: str,
value: Union[bool, float, int, str],
timeout: int,
fire_and_forget: bool,
disable_sink: bool,
cooldown_seconds: int,
value: Union[str, bool, float, int],
value_type: Literal["Boolean", "Float", "Integer", "String"] = "String",
timeout: int = 2,
fire_and_forget: bool = True,
disable_sink: bool = False,
cooldown_seconds: int = 5,
) -> BlockResult:
if disable_sink:
return {
Expand All @@ -242,6 +253,27 @@ def run(
"error_status": False,
"message": "Sink cooldown applies",
}

value_str = str(value)
try:
if value_type in [BOOLEAN_KIND, "Boolean"]:
decoded_value = value_str.strip().lower() in ("true", "1")
elif value_type in [FLOAT_KIND, "Float"]:
decoded_value = float(value_str)
elif value_type in [INTEGER_KIND, "Integer"]:
decoded_value = int(value_str)
elif value_type in [STRING_KIND, "String"]:
decoded_value = value_str
else:
raise ValueError(f"Unsupported value type: {value_type}")
except ValueError as exc:
return {
"disabled": False,
"error_status": True,
"throttling_status": False,
"message": f"Failed to convert value: {exc}",
}

opc_writer_handler = partial(
opc_connect_and_write_value,
url=url,
Expand All @@ -250,7 +282,7 @@ def run(
password=password,
object_name=object_name,
variable_name=variable_name,
value=value,
value=decoded_value,
timeout=timeout,
)
self._last_notification_fired = datetime.now()
Expand All @@ -260,15 +292,15 @@ def run(
"disabled": False,
"error_status": False,
"throttling_status": False,
"message": "Writing to OPC in the background task",
"message": "Writing to the OPC UA server in the background task",
}
if fire_and_forget and self._thread_pool_executor:
self._thread_pool_executor.submit(opc_writer_handler)
return {
"disabled": False,
"error_status": False,
"throttling_status": False,
"message": "Writing to OPC in the background task",
"message": "Writing to the OPC UA server in the background task",
}
error_status, message = opc_writer_handler()
return {
Expand Down Expand Up @@ -338,7 +370,10 @@ def _opc_connect_and_write_value(
)

try:
nsidx = get_namespace_index(namespace)
if namespace.isdigit():
nsidx = int(namespace)
else:
nsidx = get_namespace_index(namespace)
except ValueError as exc:
client.disconnect()
raise Exception(f"WRONG NAMESPACE ERROR: {exc}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
{"type": "InferenceParameter", "name": "opc_object_name"},
{"type": "InferenceParameter", "name": "opc_variable_name"},
{"type": "InferenceParameter", "name": "opc_value"},
{"type": "InferenceParameter", "name": "opc_value_type"},
],
"steps": [
{
Expand All @@ -38,6 +39,7 @@
"object_name": "$inputs.opc_object_name",
"variable_name": "$inputs.opc_variable_name",
"value": "$inputs.opc_value",
"value_type": "$inputs.opc_value_type",
"fire_and_forget": False,
}
],
Expand Down Expand Up @@ -124,7 +126,10 @@ def _opc_connect_and_read_value(
)

try:
nsidx = get_namespace_index(namespace)
if namespace.isdigit():
nsidx = int(namespace)
else:
nsidx = get_namespace_index(namespace)
except ValueError as exc:
client.disconnect()
raise Exception(f"WRONG NAMESPACE ERROR: {exc}")
Expand Down Expand Up @@ -215,6 +220,7 @@ def test_workflow_with_opc_writer_sink() -> None:
"opc_object_name": opc_object_name,
"opc_variable_name": opc_variable_name,
"opc_value": 41,
"opc_value_type": "Integer",
}
)

Expand Down

0 comments on commit d24d6f6

Please sign in to comment.