
Dynamic Tasks processing in sequence


 

I have created a workflow in which I create dynamic tasks depending on the input: it creates batches of IDs and creates tasks out of them. Below is the workflow definition:

from cognite.client.data_classes import (
    DynamicTaskParameters,
    FunctionTaskParameters,
    WorkflowDefinitionUpsert,
    WorkflowTask,
    WorkflowVersionUpsert,
)

WorkflowVersionUpsert(
    workflow_external_id="test_dynamic-0729",
    version="1",
    workflow_definition=WorkflowDefinitionUpsert(
        description="This workflow has two steps",
        tasks=[
            # Step 1: a function that returns the list of tasks to run
            WorkflowTask(
                external_id="test_sub_tasks",
                parameters=FunctionTaskParameters(
                    external_id="test_sub_tasks",
                    data="${workflow.input}",
                ),
                retries=1,
                timeout=3600,
                depends_on=[],
                on_failure="abortWorkflow",
            ),
            # Step 2: a dynamic task that executes the tasks returned by step 1
            WorkflowTask(
                external_id="test_create_sub",
                parameters=DynamicTaskParameters(
                    tasks="${test_sub_tasks.output.response.tasks}",
                ),
                name="Dynamic Task",
                description="Executes a list of workflow tasks for subscription creation",
                retries=0,
                timeout=3600,
                depends_on=["test_sub_tasks"],
                on_failure="abortWorkflow",
            ),
        ],
    ),
)
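The definition above expects `test_sub_tasks` to return `{"tasks": [...]}`, since the dynamic task reads `${test_sub_tasks.output.response.tasks}`. A minimal sketch of such a handler, assuming the workflow input carries an `ids` list and a `batch_size`, and that a worker function with external ID `process_batch` exists (all three names are hypothetical, not taken from the original):

```python
def make_batches(ids, batch_size):
    """Split ids into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]


def handle(client, data):
    # Hypothetical input shape: {"ids": [...], "batch_size": 5}
    ids = data["ids"]
    batch_size = data.get("batch_size", 5)
    tasks = []
    for n, batch in enumerate(make_batches(ids, batch_size), start=1):
        tasks.append({
            "externalId": f"process_batch_{n}",
            "type": "function",
            "parameters": {
                "function": {
                    # Hypothetical worker function that processes one batch
                    "externalId": "process_batch",
                    "data": {"ids": batch},
                }
            },
            # No dependencies: every batch task may start as soon as the dynamic task starts
            "dependsOn": [],
        })
    # Read by the dynamic task as ${test_sub_tasks.output.response.tasks}
    return {"tasks": tasks}
```

A smaller `batch_size` yields more independent tasks; the limit on the number of tasks in a data workflow, mentioned later in this thread, still applies.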

As part of this workflow, I have some tasks that need to be executed in parallel and are expected to finish at around the same time when running in parallel. However, the dynamic tasks are not being executed in parallel: I have 6 dynamic tasks, but they finish sequentially. Sharing a snapshot: the time difference between the first and the last task is ~6 minutes. I am not able to take advantage of parallel execution with dynamic tasks.

Impact: the dynamic task takes as long as sequential processing would, further delaying completion of the workflow.

Jørgen Lund
Seasoned Practitioner

Hi @Rimmi Anand

When you say you have “6 dynamic tasks”, are you then referring to 6 Function Tasks within the Dynamic Task of the workflow definition you shared? And the problem being that these 6 Function Tasks are executed sequentially rather than in parallel?

Can you please share the workflow execution details (you can use this endpoint to retrieve it) of a run where you observe sequential execution when expecting parallel? Note that the execution details might contain sensitive information - if that’s the case, remove it before sharing (here or by DM).


I am referring to the Function Tasks within the dynamic task. They start in parallel, but the completion time increases with each task. With parallel execution, all tasks should finish in approximately the same time; let me know if that is not the case. I am sharing the execution details as a .txt file, since the forum didn’t allow attaching JSON; feel free to convert it to JSON. This run may have 3 dynamic tasks. Let me know if any other information is required.


Jørgen Lund
Seasoned Practitioner

Thanks. We’ll take a look. The behavior you see could be explained by the task concurrency limitations explained in this thread: 

 


Jørgen Lund
Seasoned Practitioner

Hi @Rimmi Anand. We’ve investigated your issue and the root cause is a problem we’ve been experiencing with scalability/concurrent executions of Cognite Functions on Azure (Data Workflows functions as expected in this case). We’ll have to follow up on this next week. How critical is this specific issue for your use case?


  1. We have multiple workflows, and some of them depend on others. The long execution time is hurting the performance of our workflows, and execution time is the deciding factor for how we schedule them, so this is critical for us.

Hi @Jørgen Lund, is there any update on this? I am still facing the issue.

I didn't have any other function running while testing this.


Jørgen Lund
Seasoned Practitioner

Related issue with additional comments here. We’ll take another look at this.

@Rimmi Anand could you share the following for this particular case:

  • Which CDF project is this, and which cluster is it running on?
  • What is the function ID of the function in the pictures above?

Hi @Jørgen Lund 

Sharing details. I don’t have the function ID for this screenshot, as the function was redeployed and a new function ID was created. I can share the external ID of this function if that helps.

  • cluster: westeurope-1, project: slb-uds-dev
  • externalID: wk_ingest_osdu_3

Jørgen Lund
Seasoned Practitioner

Thanks @Rimmi Anand. We have looked into this, and unfortunately we believe this is the result of an issue on the side of the cloud provider (Azure). We’ll have to continue working towards a resolution, but due to the nature of the issue we expect this to take some time. 

This might sound like a strange suggestion considering the issue, but would it be possible for you to parallelize the work even more? E.g., instead of making 5 calls in parallel, make 10 or more? Or is this not possible in your case due to the limit on the number of tasks in a data workflow (we might consider increasing this)? This might help work around the issue in Azure and prompt the Function to scale out as expected.


We have tried making more calls.

function ID: 7220560071032348

cluster: westeurope-1

project: slb-uds-qa

 


Jørgen Lund
Seasoned Practitioner

It’s a bit hard to tell from the screenshot, as it’s not clear exactly when each call was made, but do you see an improvement in parallel execution?

We’ll also take a look at our logs.


We are working on this issue and have discovered that there are problems when “cold” functions are called: they do not scale in the expected way. This looks to be a problem with Azure Functions, which are used to run Cognite Functions on Azure, and we are in dialog with Microsoft to understand why the problem occurs and how it can be fixed.

A mitigation that can be used until we find a more permanent solution is to warm up the function with a single call (e.g., use a cheap computation or have a separate parameter that tells the function not to do much work) as a separate step earlier in the workflow. This needs to be done around 30-40 seconds before the real computations, so as not to trigger the bad scaling behavior for the real computation you want to do later in the workflow. When the function is warm, our internal testing has shown that it scales out as expected.
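One way to express the warm-up step is a minimal sketch that assumes it runs inside an earlier function task (for example, the task generator) and that every target function returns early on a `{"health": true}` payload, as the handlers later in this thread do. `client.functions.call(external_id=..., data=..., wait=False)` is the Cognite Python SDK call; the helper name `warm_up` and the 40-second default are illustrative choices based on the 30-40 second guidance above, and the client is assumed to have permission to call the target functions.

```python
import time


def warm_up(client, function_external_ids, idle_seconds=40, sleep=time.sleep):
    """Fire one lightweight call per function, then stay idle before real work.

    Assumes each target handler returns immediately when "health" is in data.
    """
    calls = []
    for xid in function_external_ids:
        # wait=False: do not block on the result; the call only needs to start the function
        calls.append(client.functions.call(external_id=xid, data={"health": True}, wait=False))
    # Leave the functions idle so that the later burst of real calls scales out
    sleep(idle_seconds)
    return calls
```

In the example later in this thread, the generator could call `warm_up(client, ["create_payload", "call_external_api"])` before returning its task list, so the dynamic tasks start after the idle window.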


Hi @Dag Brattli, can you please provide an example?


We have received confirmation from Microsoft that this is a known issue affecting Azure Functions running on the Consumption Plan. Microsoft has scheduled a fix for mid-January.

In the interim, we recommend implementing one of these mitigation strategies:

  1. Keep the function warm with a lightweight health-check call every 1-5 minutes. The function becomes cold after approximately 30 minutes.
  2. Pre-warm the function with a lightweight call a few minutes before your scheduled processing. The function needs to be idle for about a minute between the warm-up and the actual call in order to scale correctly.
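Option 1 can be set up with a function schedule that sends the lightweight payload. A sketch using the Cognite Python SDK's `client.functions.schedules.create`, assuming the target function returns early on `{"health": true}`; the schedule name, description, and the every-minute default cron expression (taken from the later advice in this thread) are placeholders, and `client_credentials` is expected to be, for example, a `ClientCredentials` instance.

```python
def create_keep_warm_schedule(client, function_id, client_credentials,
                              cron_expression="* * * * *"):
    """Create a schedule that sends a cheap health-check payload to a function."""
    return client.functions.schedules.create(
        name=f"keep-warm-{function_id}",
        cron_expression=cron_expression,
        function_id=function_id,
        client_credentials=client_credentials,
        description="Lightweight call to keep the function warm",
        # Handlers in this thread return immediately when "health" is in data
        data={"health": True},
    )
```

Each schedule invocation is itself a function call, so a cheap health-check path in the handler matters.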

These workarounds will help ensure consistent performance until Microsoft's planned update resolves the underlying issue. We will continue to monitor the situation and update you once the fix has been deployed.


@Dag Brattli Thanks for your response.

If the function is scheduled to run every 5/10 minutes, do we still need to pre-warm it? Should we expect it to scale if the function is scheduled to run within a 30-minute window?

 

I am asking because the functions are not scaling even when they are scheduled within a 30-minute interval. If the warm-up strategy works, it could help mitigate this issue as well.

https://hub.cognite.com/slb-community-forum-288/cognite-functions-queue-and-timeout-4410

@Aditya Kotiyal 

 


Aditya Kotiyal
MVP

@Dag Brattli Your comments will be highly appreciated. Since the Cognite team will be away starting next week, getting some confirmation today would help @Niranjan Madhukar Karvekar and team continue delivering the project for the customer.

Your comment will help us solve this thread and the other thread as well:

https://hub.cognite.com/slb-community-forum-288/cognite-functions-queue-and-timeout-4410

 

@Jørgen Lund 


The function needs to be called every minute with a lightweight call to keep it warm. If not, you will need to pre-warm it with a single lightweight call a minute before you want to do concurrent calls. Then it should scale as needed.

We hope that the fix Microsoft will ship in mid-January will remove the need for this mitigation.

Dag 


@Dag Brattli Thanks for your response.

Below is the example I tested with a warm-up for scalability, but I still don’t see the functions scaling as expected.

 

trigger_workflow → create_tasks → create_payload →  call_external_api

create_payload and call_external_api run under the dynamic task.

 

workflow definition:

{
  "items": [
    {
      "workflowExternalId": "scalability-demo-workflow",
      "version": "1",
      "workflowDefinition": {
        "description": "",
        "tasks": [
          {
            "externalId": "create_tasks",
            "type": "function",
            "name": "Dynamic Task Generator",
            "description": "Returns a list of workflow tasks",
            "parameters": {
              "function": {
                "externalId": "create_tasks",
                "data": "${workflow.input}"
              }
            },
            "retries": 1,
            "timeout": 3600,
            "onFailure": "abortWorkflow",
            "dependsOn": []
          },
          {
            "externalId": "process_data",
            "type": "dynamic",
            "name": "Dynamic Task",
            "description": "Executes a list of workflow tasks",
            "parameters": {
              "dynamic": {
                "tasks": "${create_tasks.output.response.tasks}"
              }
            },
            "retries": 1,
            "timeout": 3600,
            "onFailure": "abortWorkflow",
            "dependsOn": [
              {
                "externalId": "create_tasks"
              }
            ]
          }
        ]
      }
    }
  ]
}

 

trigger_workflow code:

from cognite.client.data_classes import ClientCredentials

# Example input:
# {
#   "total_workflows_count": 2,
#   "total_parallel_tasks_per_workflow_count": 4,
#   "wait_time": 60
# }


def handle(client, data, secrets):
    # Lightweight warm-up call: return immediately
    if "health" in data:
        print("health check")
        return {}

    number_of_workflows = data["total_workflows_count"]
    number_of_parallel_tasks_in_workflow = data["total_parallel_tasks_per_workflow_count"]
    wait_time = data["wait_time"]

    # Generate dummy wellbore IDs: wellbore0, wellbore1, ...
    number_of_wellbores = number_of_workflows * number_of_parallel_tasks_in_workflow
    external_id_list = [f"wellbore{x}" for x in range(number_of_wellbores)]

    print(f"wellbores to process: {external_id_list}")
    print(f"no. of workflow instances: {number_of_workflows}")
    print(f"no. of parallel tasks per workflow: {number_of_parallel_tasks_in_workflow}")

    # One sublist of wellbores per workflow execution
    subset_size = number_of_parallel_tasks_in_workflow
    sublists = [external_id_list[i:i + subset_size] for i in range(0, len(external_id_list), subset_size)]

    for wellbores in sublists:
        res = client.workflows.executions.trigger(
            "scalability-demo-workflow",
            "1",
            input={"wellbores": wellbores, "wait_time": wait_time},
            client_credentials=ClientCredentials(secrets["client-id"], secrets["client-secret"]),
        )
        print(f"workflow execution: {res}")
    return {}

 

create_tasks code:

import json


def chunks(items, split_size):
    """Yield consecutive slices of items of length split_size."""
    for i in range(0, len(items), split_size):
        yield items[i:i + split_size]


def handle(client, data):
    # Lightweight warm-up call: return immediately
    if "health" in data:
        print("health check")
        return {}

    wellbores = data["wellbores"]
    print(f"wellbores: {wellbores}")

    osdu_split = 1  # wellbores per create_payload task
    # Remove duplicates while keeping the input order
    unique_wellbores = list(dict.fromkeys(wellbores))

    tasks = []
    for n, chunk in enumerate(chunks(unique_wellbores, osdu_split), start=1):
        payload_id = f"create_payload{n}"
        print(f"create_payload task external id: {payload_id}")
        tasks.append({
            "externalId": payload_id,
            "type": "function",
            "description": "Some Cognite Function",
            "parameters": {
                "function": {
                    "externalId": "create_payload",
                    "data": {"wellbore": json.dumps(list(chunk))},
                }
            },
            "dependsOn": [],
        })

        # Second function in the dynamic task; receives create_payload's output
        api_id = f"call_external_api{n}"
        print(f"call_external_api task external id: {api_id}")
        tasks.append({
            "externalId": api_id,
            "type": "function",
            "description": "Some Cognite Function",
            "parameters": {
                "function": {
                    "externalId": "call_external_api",
                    "data": {
                        "wellbore": "${" + payload_id + ".output.response}",
                        "wait_time": data["wait_time"],
                    },
                }
            },
            "dependsOn": [{"externalId": payload_id}],
        })

    print(f"total number of tasks created: {len(tasks)}")
    print(f"total number of parallel tasks created: {len(tasks) // 2}")
    return {"tasks": tasks}
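Before returning the list, a generator like this can check that the dynamic tasks are well formed: unique `externalId` values and no `dependsOn` entry pointing at a task outside the list. A small, SDK-independent sketch; the helper names `validate_dynamic_tasks` and `count_roots` are hypothetical.

```python
def validate_dynamic_tasks(tasks):
    """Return a list of problems found in a dynamic task list (empty if none)."""
    problems = []
    seen = set()
    for task in tasks:
        xid = task["externalId"]
        if xid in seen:
            problems.append(f"duplicate externalId: {xid}")
        seen.add(xid)
    for task in tasks:
        for dep in task.get("dependsOn", []):
            if dep["externalId"] not in seen:
                problems.append(f"{task['externalId']} depends on unknown task {dep['externalId']}")
    return problems


def count_roots(tasks):
    """Count tasks with no dependencies; these can all start when the dynamic task starts."""
    return sum(1 for t in tasks if not t.get("dependsOn"))
```

For the create_tasks output above, `count_roots` should equal the number of wellbores, since each `create_payloadN` has no dependencies and each `call_external_apiN` waits only for its own `create_payloadN`.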

 

create_payload code:

def handle(client, data):
    # Lightweight warm-up call: return immediately
    if "health" in data:
        print("health check")
        return {}

    wellbore = data["wellbore"]
    print(f"create_payload for wellbore {wellbore}")
    # The returned dict becomes ${create_payloadN.output.response}
    return data

 

call_external_api code:

import time


def handle(client, data):
    # Lightweight warm-up call: return immediately
    if "health" in data:
        print("health check")
        return {}

    # data["wellbore"] is create_payload's response, i.e. {"wellbore": "<JSON list>"}
    wellbore = data["wellbore"]["wellbore"]
    wait_time = data["wait_time"]
    print(f"call_external_api for wellbore {wellbore}")
    print(f"starting long-running process... waiting for {wait_time} seconds")
    time.sleep(wait_time)  # simulates the external API call
    print("process completed")
    return data

 

create_tasks schedule for warm up with input as {"health": true}

05,10,15,20,25,30,35,40,45,50,55 * * * *

 

create_payload schedule for warm up with input as {"health": true}

05,10,15,20,25,30,35,40,45,50,55 * * * *

 

call_external_api schedule for warm up with input as {"health": true}

05,10,15,20,25,30,35,40,45,50,55 * * * *

 

trigger_workflow schedule for actual data processing with input and schedule as:

{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 2,
"wait_time": 120
}

This should process 4 wellbores, 2 per workflow instance. In each workflow instance, create_payload and call_external_api run twice, once per wellbore.

 

schedule:

07,12,17,22,27,32,37,42,47,52,57 * * * *
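The two cron expressions above place each trigger two minutes after a warm-up. A short sketch that expands the minute lists and computes the gap between each trigger and the most recent warm-up, which can be compared with the roughly one-minute idle window recommended earlier in the thread; the helper names are illustrative and only handle a comma-separated minute field.

```python
def cron_minutes(expr):
    """Expand the minute field of a cron expression written as a comma list."""
    return [int(m) for m in expr.split()[0].split(",")]


def gaps_after_warmup(warmup_expr, trigger_expr):
    """Minutes between each trigger and the most recent warm-up."""
    warm = cron_minutes(warmup_expr)
    # (t - w) % 60 also covers a warm-up late in the previous hour
    return [min((t - w) % 60 for w in warm) for t in cron_minutes(trigger_expr)]
```

For the schedules above, every trigger lands two minutes after a warm-up, and the five-minute warm-up spacing is well inside the ~30 minutes after which the function goes cold.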

 

So in this example, trigger_workflow is scheduled to run 2 minutes after the warm-up. trigger_workflow triggers the workflow with wellbores grouped according to total_parallel_tasks_per_workflow_count, and the workflow then creates parallel create_payload and call_external_api tasks based on the number of wellbores passed to each workflow instance. We expect all executions to finish in approximately the same time, even when total_workflows_count and total_parallel_tasks_per_workflow_count differ. For example, for the inputs below the executions should also finish in approximately the same time, but the create_payload:

{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 10,
"wait_time": 120
}

This should process 20 wellbores, 10 per workflow instance. In each workflow instance, create_payload and call_external_api run 10 times, once per wellbore.

 

{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 20,
"wait_time": 120
}

This should process 40 wellbores, 20 per workflow instance. In each workflow instance, create_payload and call_external_api run 20 times, once per wellbore.
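The counts in these examples follow from the trigger_workflow and create_tasks code above: each workflow receives `total_parallel_tasks_per_workflow_count` wellbores, create_tasks runs once per workflow, and it emits one create_payload task and one call_external_api task per wellbore. A quick arithmetic check (the helper is illustrative; the concurrency figure assumes both workflow instances run at the same time):

```python
def expected_counts(total_workflows_count, total_parallel_tasks_per_workflow_count):
    """Per-run counts implied by trigger_workflow and create_tasks in this thread."""
    wellbores = total_workflows_count * total_parallel_tasks_per_workflow_count
    dynamic_tasks_per_workflow = 2 * total_parallel_tasks_per_workflow_count
    return {
        "wellbores": wellbores,
        "dynamic_tasks_per_workflow": dynamic_tasks_per_workflow,
        # 1 trigger_workflow call + per workflow: 1 create_tasks call + all dynamic tasks
        "function_calls_total": 1 + total_workflows_count * (1 + dynamic_tasks_per_workflow),
        # Upper bound on simultaneous call_external_api calls (one per wellbore)
        "max_concurrent_call_external_api": wellbores,
    }
```

For example, the 2 x 20 input implies 83 function calls in one run, with up to 40 long-running call_external_api calls at once.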

 

Please let me know if anything is missing.

 


@Niranjan Madhukar Karvekar Would it be possible to share details on how you see that the functions do not scale as expected? Cognite Functions on Azure scale up based on the amount of work. New instances are allocated at most once every 30 seconds. This means that you should rarely or never see concurrent function calls finish at exactly the same time, since it still takes time before more instances are allocated. Ref: https://learn.microsoft.com/en-us/azure/azure-functions/event-driven-scaling?tabs=azure-cli#understanding-scaling-behaviors
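To illustrate why some staggering is expected under that rule, here is a toy model, not Azure's actual scaling algorithm: all calls are queued at t=0, instance k becomes available at k × 30 seconds, and each instance runs one call at a time. Both assumptions are simplifications for illustration.

```python
import heapq


def finish_times(n_calls, call_seconds, scale_interval=30, max_instances=None):
    """Toy model: finish time (s) of each call queued at t=0.

    Assumes instance k becomes available at k * scale_interval seconds and
    that each instance runs one call at a time.
    """
    if max_instances is not None and max_instances < 1:
        raise ValueError("max_instances must be at least 1")
    n_instances = n_calls if max_instances is None else min(n_calls, max_instances)
    free_at = [k * scale_interval for k in range(n_instances)]  # sorted, so already a heap
    finishes = []
    for _ in range(n_calls):
        start = heapq.heappop(free_at)  # earliest available instance
        end = start + call_seconds
        finishes.append(end)
        heapq.heappush(free_at, end)  # instance is free again when its call ends
    return finishes
```

Under this model, `finish_times(6, 120)` gives `[120, 150, 180, 210, 240, 240]`: a two-minute spread even though scale-out works, while `max_instances=1` reproduces fully sequential completion. Comparing observed finish times against both cases can indicate whether fewer instances than expected are being added.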


Hi @Dag Brattli, I am checking the execution time of each of the functions above. The example above is a sample I created for concurrency tests. If you look at the trigger_workflow code, the input to that function is:

{ "total_workflows_count": 2, "total_parallel_tasks_per_workflow_count": 4, "wait_time": 60 }

We don’t expect the executions to finish at exactly the same time, but the execution time increases when we increase total_parallel_tasks_per_workflow_count or total_workflows_count.

Do you see any issue in the sample code above?
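To make the comparison across inputs concrete, one option is to summarize the start and end times of the tasks in each run. The sketch works on plain `(task_external_id, start_ms, end_ms)` tuples; extracting those from the workflow execution details is assumed and not shown, and the helper name `summarize_run` is hypothetical.

```python
def summarize_run(task_times):
    """Summarize (task_external_id, start_ms, end_ms) tuples from one workflow run."""
    if not task_times:
        return {}
    starts = [s for _, s, _ in task_times]
    ends = [e for _, _, e in task_times]
    durations = [e - s for _, s, e in task_times]
    return {
        "start_spread_s": (max(starts) - min(starts)) / 1000,
        "end_spread_s": (max(ends) - min(ends)) / 1000,
        "wall_clock_s": (max(ends) - min(starts)) / 1000,
        "max_duration_s": max(durations) / 1000,
    }
```

If, as the counts increase, `max_duration_s` grows well beyond `wait_time` while `start_spread_s` stays small, that would suggest the tasks were started together but the calls waited for function capacity, which is consistent with the diagnosis earlier in the thread.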

 

