
This post is a hands-on introduction to the features supported in the Transformations Python SDK.

Prerequisites

Knowledge:

  • Basic knowledge of Azure Functions and Azure Data Factory

  • Basic knowledge of Cognite Data Fusion RAW and SQL Transformations

  • Prior experience with Python, Postgres and SQL

Required Datasets:

Download and unzip the attached hub.zip file; you should find the structure below:

  • Use Case 1:

    • asset-hierarchy.csv

  • Use Case 2:

    • OID-Asset-hirerachy.csv

    • OID-Timeseries.csv

    • OID-Datapoints.csv

Use Case 1: Triggering Transformations

Data is extracted from source systems and ingested to RAW. Once the data is ingested, we trigger Transformations so that the data is processed and immediately available for consumption.

Step 1 - Create RAW Tables

Log in to your project in Cognite Data Fusion and click on "Browse staged data".

Create a database called "Transformations_SDK" and a table called "Asset-Hierarchy" (this is the table name the transformation refers to later in this post).
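If you prefer to script this step instead of using the UI, a minimal sketch with the Python SDK could look like the following. It assumes a CogniteClient c configured with the same placeholder credentials as in Step 3 below, and uses the SDK's standard RAW calls raw.databases.create and raw.tables.create:

# Optional scripted alternative to the UI: create the RAW database and table.
# Assumes `c` is a CogniteClient configured as in Step 3 below.
c.raw.databases.create("Transformations_SDK")
c.raw.tables.create("Transformations_SDK", "Asset-Hierarchy")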

 

Step 2 - Uploading data to RAW using Postgres Gateway

2a. Upload the asset-hierarchy.csv file to Azure Blob Storage.

 

2b. If you have not used Postgres Gateway before, follow the Postgres Gateway documentation for setup. Next, create a RAW foreign table called "asset_hierarchy" by following this documentation. The screenshot below shows how the columns in your RAW foreign table should look.

SQL for creating your RAW foreign table (in case you don’t want to create it manually):

CREATE FOREIGN TABLE public.asset_hierarchy (
    description character varying NULL,
    parent_func_loc character varying NULL,
    "WMT_LOCATION_ID" bigint NULL,
    "WMT_TAG_HISTORYREQUIRED" character varying NULL,
    "WMT_TAG_DESC" character varying NULL,
    "WMT_TAG_NAME" character varying NULL,
    "WMT_CONTRACTOR_ID" bigint NULL,
    "WMT_PO_ID" bigint NULL,
    "WMT_AREA_ID" bigint NULL,
    "WMT_TAG_ID_ANCESTOR" bigint NULL,
    loc character varying NULL,
    "WMT_TAG_ID" bigint NULL,
    "Key" bigint NULL
)
SERVER raw_server
OPTIONS (database 'Transformations_SDK', table 'Asset-Hierarchy', primary_key 'Key');

ALTER FOREIGN TABLE public.asset_hierarchy
OWNER TO <USERNAME>;

2c. Create a simple copy pipeline to ingest the “asset-hierarchy.csv” data from Azure Blob Storage to the "asset_hierarchy" RAW foreign table you just created. Follow this documentation to create your copy pipeline in Azure Data Factory.
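If you would rather skip Azure Data Factory for a small file like this, a rough alternative is to push the CSV through the Postgres Gateway with pandas and SQLAlchemy. This is only a sketch: the connection string values are placeholders for your gateway host and credentials, and it assumes the CSV column names match the foreign table columns from step 2b.

import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string for the Postgres Gateway (it speaks the normal Postgres protocol).
engine = create_engine("postgresql://<username>:<password>@<postgres-gateway-host>:5432/<database>")

# Read the CSV and append its rows to the RAW foreign table created in step 2b.
df = pd.read_csv("asset-hierarchy.csv")
df.to_sql("asset_hierarchy", engine, schema="public", if_exists="append", index=False)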

 

Step 3 - Create new SQL Transformations

Create a new transformation in Cognite Data Fusion using the Python SDK. Install the SDK with: pip install cognite-sdk-experimental==0.60.22

from cognite.experimental import CogniteClient
from cognite.experimental.data_classes import Transformation, TransformationDestination, OidcCredentials

c = CogniteClient(
    base_url="<base_url>",
    client_name="<client_name>",
    token_client_id="<token_client_id>",
    token_client_secret="<token_client_secret>",
    token_url="<token_url>",
    token_scopes=["<token_scopes>"],
    project="<project_name>",
)

transformations = [
    Transformation(
        name="SAP asset hierarchy",
        external_id="SAP asset hierarchy",
        source_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination=TransformationDestination.raw(database="Transformations_SDK", table="Asset-Hierarchy"),
        query="SELECT concat('CogniteHub', loc) AS externalId, IF(parent_func_loc='', '', concat('CogniteHub:', parent_func_loc)) AS parentExternalId, CAST(lastUpdatedTime AS STRING) AS name, to_metadata(*) AS metadata, description AS description FROM `Transformations_SDK`.`Asset-Hierarchy`",
    )
]

res = c.transformations.create(transformations)

You should now be able to see the "SAP asset hierarchy" transformation in Cognite Data Fusion, with the following query:

SELECT
  concat('CogniteHub', loc) AS externalId,
  IF(parent_func_loc='', '', concat('CogniteHub:', parent_func_loc)) AS parentExternalId,
  CAST(lastUpdatedTime AS STRING) AS name,
  to_metadata(*) AS metadata,
  description AS description
FROM `Transformations_SDK`.`Asset-Hierarchy`
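You can also confirm this from the SDK. A quick sketch, reusing the client c from the script above and assuming the experimental client's transformations.list call:

# List transformations and confirm "SAP asset hierarchy" is among them.
for t in c.transformations.list():
    print(t.name, t.external_id)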

Step 4 - Trigger the transformation from Azure Data Factory

Trigger the SQL Transformation right after the data is ingested to RAW by Azure Data Factory.

requirements.txt for Azure Functions

cognite-sdk-experimental==0.60.22
numpy==1.21.0; python_full_version >= "3.7.1" and python_version >= "3.7"
oauthlib==3.1.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6"
pandas==1.3.0; python_full_version >= "3.7.1" and python_version >= "3.5" and python_full_version < "4.0.0"
openpyxl==3.0.9; python_full_version >= "3.7.1" and python_version >= "3.5" and python_full_version < "4.0.0"

Azure Functions

import logging

import azure.functions as func
from cognite.experimental import CogniteClient


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("Python HTTP trigger function processed a request.")
    c = CogniteClient(
        base_url="<base_url>",
        client_name="<client_name>",
        token_client_id="<token_client_id>",
        token_client_secret="<token_client_secret>",
        token_url="<token_url>",
        token_scopes=["<token_scopes>"],
        project="<project_name>",
    )
    # Trigger the transformation created in Step 3 and wait for it to finish.
    transformation_job = c.transformations.run(
        transformation_external_id="SAP asset hierarchy", wait=True
    )
    if transformation_job.status == "Completed":
        return func.HttpResponse("All success")
    return func.HttpResponse("SAP asset hierarchy Failed", status_code=400)

Azure Data Factory now triggers the SQL Transformation as soon as the data is uploaded to RAW. You can do the same with extractors that you build using the Python extractor-utils (please use cognite-sdk-experimental==0.60.22): once the extractor has ingested the data to RAW, it can trigger the SQL Transformation itself.
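As a rough sketch of that extractor-side trigger (the placeholders are the same as elsewhere in this post, and the RAW upload itself is left to your extractor code):

from cognite.experimental import CogniteClient

c = CogniteClient(
    base_url="<base_url>",
    client_name="<client_name>",
    token_client_id="<token_client_id>",
    token_client_secret="<token_client_secret>",
    token_url="<token_url>",
    token_scopes=["<token_scopes>"],
    project="<project_name>",
)

# ... extractor writes its rows to RAW here ...

# Kick off the transformation and wait for it to finish.
job = c.transformations.run(transformation_external_id="SAP asset hierarchy", wait=True)
if job.status != "Completed":
    raise RuntimeError(f"Transformation ended with status {job.status}")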

 

Use Case 2: Orchestrating Transformations

By passing wait=True to transformations.run, Transformations can be run in sequence, i.e. the next transformation job is only triggered after the previous job has completed successfully.
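The core pattern is small; a minimal sketch (assuming c is a configured CogniteClient and the three transformations created in Step 2 below already exist):

# run() with wait=True blocks until the job finishes, so each transformation
# only starts after the previous one has completed successfully.
for ext_id in ["OID_Asset_Hierarchy", "OID_Timeseries", "OID_Datapoints"]:
    job = c.transformations.run(transformation_external_id=ext_id, wait=True)
    if job.status != "Completed":
        raise RuntimeError(f"{ext_id} failed with status {job.status}")

Step 3 below wraps this same idea in an Azure Function.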

Step 1 - Create RAW Tables

In the "Transformations_SDK" database:

  • create a table "OID_Asset_Hierarchy" and upload OID-Asset-hirerachy.csv

  • create a table "OID_Timeseries" and upload OID-Timeseries.csv

  • create a table "OID_Datapoints" and upload OID-Datapoints.csv

Step 2 - Create new SQL Transformations

Using the Transformations Python SDK, create Transformations for "OID_Asset_Hierarchy", "OID_Timeseries", and "OID_Datapoints":

from cognite.experimental import CogniteClient
from cognite.experimental.data_classes import Transformation, TransformationDestination, OidcCredentials

c = CogniteClient(
    base_url="<base_url>",
    client_name="<client_name>",
    token_client_id="<token_client_id>",
    token_client_secret="<token_client_secret>",
    token_url="<token_url>",
    token_scopes=["<token_scopes>"],
    project="<project_name>",
)

transformations = [
    Transformation(
        name="OID_Asset_Hierarchy",
        external_id="OID_Asset_Hierarchy",
        source_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination=TransformationDestination.assets(),
        query="select externalId, parentExternalId, 'open_industrial_data' as source, name, description, map(\"source_internal_id\", get_json_object(metadata, '$._replicatedInternalId'), \"datadump\", \"1\") as metadata, null as dataSetId, null as labels, bigint(id), bigint(parentId) from Transformations_SDK.OID_Asset_Hierarchy where is_new(\"datadump\", lastUpdatedTime)",
    ),
    Transformation(
        name="OID_Timeseries",
        external_id="OID_Timeseries",
        source_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination=TransformationDestination.timeseries(),
        query="select null as id, ts.name, ts.externalId, to_metadata(ts.metadata) as metadata, ts.unit, a.id as assetId, ts.description, null as securityCategories, false as isStep, false as isString, null as dataSetId from Transformations_SDK.OID_Timeseries ts left join _cdf.assets a on ts.externalId_asset == a.externalId",
    ),
    Transformation(
        name="OID_Datapoints",
        external_id="OID_Datapoints",
        source_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination_oidc_credentials=OidcCredentials(
            base_url="<base_url>",
            client_name="<client_name>",
            client_id="<token_client_id>",
            client_secret="<token_client_secret>",
            token_url="<token_url>",
            scopes=["<token_scopes>"],
            cdf_project_name="<project_name>",
        ),
        destination=TransformationDestination.datapoints(),
        query="select externalId, to_timestamp(timestamp) as timestamp, double(value) as value from Transformations_SDK.OID_Datapoints",
    ),
]

res = c.transformations.create(transformations)

Step 3 - Orchestrate Transformations in sequence

requirements.txt for Azure Functions

azure-functions
cognite-sdk-experimental==0.60.22
numpy==1.21.0; python_full_version >= "3.7.1" and python_version >= "3.7"
oauthlib==3.1.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6"
pandas==1.3.0; python_full_version >= "3.7.1" and python_version >= "3.5" and python_full_version < "4.0.0"
openpyxl==3.0.9; python_full_version >= "3.7.1" and python_version >= "3.5" and python_full_version < "4.0.0"

Azure Functions

import logging

import azure.functions as func
from cognite.experimental import CogniteClient


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info("Python HTTP trigger function processed a request.")
    c = CogniteClient(
        base_url="<base_url>",
        client_name="<client_name>",
        token_client_id="<token_client_id>",
        token_client_secret="<token_client_secret>",
        token_url="<token_url>",
        token_scopes=["<token_scopes>"],
        project="<project_name>",
    )

    # Run the three transformations in sequence; stop and report as soon as one fails.
    transformation_job_1 = c.transformations.run(transformation_external_id="OID_Asset_Hierarchy", wait=True)
    if transformation_job_1.status != "Completed":
        logging.error("OID_Asset_Hierarchy Failed")
        return func.HttpResponse("OID_Asset_Hierarchy Failed", status_code=400)

    transformation_job_2 = c.transformations.run(transformation_external_id="OID_Timeseries", wait=True)
    if transformation_job_2.status != "Completed":
        logging.error("OID_Timeseries Failed")
        return func.HttpResponse("OID_Timeseries Failed", status_code=400)

    transformation_job_3 = c.transformations.run(transformation_external_id="OID_Datapoints", wait=True)
    if transformation_job_3.status == "Completed":
        return func.HttpResponse("All success")
    return func.HttpResponse("OID_Datapoints Failed", status_code=400)

Using the Transformations Python SDK, you have now triggered Transformations in sequence. You can schedule your Azure Function in the App Service plan.
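If you would rather schedule from inside the Function App, a hypothetical timer-triggered variant of the same function could look like this (it needs a timer binding with a CRON schedule in function.json; everything else mirrors the HTTP-triggered version above):

import logging

import azure.functions as func
from cognite.experimental import CogniteClient


def main(mytimer: func.TimerRequest) -> None:
    logging.info("Scheduled transformation chain started.")
    c = CogniteClient(
        base_url="<base_url>",
        client_name="<client_name>",
        token_client_id="<token_client_id>",
        token_client_secret="<token_client_secret>",
        token_url="<token_url>",
        token_scopes=["<token_scopes>"],
        project="<project_name>",
    )
    for ext_id in ["OID_Asset_Hierarchy", "OID_Timeseries", "OID_Datapoints"]:
        job = c.transformations.run(transformation_external_id=ext_id, wait=True)
        if job.status != "Completed":
            logging.error("%s failed with status %s", ext_id, job.status)
            return
    logging.info("All transformations completed.")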

Unverified - If your Transformations take longer to run, you can increase the timeout of your Azure Function by updating host.json in the App Service plan, or you can try orchestrating with Azure Durable Functions.
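For the Durable Functions route, the orchestrator half could be sketched roughly like this; the "RunTransformation" activity function (which would wrap c.transformations.run for the given external ID) is hypothetical and not shown here:

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Each activity runs one transformation; yielding keeps them strictly sequential.
    yield context.call_activity("RunTransformation", "OID_Asset_Hierarchy")
    yield context.call_activity("RunTransformation", "OID_Timeseries")
    yield context.call_activity("RunTransformation", "OID_Datapoints")
    return "All done"


main = df.Orchestrator.create(orchestrator_function)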

Congratulations if you made it this far! :clap:

Now that you've played around with the SDK, it's feedback time down in the comments! We'd love to know how you are using it in your use cases.

 

Could Cognite Functions be used now instead of Azure Functions to avoid the inherent timeout issues with Az Fns? Could Cognite Functions be configured with a long enough timeout for something like a 12 hr transformation run?


@Ben Brandt Unfortunately no 🙂 Cognite Functions is similar to Azure Functions, i.e. for long-running Transformations the lifetime of a Cognite Function is too short. A stopgap solution for now would be to run the Python script on a VM.

A proper solution we believe is to have an orchestration service in CDF. We are currently working on the details and designs to figure out exactly how it's going to work. I can share more once the plans are more concrete. 


@Sunil Krishnamoorthy We are heavy Azure Durable Functions users for our orchestration, but we are very much looking forward to these developments in Dapr, which you may be interested in as well, since they allow building in a more cloud-agnostic and language-agnostic way.

[Proposal] Workflow building block and engine · Issue #4576 · dapr/dapr (github.com)
Dapr Community Call 67 - YouTube


Of course, Temporal is also a great starting point for workflows as it is already more mature. I just don’t have as much experience with it since it does not have native .NET support. Python, Go, Java, PHP, and JS/TypeScript are supported though.

Temporal Application development guide | Temporal Documentation

https://temporal.io/


Thank you for sharing @Ben Brandt !! Really appreciate it.

I have shared these with the development team currently evaluating orchestrators to find a good fit for our requirements and tech stack. 


You’re welcome! The trouble is there are so many options around orchestration and workflow, and it’s difficult to know which will actually stand the test of time.

 

workflow-engine · GitHub Topics

orchestration-framework · GitHub Topics

