# Synchronous Example
import novu_py
from novu_py import Novu
import os
# The secret key is read from the environment; an empty string is used when
# NOVU_SECRET_KEY is not set.
secret_key = os.getenv("NOVU_SECRET_KEY", "")

with Novu(secret_key=secret_key) as novu:
    # Build the request first so the call itself stays short.
    event = novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        to={"subscriber_id": "<id>"},
        payload={
            "comment_id": "string",
            "post": {"text": "string"},
        },
        overrides={
            "fcm": {
                "data": {"key": "value"},
            },
        },
    )
    res = novu.trigger(trigger_event_request_dto=event)

    # Handle response
    print(res)
The same SDK client can also be used to make asynchronous requests with asyncio.
# Asynchronous Example
import asyncio
import novu_py
from novu_py import Novu
import os
async def main():
    """Send one trigger request with the async client and print the response."""
    # An empty string is used when NOVU_SECRET_KEY is not set.
    secret_key = os.getenv("NOVU_SECRET_KEY", "")

    async with Novu(secret_key=secret_key) as novu:
        # Build the request first so the awaited call stays short.
        event = novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            to={"subscriber_id": "<id>"},
            payload={
                "comment_id": "string",
                "post": {"text": "string"},
            },
            overrides={
                "fcm": {
                    "data": {"key": "value"},
                },
            },
        )
        res = await novu.trigger_async(trigger_event_request_dto=event)

        # Handle response
        print(res)


asyncio.run(main())
# Synchronous Example
import novu_py
from novu_py import Novu
import os
# The secret key is read from the environment; an empty string is used when
# NOVU_SECRET_KEY is not set.
secret_key = os.getenv("NOVU_SECRET_KEY", "")

with Novu(secret_key=secret_key) as novu:
    # Event 1: `to` is a single mapping with a subscriber_id.
    to_one_subscriber = novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        to={"subscriber_id": "<id>"},
        payload={
            "comment_id": "string",
            "post": {"text": "string"},
        },
        overrides={
            "fcm": {
                "data": {"key": "value"},
            },
        },
    )

    # Event 2: `to` is a list holding a mapping with topic_key and type.
    # NOTE(review): the recipient is keyed by topic_key but typed SUBSCRIBER;
    # confirm against the SDK reference whether a topic type is intended.
    to_topic_key = novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        to=[
            {
                "topic_key": "<value>",
                "type": novu_py.TriggerRecipientsTypeEnum.SUBSCRIBER,
            },
        ],
        payload={
            "comment_id": "string",
            "post": {"text": "string"},
        },
        overrides={
            "fcm": {
                "data": {"key": "value"},
            },
        },
    )

    # Event 3: `to` is a list of plain string identifiers.
    to_subscriber_ids = novu_py.TriggerEventRequestDto(
        workflow_id="workflow_identifier",
        to=["SUBSCRIBER_ID", "SUBSCRIBER_ID"],
        payload={
            "comment_id": "string",
            "post": {"text": "string"},
        },
        overrides={
            "fcm": {
                "data": {"key": "value"},
            },
        },
    )

    # All three events go out in one bulk request.
    bulk_request = {
        "events": [to_one_subscriber, to_topic_key, to_subscriber_ids],
    }
    res = novu.trigger_bulk(bulk_trigger_event_dto=bulk_request)

    # Handle response
    print(res)
