Skip to main content

I want to create subflows after output from first function. As number of task is not fixed, curious if we can use dynamic task within workflows.

Can we do it via SDK, 

WorkflowTask(

               external_id="subtask1",

               parameters=DynamicTaskParameters(

                  "tasks" = "${task1.output.tasks}",

               ),

               depends_on = /"task1"]

)

Is there example how task will be created and get the input from function ? I assume ‘tasks’ key needs to be send from previous function. Example will be appreciated 

Hi @Rimmi Anand, thanks for reaching out. Here’s a code example. Let me know if you have further questions!

First, we’ll need a Cognite Function that generates the list of tasks that will be executed dynamically in the workflow. To keep it simple, below is an example Function that expects as input a dict with a “tasks” key that should contain the list of tasks to be returned. You can adapt the Function to contain whatever logic needed to generate the desired list of tasks to execute.

 

def handle(client, data, secrets):
if not data.get("tasks"):
print("'tasks' not in input data.")
return
return data

external_id = "dynamic_task_generator"
function = client.functions.create(
name=external_id,
external_id=external_id,
function_handle=handle,
)

 

Second, we’ll need the workflow definition with a Funciton task and a Dynamic task.

Note:

  • When we trigger a workflow execution, we can pass input data. In the example below, this input is subsequently injected into the Function task (data="${workflow.input}")
  • The Function returns the list of tasks (according to the logic in the Function defined above).
  • The output of the Function is injected into the Dynamic task (tasks="${dynamic_task_generator.output.response.tasks}").
from cognite.client.data_classes import (
WorkflowDefinitionUpsert,
WorkflowTask,
FunctionTaskParameters,
DynamicTaskParameters
)

workflow_definiton = WorkflowDefinitionUpsert(
description="A Workflow with a Dynamic Task",
tasks=k
WorkflowTask(
external_id="dynamic_task_generator",
parameters=FunctionTaskParameters(
external_id="dynamic_task_generator", data="${workflow.input}"
),
name="Dynamic Task Generator",
description="Returns a list of workflow tasks",
retries=1,
timeout=3600,
depends_on= ],
),
WorkflowTask(
external_id="dynamic_task",
parameters=DynamicTaskParameters(
tasks="${dynamic_task_generator.output.response.tasks}"
),
name="Dynamic Task",
description="Executes a list of workflow tasks",
retries=1,
timeout=3600,
depends_on= "dynamic_task_generator"],
),
],
)

Finally, we’ll trigger the workflow. In this case, we need to provide the list of tasks to dynamically execute as workflow input, but in practice the Function task can be used to generate this list in any other way. In that case, we would just trigger the workflow without thinking about workflow input.

workflow_input = {
"tasks": =
{
"externalId": "function_123",
"type": "function",
"name": "Function 1",
"description": "Some Cognite Function",
"parameters": {
"function": {
"externalId": "function_123",
"data": {
"count": 1
}
}
},
"dependsOn": >]
}
]
}

run = client.workflows.executions.trigger(
workflow_external_id=workflow_external_id, version=version, input=workflow_input
)

 


You can also refer to the documentation: 

Task Input and Output, and References

Dynamic Task


Hi, Thanks for the response. Actually wanted to understand , more on how dynamic task  work or will get  created , 1. which function to execute in that

2. can that be dissimilar functions to be executed by function. 3. how to pass input to dynamically created tasks

I tried above workflow_input with function that was running independent. Getting below error.

Task a1350b39-101e-4403-8477-f2520848a3a8 failed with status: FAILED and reason: 'Task input deserialization failed'

from cognite.client.data_classes import (
WorkflowDefinitionUpsert,
WorkflowTask,
FunctionTaskParameters,
DynamicTaskParameters
)
new_version = WorkflowVersionUpsert(
workflow_external_id="test-dywkflw",
version="1",
workflow_definition=WorkflowDefinitionUpsert(
tasks=k
WorkflowTask(
external_id="dynamic_task_generator",
parameters=FunctionTaskParameters(
external_id="get-gql-task", data="${workflow.input}"
),
name="Dynamic Task Generator",
description="Returns a list of workflow tasks",
retries=1,
timeout=3600,
depends_on=o],
),
WorkflowTask(
external_id="dynamic_task",
parameters=DynamicTaskParameters(
tasks="${dynamic_task_generator.output.response.tasks}"
),
name="Dynamic Task",
description="Executes a list of workflow tasks",
retries=1,
timeout=3600,
depends_on=o"dynamic_task_generator"],
)
],
description="This workflow has two steps",
),
)
res = client.workflows.versions.upsert(new_version)
workflow_input = {
"tasks": "{
"externalId": "subtask1",
"type": "function",
"name": "Function 1",
"description": "Some Cognite Function",
"parameters": {
"function": {
"externalId": "subtask1",
"data": {
"Input1": "${get-gql-task.output.response.first}"
}
}
},
"dependsOn": "]
},
{
"externalId": "subtask2",
"type": "function",
"name": "Function 2",
"description": "Some Cognite Function",
"parameters": {
"function": {
"externalId": "subtask2",
"data": {
"Input2": "${get-gql-task.output.response.second}"
}
}
},
"dependsOn": "]
}
]
}

