Hi, I’ll try to elaborate on some topics and provide an example based on my understanding of what you’re trying to achieve.
- The Dynamic task takes one parameter, a list of workflow task definitions, and tries to execute those tasks. If any task definition in the list is invalid, the workflow fails, as in your example.
- This list of tasks can come from:
  - A previous workflow step (a Cognite Function generates and outputs the task list)
  - The workflow input (the task list is passed directly to the workflow as input in the run call)
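For the second option, here is a rough sketch of what passing the task list as workflow input could look like. The task shape (camelCase keys, a "function" entry under "parameters"), the "tasks" key, the Function external ID "my-function" and the "${workflow.input.tasks}" reference are my assumptions for illustration, so please check them against the API documentation before relying on them.

```python
# Hypothetical task definition, written in the JSON (camelCase) shape I assume the API expects.
task_list = [
    {
        "externalId": "say-hello",
        "type": "function",
        "parameters": {
            "function": {
                "externalId": "my-function",  # hypothetical Function external ID
                "data": {"name": "world"},
            }
        },
    }
]

# Input for the run call; the key name "tasks" is an arbitrary choice.
workflow_input = {"tasks": task_list}

# The Dynamic task would then point at the workflow input instead of a Function output
# (assumed reference syntax).
dynamic_tasks_reference = "${workflow.input.tasks}"

# With an authenticated CogniteClient named `client` (not created here), the run call
# would be roughly:
# client.workflows.executions.trigger("test-dywkflw", "1", input=workflow_input)
```

The only thing that has to line up is the key in the input ("tasks" here) and the path used in the Dynamic task's reference.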
If you want the Cognite Function to generate the list of tasks, you can do this:
from cognite.client.data_classes import (
    WorkflowVersionUpsert,
    WorkflowDefinitionUpsert,
    WorkflowTask,
    FunctionTaskParameters,
    DynamicTaskParameters,
)

# `client` is assumed to be an already authenticated CogniteClient.
new_version = WorkflowVersionUpsert(
    workflow_external_id="test-dywkflw",
    version="1",
    workflow_definition=WorkflowDefinitionUpsert(
        tasks=[
            # Step 1: the Function that generates and returns the task list
            WorkflowTask(
                external_id="dynamic_task_generator",
                parameters=FunctionTaskParameters(
                    external_id="get-gql-task",
                ),
                name="Dynamic Task Generator",
                description="Returns a list of workflow tasks",
                retries=1,
                timeout=3600,
                depends_on=[],
            ),
            # Step 2: the Dynamic task that executes whatever step 1 returned
            WorkflowTask(
                external_id="dynamic_task",
                parameters=DynamicTaskParameters(
                    tasks="${dynamic_task_generator.output.response.tasks}"
                ),
                name="Dynamic Task",
                description="Executes a list of workflow tasks",
                retries=1,
                timeout=3600,
                depends_on=["dynamic_task_generator"],
            ),
        ],
        description="This workflow has two steps",
    ),
)
res = client.workflows.versions.upsert(new_version)
Note that I removed the data parameter of the Function task, as this is not needed if the Function is responsible for generating the list of tasks.
The Dynamic task will try to resolve the list of tasks to execute based on the response of the Function ("${dynamic_task_generator.output.response.tasks}"), and as you can see it expects the Function to have a tasks key in its response, containing the list of valid workflow tasks. This is the crucial part: the Function needs to return a list of valid workflow task definitions, and it needs to return it under the same key that you reference in the Dynamic task in the workflow.
In this example, your Function should look something like this:
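def handle(client, data, secrets):
    # Fill this with valid workflow task definitions
    tasks = []
    # The "tasks" key must match the key referenced by the Dynamic task
    return {"tasks": tasks, "some_other_key": 123}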
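To make the empty list above a bit more concrete, here is a minimal sketch of a Function that builds one function-type task per item. The item names, the "process-item" Function external ID and the exact task shape (camelCase keys as I assume the API expects them) are hypothetical, so treat this as an illustration rather than a reference.

```python
def handle(client, data, secrets):
    # Hypothetical items; in practice these might come from a query against CDF.
    items = ["pump-1", "pump-2"]

    # One task definition per item, in the JSON (camelCase) shape I assume the API expects.
    tasks = [
        {
            "externalId": f"process-{item}",
            "type": "function",
            "parameters": {
                "function": {
                    "externalId": "process-item",  # hypothetical Function to call
                    "data": {"item": item},
                }
            },
        }
        for item in items
    ]

    # The key must match the one referenced by the Dynamic task
    # ("${dynamic_task_generator.output.response.tasks}").
    return {"tasks": tasks}
```

Each task needs its own unique external ID within the workflow, which is why the item name is included in it here.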
Then execute the workflow without input data.
run = client.workflows.executions.trigger(
    workflow_external_id="test-dywkflw", version="1"
)
Let me know if you have further questions!