
How to create a schedule-based execution of workflows using Cognite Functions [Cognite Official]

  • November 30, 2023

Jørgen Lund
Seasoned Practitioner

Currently, CDF Workflows have no built-in scheduling mechanism (it is on our roadmap). A simple way to run a workflow on a schedule is to use Cognite Functions: a Function triggers the workflow, and a Function schedule calls that Function on a cron expression. The two-step example below shows how it works.

1. Create the Cognite Function that will act as the workflow trigger

Note that you must pass the client_credentials parameter in the call to client.workflows.executions.trigger; without it, authentication does not work at runtime.

# Enter credentials and instantiate client
from cognite.client import CogniteClient
cdf_cluster = "" # "api", "greenfield", etc.
cdf_project = "" # CDF project name 
tenant_id = "" # IdP tenant ID
client_id = "" # IdP client ID 
client_secret = "" # IdP client secret

client = CogniteClient.default_oauth_client_credentials(
    cdf_project, cdf_cluster, tenant_id, client_id, client_secret
)

# Define Function handle
def handle(client, data, secrets):
    from cognite.client.data_classes import ClientCredentials

    execution = client.workflows.executions.trigger(
        workflow_external_id=data["workflow-external-id"],
        version=data["workflow-version"],
        client_credentials=ClientCredentials(
            secrets["client-id"], secrets["client-secret"]
        ),
    )

    return f"Workflow execution ID: {execution.id}"

# Create the Function
function_name = "workflow_trigger"
client.functions.create(
    name=function_name,
    external_id=function_name,
    function_handle=handle,
    secrets={
        "client-id": client_id,
        "client-secret": client_secret,
        "project": cdf_project,
    },
)
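
The handle above reads two keys from data and two from secrets, so a typo in a key name only shows up as a KeyError in a failed Function call. As a minimal sketch (not part of the original example), the check below could run at the top of the handle or locally before deploying. The names REQUIRED_DATA_KEYS, REQUIRED_SECRET_KEYS, missing_keys and check_trigger_inputs are hypothetical helpers, not part of the Cognite SDK.

```python
# Keys the handle in step 1 looks up; taken from the example above.
REQUIRED_DATA_KEYS = ("workflow-external-id", "workflow-version")
REQUIRED_SECRET_KEYS = ("client-id", "client-secret")


def missing_keys(mapping, required):
    """Return the required keys that are absent or empty in mapping."""
    mapping = mapping or {}
    return [key for key in required if not mapping.get(key)]


def check_trigger_inputs(data, secrets):
    """Raise ValueError with a readable message if any required key is missing."""
    problems = []
    for label, mapping, required in (
        ("data", data, REQUIRED_DATA_KEYS),
        ("secrets", secrets, REQUIRED_SECRET_KEYS),
    ):
        missing = missing_keys(mapping, required)
        if missing:
            problems.append(f"{label} is missing: {', '.join(missing)}")
    if problems:
        raise ValueError("; ".join(problems))


# Passes silently when everything the handle needs is present.
check_trigger_inputs(
    {"workflow-external-id": "my-workflow", "workflow-version": "1"},
    {"client-id": "placeholder-id", "client-secret": "placeholder-secret"},
)
```

Calling check_trigger_inputs(data, secrets) as the first line of the handle would turn a bare KeyError into a message that names the missing key.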

2. Create a Cognite Function schedule for each scheduled workflow trigger you need

from cognite.client.data_classes import ClientCredentials

# Enter a cron expression for the schedule
cron_expression = "0 12 * * *"  # E.g. daily at 12:00. Use https://crontab.guru/ to create a valid expression
# Enter the details of the workflow to schedule
workflow_external_id = "<Enter workflow external ID>"
workflow_version = "<Enter workflow version>"

function_id = client.functions.retrieve(external_id=function_name).id
client.functions.schedules.create(
    name=f"Scheduled trigger: {workflow_external_id}",
    cron_expression=cron_expression,
    function_id=function_id,
    client_credentials=ClientCredentials(client_id, client_secret),
    data={
        "workflow-external-id": workflow_external_id,
        "workflow-version": workflow_version,
    },
)
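
Since step 2 is repeated once per scheduled trigger, the arguments can be built from a list when there are several workflows. The sketch below only prepares the keyword arguments and does not call the API. The helper names, the workflow IDs, and the function ID 123 are hypothetical, and the five-field cron check is a shallow sanity check based on the assumption that schedules use standard five-field cron expressions like the one above.

```python
def is_five_field_cron(expression):
    """Shallow check: five whitespace-separated fields. Does not validate ranges."""
    return len(expression.split()) == 5


def build_schedule_kwargs(function_id, workflows):
    """Build one set of schedule arguments per workflow.

    workflows is an iterable of (workflow_external_id, workflow_version,
    cron_expression) tuples.
    """
    kwargs_list = []
    for external_id, version, cron in workflows:
        if not is_five_field_cron(cron):
            raise ValueError(f"Expected a five-field cron expression, got {cron!r}")
        kwargs_list.append(
            {
                "name": f"Scheduled trigger: {external_id}",
                "cron_expression": cron,
                "function_id": function_id,
                "data": {
                    "workflow-external-id": external_id,
                    "workflow-version": version,
                },
            }
        )
    return kwargs_list


# Hypothetical workflows and function ID, for illustration only.
example = build_schedule_kwargs(
    123,
    [
        ("my-workflow", "1", "0 12 * * *"),
        ("other-workflow", "2", "*/15 * * * *"),
    ],
)

# With the client from step 1, each entry could then be passed on, e.g.:
# for kwargs in example:
#     client.functions.schedules.create(
#         client_credentials=ClientCredentials(client_id, client_secret), **kwargs
#     )
```

Keeping the credentials out of the builder means the list can be printed or logged for review before any schedule is created.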

 

Leave a comment below if you have questions! 