run = client.workflows.executions.trigger(
workflow_external_id='test-dywkflw', version=1, input=workflow_input
)

 

I want to run subtask1, subtask2, subtasksN...so on ( depends how many number of tasks are passed from dynamic_task_generator) , all function expects input from function passed in dynamic_task_generator.

Sample ouput from dynamic_task_generator

 


Hi, I’ll try to elaborate on some topics and provide an example based on my understanding on what you’re trying to achieve.

  • The Dynamic task will take 1 parameter, a list of workfow task definitions, and try to execute these tasks (if a task definition in this list is invalid, the workflow will fail, like in your example)
  • This list of tasks can come from:
    • A previous workflow step (a Cognite Function generates and outputs the task list) 
    • From workflow input (in the run call, the task list required is included directly to the workflow as input) 

If you want the Cognite Function to generate the list of tasks, you can do this: 

from cognite.client.data_classes import (
WorkflowDefinitionUpsert,
WorkflowTask,
FunctionTaskParameters,
DynamicTaskParameters
)
new_version = WorkflowVersionUpsert(
workflow_external_id="test-dywkflw",
version="1",
workflow_definition=WorkflowDefinitionUpsert(
tasks=
WorkflowTask(
external_id="dynamic_task_generator",
parameters=FunctionTaskParameters(
external_id="get-gql-task",
),
name="Dynamic Task Generator",
description="Returns a list of workflow tasks",
retries=1,
timeout=3600,
depends_on= ],
),
WorkflowTask(
external_id="dynamic_task",
parameters=DynamicTaskParameters(
tasks="${dynamic_task_generator.output.response.tasks}"
),
name="Dynamic Task",
description="Executes a list of workflow tasks",
retries=1,
timeout=3600,
depends_on= "dynamic_task_generator"],
)
],
description="This workflow has two steps",
),
)
res = client.workflows.versions.upsert(new_version)

Note that I removed the data parameter of the Function task, as this is not needed if the Function is responsible for generating the list of tasks. 

The Dynamic task will try to resolve the list of tasks to execute based on the response of the Function ("${dynamic_task_generator.output.response.tasks}"), and as you can see it expects the Function to have a tasks key in its response, containing the list of valid workflow tasks. This is the crucial part: the Function needs to return a list of valid workflow task definitions, and it needs to return it under the same key that you reference in the Dynamic task in the workflow.

In this example, your Funciton should look something like this: 

def handle(client, data, secrets):
tasks = <]
# insert any logic here to generate valid workflow task definitions and add to the list
return {"tasks": tasks, "some_other_key": 123}

Then execute the workflow without input data.

run = client.workflows.executions.trigger(
workflow_external_id='test-dywkflw', version=1
)

Let me know if you have further questions!


Thank you @ Jørgen Lund. Quick reply was helpful. 

On doubt, We can create only one DynamicTask, under which we can run sequence of task or parallel task defined in first function only? for example below is giving me 'Task input deserialization failed' error.

tasks =

      {

        "externalId": "task1",

        "type": "function",

        "parameters": {

          "function": {

            "externalId": "f1",

            "data": {

              "output1": updated_df_h.to_json()

            }

          }

        },

        "dependsOn": ]

      },

      {

        "externalId": "task2",

        "type": "function",

        "parameters": {

          "function": {

            "externalId": "f1",

            "data": {

              "output1": updated_df_t.to_json()

            }

          }

        },

        "dependsOn": d]

      },

      {

        "externalId": "subtask1",

        "type": "function",

        "parameters": {

          "function": {

            "externalId": "subtask1",

            "data": {

              "output1": "${task1.output.response.output2}"

            }

          }

        },

        "dependsOn": d"task1"]

      },

      {

        "externalId": "subtask2",

        "type": "function",

        "parameters": {

          "function": {

            "externalId": "subtask2",

            "data": {

              "output2": "${task2.output.response.output2}"

            }

          }

        },

        "dependsOn": "task2"]

      }

    ]

 or we can create more dynamic task after previous is finished(chaining of dynamic tasks) ? 


You can choose different patterns: 

  • Function 1 → Dynamic Task 1 (Set of Tasks)
  • Function 1 → Dynamic Task 1 (Set of Tasks) → Function 2 → Dynamic Task 2 (Set of Tasks)
    • In this pattern, Function 2 will be triggered when all tasks within Dynamic Task 1 are completed.

What you cannot do is “chain” Dynamic Tasks, e.g. Function 1 → Dynamic Task 1 (Set of Tasks, including another Dynamic Task)

Did this answer your question? 


Reply