The same SDK client can also be used to make asynchronous requests with asyncio.
# Asynchronous Example
import asyncio
import novu_py
from novu_py import Novu
import os
async def main():
    """Send three events in one bulk request with the async client and print the response."""
    # An empty string is used when NOVU_SECRET_KEY is not set.
    secret_key = os.getenv("NOVU_SECRET_KEY", "")

    async with Novu(secret_key=secret_key) as novu:
        # Event 1: `to` is a single mapping with a subscriber_id.
        to_one_subscriber = novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            to={"subscriber_id": "<id>"},
            payload={
                "comment_id": "string",
                "post": {"text": "string"},
            },
            overrides={
                "fcm": {
                    "data": {"key": "value"},
                },
            },
        )

        # Event 2: `to` is a list holding a mapping with topic_key and type.
        # NOTE(review): the recipient is keyed by topic_key but typed
        # SUBSCRIBER; confirm against the SDK reference whether a topic type
        # is intended.
        to_topic_key = novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            to=[
                {
                    "topic_key": "<value>",
                    "type": novu_py.TriggerRecipientsTypeEnum.SUBSCRIBER,
                },
            ],
            payload={
                "comment_id": "string",
                "post": {"text": "string"},
            },
            overrides={
                "fcm": {
                    "data": {"key": "value"},
                },
            },
        )

        # Event 3: `to` is a list of plain string identifiers.
        to_subscriber_ids = novu_py.TriggerEventRequestDto(
            workflow_id="workflow_identifier",
            to=["SUBSCRIBER_ID", "SUBSCRIBER_ID"],
            payload={
                "comment_id": "string",
                "post": {"text": "string"},
            },
            overrides={
                "fcm": {
                    "data": {"key": "value"},
                },
            },
        )

        # All three events go out in one bulk request.
        bulk_request = {
            "events": [to_one_subscriber, to_topic_key, to_subscriber_ids],
        }
        res = await novu.trigger_bulk_async(bulk_trigger_event_dto=bulk_request)

        # Handle response
        print(res)


asyncio.run(main())
# Synchronous Example
from novu_py import Novu
import os
# The secret key is read from the environment; an empty string is used when
# NOVU_SECRET_KEY is not set.
secret_key = os.getenv("NOVU_SECRET_KEY", "")

with Novu(secret_key=secret_key) as novu:
    # The broadcast request is passed as a plain dict.
    broadcast_request = {
        "name": "<value>",
        "payload": {
            "comment_id": "string",
            "post": {"text": "string"},
        },
    }
    res = novu.trigger_broadcast(
        trigger_event_to_all_request_dto=broadcast_request,
    )

    # Handle response
    print(res)
The same SDK client can also be used to make asynchronous requests with asyncio.
# Asynchronous Example
import asyncio
from novu_py import Novu
import os
async def main():
    """Send a broadcast request with the async client and print the response."""
    # An empty string is used when NOVU_SECRET_KEY is not set.
    secret_key = os.getenv("NOVU_SECRET_KEY", "")

    async with Novu(secret_key=secret_key) as novu:
        # The broadcast request is passed as a plain dict.
        broadcast_request = {
            "name": "<value>",
            "payload": {
                "comment_id": "string",
                "post": {"text": "string"},
            },
        }
        res = await novu.trigger_broadcast_async(
            trigger_event_to_all_request_dto=broadcast_request,
        )

        # Handle response
        print(res)


asyncio.run(main())
# Synchronous Example
from novu_py import Novu
import os
# The secret key is read from the environment; an empty string is used when
# NOVU_SECRET_KEY is not set.
secret_key = os.getenv("NOVU_SECRET_KEY", "")

with Novu(secret_key=secret_key) as novu:
    # "<id>" is a placeholder for the transaction id to cancel.
    res = novu.cancel(
        transaction_id="<id>",
    )

    # Handle response
    print(res)
The same SDK client can also be used to make asynchronous requests with asyncio.
# Asynchronous Example
import asyncio
from novu_py import Novu
import os
async def main():
    """Send a cancel request with the async client and print the response."""
    # An empty string is used when NOVU_SECRET_KEY is not set.
    secret_key = os.getenv("NOVU_SECRET_KEY", "")

    async with Novu(secret_key=secret_key) as novu:
        # "<id>" is a placeholder for the transaction id to cancel.
        res = await novu.cancel_async(
            transaction_id="<id>",
        )

        # Handle response
        print(res)


asyncio.run(main())