@Dag Brattli thanks for your response.
Below is the example I tested with warm-up for scalability, but I still don't see the functions scaling as expected.
trigger_workflow → create_tasks → create_payload → call_external_api
create_payload and call_external_api run under the dynamic task.
workflow definition:
{
  "items": [
    {
      "workflowExternalId": "scalability-demo-workflow",
      "version": "1",
      "workflowDefinition": {
        "description": "",
        "tasks": [
          {
            "externalId": "create_tasks",
            "type": "function",
            "name": "Dynamic Task Generator",
            "description": "Returns a list of workflow tasks",
            "parameters": {
              "function": {
                "externalId": "create_tasks",
                "data": "${workflow.input}"
              }
            },
            "retries": 1,
            "timeout": 3600,
            "onFailure": "abortWorkflow",
            "dependsOn": []
          },
          {
            "externalId": "process_data",
            "type": "dynamic",
            "name": "Dynamic Task",
            "description": "Executes a list of workflow tasks",
            "parameters": {
              "dynamic": {
                "tasks": "${create_tasks.output.response.tasks}"
              }
            },
            "retries": 1,
            "timeout": 3600,
            "onFailure": "abortWorkflow",
            "dependsOn": [
              {
                "externalId": "create_tasks"
              }
            ]
          }
        ]
      }
    }
  ]
}
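For completeness, this is roughly how I register the definition above. This is only a sketch: the /workflows/versions path (and using the raw client.post helper with the "items" body shown above) is my assumption about the Workflow Orchestration API, so adjust it to your project setup.
from cognite.client import CogniteClient
import json

def upsert_workflow_version(client: CogniteClient, definition_json: str) -> None:
    # Parse first so malformed JSON fails locally instead of at the API.
    body = json.loads(definition_json)
    # Assumed endpoint for creating/updating workflow versions; body is the
    # {"items": [...]} document shown above.
    client.post(
        f"/api/v1/projects/{client.config.project}/workflows/versions",
        json=body,
    )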
trigger_workflow code:
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
import os
import time
import json

# input
# {
#   "total_workflows_count": 2,
#   "total_parallel_tasks_per_workflow_count": 4,
#   "wait_time": 60
# }
def handle(client, data, secrets):
    if 'health' in data:
        print("health check")
        return {}
    else:
        from cognite.client.data_classes import ClientCredentials
        number_of_workflows = data['total_workflows_count']
        number_of_parallel_tasks_in_workflow = data['total_parallel_tasks_per_workflow_count']
        wait_time = data['wait_time']
        # generate one synthetic wellbore id per parallel task across all workflows
        number_of_wellbores = number_of_workflows * number_of_parallel_tasks_in_workflow
        external_id_list = list()
        for x in range(0, number_of_wellbores):
            wellbore = "wellbore" + str(x)
            external_id_list.append(wellbore)
        print(f'wellbores to process {external_id_list}')
        print(f'no of workflow instances: {number_of_workflows}')
        print(f'no of parallel tasks per workflow: {number_of_parallel_tasks_in_workflow}')
        # split the wellbores into one sublist per workflow execution
        subset_size = number_of_parallel_tasks_in_workflow
        sublists = [external_id_list[i:i + subset_size] for i in range(0, len(external_id_list), subset_size)]
        for x in range(number_of_workflows):
            res = client.workflows.executions.trigger(
                "scalability-demo-workflow",
                "1",
                input={"wellbores": sublists[x], "wait_time": wait_time},
                client_credentials=ClientCredentials(secrets['client-id'], secrets['client-secret'])
            )
            print(f'workflows: {res}')
        output_json = {}
        return output_json
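As a quick sanity check of the splitting logic above (no CDF calls involved), the sample input of 2 workflows with 4 parallel tasks each produces one sublist per workflow execution:
external_id_list = [f"wellbore{x}" for x in range(2 * 4)]
sublists = [external_id_list[i:i + 4] for i in range(0, len(external_id_list), 4)]
print(sublists)
# [['wellbore0', 'wellbore1', 'wellbore2', 'wellbore3'],
#  ['wellbore4', 'wellbore5', 'wellbore6', 'wellbore7']]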
create_tasks code:
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
import os
import time
import json

def chunks(cognite_ext_id_df, split_size):
    for i in range(0, len(cognite_ext_id_df), split_size):
        yield cognite_ext_id_df[i:i + split_size]

def handle(client, data):
    if 'health' in data:
        print("health check")
        return {}
    else:
        wellbores = data['wellbores']
        print(f'wellbores: {wellbores}')
        osdu_split = 1
        external_id_list = set()
        for wellbore in wellbores:
            external_id_list.add(wellbore)
        output = external_id_list
        size = 1
        task = []
        # build one create_payload -> call_external_api pair per wellbore chunk
        for chunk in chunks(list(output), osdu_split):
            new_ext_id = "create_payload" + str(size)
            print(f'create_payload task external id: {new_ext_id}')
            obj = {
                "externalId": new_ext_id,
                "type": "function",
                "description": "Some Cognite Function",
                "parameters": {
                    "function": {
                        "externalId": "create_payload",
                        "data": {
                            "wellbore": json.dumps(list(chunk))
                        }
                    }
                },
                "dependsOn": []
            }
            task.append(obj)
            ext_id_2 = "call_external_api" + str(size)  # second function in dynamic task
            print(f'call_external_api task external id: {ext_id_2}')
            obj2 = {
                "externalId": ext_id_2,
                "type": "function",
                "description": "Some Cognite Function",
                "parameters": {
                    "function": {
                        "externalId": "call_external_api",
                        "data": {
                            "wellbore": "${" + new_ext_id + ".output.response}",
                            "wait_time": data['wait_time']
                        }
                    }
                },
                "dependsOn": [{"externalId": new_ext_id}]
            }
            task.append(obj2)
            size = size + 1
        output_json = {}
        print(f'total number of tasks created: {len(task)}')
        print(f'total number of parallel tasks created: {(size - 1)}')
        output_json['tasks'] = task
        return output_json
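To make the dynamic task's input concrete: for a single chunk containing only wellbore0 (and assuming the sample wait_time of 60 from the input above), create_tasks returns a tasks list with one pair like this:
[
  {
    "externalId": "create_payload1",
    "type": "function",
    "description": "Some Cognite Function",
    "parameters": {
      "function": {
        "externalId": "create_payload",
        "data": {"wellbore": "[\"wellbore0\"]"}
      }
    },
    "dependsOn": []
  },
  {
    "externalId": "call_external_api1",
    "type": "function",
    "description": "Some Cognite Function",
    "parameters": {
      "function": {
        "externalId": "call_external_api",
        "data": {
          "wellbore": "${create_payload1.output.response}",
          "wait_time": 60
        }
      }
    },
    "dependsOn": [{"externalId": "create_payload1"}]
  }
]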
create_payload code:
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
import os
import time
import json

def handle(client, data):
    if 'health' in data:
        print("health check")
        return {}
    else:
        wellbore = data['wellbore']
        print(f'create_payload for wellbore {wellbore}')
        return data
call_external_api code:
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
import os
import time
import json

def handle(client, data):
    if 'health' in data:
        print("health check")
        return {}
    else:
        # data['wellbore'] is the create_payload response, which itself holds the wellbore list
        wellbore = data['wellbore']['wellbore']
        wait_time = data['wait_time']
        print(f'call_external_api for wellbore {wellbore}')
        print(f'starting long running process..waiting for {wait_time} seconds')
        time.sleep(wait_time)
        print("process completed")
        return data
create_tasks schedule for warm-up, with input {"health": true}:
05,10,15,20,25,30,35,40,45,50,55 * * * *
create_payload schedule for warm-up, with input {"health": true}:
05,10,15,20,25,30,35,40,45,50,55 * * * *
call_external_api schedule for warm-up, with input {"health": true}:
05,10,15,20,25,30,35,40,45,50,55 * * * *
trigger_workflow schedule for the actual data processing, with the following input and schedule:
{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 2,
"wait_time": 120
}
This should process 4 wellbores, 2 per workflow instance. In each workflow instance, 2 pairs of create_payload and call_external_api tasks run, one pair per wellbore.
schedule:
07,12,17,22,27,32,37,42,47,52,57 * * * *
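For reference, this is roughly how the warm-up and trigger schedules above are created. It is only a sketch: depending on the SDK version, client.functions.schedules.create may require function_id and/or client_credentials in addition to (or instead of) function_external_id.
from cognite.client import CogniteClient

def create_schedules(client: CogniteClient) -> None:
    # Warm-up schedules: ping each function every 5 minutes so instances stay warm.
    for fn in ["create_tasks", "create_payload", "call_external_api"]:
        client.functions.schedules.create(
            name=f"{fn}-warmup",
            function_external_id=fn,
            cron_expression="05,10,15,20,25,30,35,40,45,50,55 * * * *",
            data={"health": True},
        )
    # Trigger schedule: starts the actual workflow runs 2 minutes after each warm-up tick.
    client.functions.schedules.create(
        name="trigger_workflow-schedule",
        function_external_id="trigger_workflow",
        cron_expression="07,12,17,22,27,32,37,42,47,52,57 * * * *",
        data={
            "total_workflows_count": 2,
            "total_parallel_tasks_per_workflow_count": 2,
            "wait_time": 120,
        },
    )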
So in this example, trigger_workflow is scheduled to run 2 minutes after the warm-up. trigger_workflow triggers the workflow executions, passing each one a list of wellbores sized by total_parallel_tasks_per_workflow_count; inside each workflow, the parallel create_payload and call_external_api tasks are then created based on the number of wellbores passed to that workflow. We expect all executions to finish in approximately the same time, even when total_workflows_count and total_parallel_tasks_per_workflow_count differ. For example, for the inputs below as well, the executions should finish in approximately the same time, but the create_payload:
{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 10,
"wait_time": 120
}
This should process 20 wellbores, 10 per workflow instance. In each workflow instance, 10 pairs of create_payload and call_external_api tasks run, one pair per wellbore.
{
"total_workflows_count": 2,
"total_parallel_tasks_per_workflow_count": 20,
"wait_time": 120
}
This should process 40 wellbores, 20 per workflow instance. In each workflow instance, 20 pairs of create_payload and call_external_api tasks run, one pair per wellbore.
Please let me know if anything is missing here.
Hi @Dag Brattli, I am just checking the execution time for each of the functions above. The above is the sample I created for the concurrency tests. In this example, if you look at the trigger_workflow code, the input to that function is:
{ "total_workflows_count": 2, "total_parallel_tasks_per_workflow_count": 4, "wait_time": 60 }
We don't expect them to finish at exactly the same time, but the execution time increases when we increase total_parallel_tasks_per_workflow_count or total_workflows_count.
Do you see any issue in the sample code above?
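In case it helps narrow this down, this is roughly how I compare per-task durations across runs. It is a sketch only: retrieve_detailed and the executed_tasks / start_time / end_time / external_id fields are my assumptions about the SDK's detailed execution response, so the attribute names may differ in your SDK version.
from cognite.client import CogniteClient

def print_task_durations(client: CogniteClient, execution_id: str) -> None:
    # Fetch the detailed execution, which includes the individual task runs.
    execution = client.workflows.executions.retrieve_detailed(execution_id)
    for task in execution.executed_tasks:
        if task.start_time and task.end_time:
            duration_s = (task.end_time - task.start_time) / 1000  # timestamps assumed to be in ms
            print(f"{task.external_id}: {duration_s:.1f} s")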