Skip to main content

How to create a schedule-based execution of workflows using Cognite Functions [Cognite Official]

  • November 30, 2023
  • 0 replies
  • 164 views

Jørgen Lund
Seasoned Practitioner
Forum|alt.badge.img

Currently, CDF Workflows have no built-in scheduling mechanisms (it is on our roadmap). A simple way to implement schedule-based execution of workflow is to leverage Cognite Functions. Below is a two-step example to explain how it works.

1. Create the Cognite Function that will act as the workflow trigger

Note that you need to specify the client_credentials parameter inside the call to client.workflows.executions.trigger for the authentication to work at runtime.

# Enter credentials and instantiate client
from cognite.client import CogniteClient
cdf_cluster = "" # "api", "greenfield", etc.
cdf_project = "" # CDF project name
tenant_id = "" # IdP tenant ID
client_id = "" # IdP client ID
client_secret = "" # IdP client secret

client = CogniteClient.default_oauth_client_credentials(
cdf_project, cdf_cluster, tenant_id, client_id, client_secret
)

# Define Function handle
def handle(client, data, secrets):
from cognite.client.data_classes import ClientCredentials

execution = client.workflows.executions.trigger(
workflow_external_id=data["workflow-external-id"],
version=data["workflow-version"],
client_credentials=ClientCredentials(
secrets["client-id"], secrets["client-secret"]
),
)

return f"Workflow execution ID: {execution.id}"

# Create the Function
function_name = "workflow_trigger"
client.functions.create(
name=function_name,
external_id=function_name,
function_handle=handle,
secrets={
"client-id": client_id,
"client-secret": client_secret,
"project": cdf_project,
},
)

2. Create a Cognite Function schedule for each scheduled workflow trigger you need

from cognite.client.data_classes import ClientCredentials

# Enter a cron expression for the schedule
cron_expression = "0 12 * * *" # E.g. daily at 12:00. Use https://crontab.guru/ to create a valid expression
# Enter the details of the workflow to schedule
workflow_external_id = "<Enter workflow external ID>"
workflow_version = "<Enter workflow version>"

function_id = client.functions.retrieve(external_id=function_name).id
client.functions.schedules.create(
name=f"Scheduled trigger: {workflow_external_id}",
cron_expression=cron_expression,
function_id=function_id,
client_credentials=ClientCredentials(client_id, client_secret),
data={
"workflow-external-id": workflow_external_id,
"workflow-version": workflow_version,
},
)

 

Leave a comment below if you have questions!