Currently, CDF Workflows have no built-in scheduling mechanism (it is on our roadmap). A simple way to implement schedule-based execution of a workflow is to leverage Cognite Functions. Below is a two-step example that explains how it works.
1. Create the Cognite Function that will act as the workflow trigger
Note that you need to specify the client_credentials
parameter inside the call to client.workflows.executions.trigger
for the authentication to work at runtime.
# Enter credentials and instantiate client
from cognite.client import CogniteClient
# Fill in every value below before running the rest of the example.
cdf_cluster = ""  # CDF cluster, e.g. "api" or "greenfield"
cdf_project = ""  # name of the CDF project
tenant_id = ""  # tenant ID at the identity provider (IdP)
client_id = ""  # IdP client ID
client_secret = ""  # IdP client secret

# Build a client that authenticates with OAuth client credentials.
client = CogniteClient.default_oauth_client_credentials(
    project=cdf_project,
    cdf_cluster=cdf_cluster,
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret,
)
# Define Function handle
def handle(client, data, secrets):
    """Trigger the workflow named in ``data`` and report its execution ID.

    Args:
        client: Client object whose ``workflows.executions.trigger`` is
            called (presumably the CogniteClient supplied by the Functions
            runtime -- confirm).
        data: Mapping with the keys ``"workflow-external-id"`` and
            ``"workflow-version"`` identifying the workflow to run.
        secrets: Mapping with the keys ``"client-id"`` and
            ``"client-secret"`` used to build the trigger credentials.

    Returns:
        A string of the form ``"Workflow execution ID: <id>"``.
    """
    # Imported inside the handle; presumably only the handle body is deployed
    # (it is passed as function_handle), so module-level imports may not exist.
    from cognite.client.data_classes import ClientCredentials

    workflow_external_id = data["workflow-external-id"]
    workflow_version = data["workflow-version"]

    # Explicit credentials are needed for authentication to work at runtime.
    credentials = ClientCredentials(
        secrets["client-id"], secrets["client-secret"]
    )

    execution = client.workflows.executions.trigger(
        workflow_external_id=workflow_external_id,
        version=workflow_version,
        client_credentials=credentials,
    )
    return f"Workflow execution ID: {execution.id}"
# Create the Function
# The same string serves as both the display name and the external ID.
function_name = "workflow_trigger"

# Register `handle` as a Cognite Function. The handle reads "client-id" and
# "client-secret" from these secrets when it triggers a workflow.
client.functions.create(
    function_handle=handle,
    name=function_name,
    external_id=function_name,
    secrets={
        "client-id": client_id,
        "client-secret": client_secret,
        "project": cdf_project,
    },
)
2. Create a Cognite Function schedule for each scheduled workflow trigger you need
from cognite.client.data_classes import ClientCredentials
# Enter a cron expression for the schedule, e.g. daily at 12:00.
# Use https://crontab.guru/ to create a valid expression.
cron_expression = "0 12 * * *"

# Enter the details of the workflow to schedule
workflow_external_id = "<Enter workflow external ID>"
workflow_version = "<Enter workflow version>"

# Look up the trigger Function created in step 1. `retrieve` returns None when
# no Function has this external ID, so fail with a clear message instead of an
# AttributeError on `.id`.
trigger_function = client.functions.retrieve(external_id=function_name)
if trigger_function is None:
    raise LookupError(
        f"Cognite Function {function_name!r} was not found; "
        "create it (step 1) before adding a schedule."
    )
function_id = trigger_function.id

# The keys in `data` are the ones the Function handle reads to decide which
# workflow (and version) to trigger on every scheduled call.
client.functions.schedules.create(
    name=f"Scheduled trigger: {workflow_external_id}",
    cron_expression=cron_expression,
    function_id=function_id,
    client_credentials=ClientCredentials(client_id, client_secret),
    data={
        "workflow-external-id": workflow_external_id,
        "workflow-version": workflow_version,
    },
)
Leave a comment below if you have questions!