I have created a workflow that generates dynamic tasks depending on the input: it splits the ids into batches and creates a task for each batch. Below is the workflow definition:
from cognite.client.data_classes import (
    DynamicTaskParameters,
    FunctionTaskParameters,
    WorkflowDefinitionUpsert,
    WorkflowTask,
    WorkflowVersionUpsert,
)

WorkflowVersionUpsert(
    workflow_external_id="test_dynamic-0729",
    version="1",
    workflow_definition=WorkflowDefinitionUpsert(
        description="This workflow has two steps",
        tasks=[
            # Step 1: function task that builds the list of dynamic tasks
            WorkflowTask(
                external_id="test_sub_tasks",
                parameters=FunctionTaskParameters(
                    external_id="test_sub_tasks",
                    data="${workflow.input}",
                ),
                retries=1,
                timeout=3600,
                depends_on=[],
                on_failure="abortWorkflow",
            ),
            # Step 2: dynamic task that runs the tasks returned by step 1
            WorkflowTask(
                external_id="test_create_sub",
                parameters=DynamicTaskParameters(
                    tasks="${test_sub_tasks.output.response.tasks}"
                ),
                name="Dynamic Task",
                description="Executes a list of workflow tasks for subscription creation",
                retries=0,
                timeout=3600,
                depends_on=["test_sub_tasks"],
                on_failure="abortWorkflow",
            ),
        ],
    ),
)
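For context, here is a minimal sketch of what the first function (test_sub_tasks) could look like so that its response exposes a "tasks" list for the dynamic task to pick up. The batching logic is plain Python; the input keys ("ids", "batch_size"), the worker function name "process_batch", and the exact shape of each task dictionary are assumptions for illustration, not taken from my real function or confirmed against the API reference.

```python
from typing import Any


def chunk(ids: list[Any], batch_size: int) -> list[list[Any]]:
    """Split ids into consecutive batches of at most batch_size items."""
    return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]


def build_tasks(
    ids: list[Any],
    batch_size: int,
    worker_function_external_id: str = "process_batch",  # hypothetical name
) -> list[dict]:
    """Create one task definition per batch of ids."""
    tasks = []
    for n, batch in enumerate(chunk(ids, batch_size)):
        tasks.append(
            {
                # Assumed task shape; verify against the workflow API docs.
                "externalId": f"batch_{n}",
                "type": "function",
                "parameters": {
                    "function": {
                        "externalId": worker_function_external_id,
                        "data": {"ids": batch},
                    }
                },
                "dependsOn": [],
            }
        )
    return tasks


def handle(data: dict, client: Any = None) -> dict:
    """Function entry point; the returned dict becomes output.response."""
    ids = data.get("ids", [])  # assumed input key
    batch_size = data.get("batch_size", 100)  # assumed input key
    return {"tasks": build_tasks(ids, batch_size)}
```

With this shape, the expression ${test_sub_tasks.output.response.tasks} in the second task would resolve to the list returned under the "tasks" key.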
I have also created a function to trigger this workflow. Since I want the workflow to fetch new ids and perform the required operation, I have scheduled it to run every 10 minutes. Cron expression: */10 * * * *
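To make the expected behaviour concrete, here is a small sketch of what */10 * * * * should produce and how observed start times can be checked against it. It uses only the standard library; the function names and the 60-second tolerance are assumptions chosen for illustration, and it only models the minute step, not full cron syntax.

```python
from datetime import datetime, timedelta


def expected_runs(start: datetime, count: int, step_minutes: int = 10) -> list[datetime]:
    """Return the next `count` fire times of a */step_minutes schedule after `start`."""
    # Move to the first whole minute strictly after `start`.
    t = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:
        # */10 in the minute field matches minutes 0, 10, 20, 30, 40, 50.
        if t.minute % step_minutes == 0:
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


def irregular_gaps(
    actual: list[datetime],
    step_minutes: int = 10,
    tolerance_seconds: int = 60,  # assumed acceptable jitter
) -> list[tuple[datetime, datetime]]:
    """Return consecutive run pairs whose spacing deviates from the schedule."""
    expected = timedelta(minutes=step_minutes)
    tolerance = timedelta(seconds=tolerance_seconds)
    return [
        (a, b)
        for a, b in zip(actual, actual[1:])
        if abs((b - a) - expected) > tolerance
    ]
```

Feeding the start times from the run history into irregular_gaps lists the pairs of runs that started too close together or too far apart.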
Issue I am facing:
The workflow schedule is inconsistent: the runs do not follow the 10-minute cron schedule. I am sharing a snapshot. No manual trigger was done.
Impact: The last step of this workflow cleans up a staging table. Because the runs do not follow the schedule, multiple runs access the same staging table at the same time, which causes failures, and the workflow cannot proceed further.