Hi Guys,
I tried to run a transformation through the SDK, but I keep getting an error about transformation credentials. Do you know what I need to do to resolve this credentials issue? I have checked the credentials of the service account that I use, and access to transformations has been granted.
def _cleanup_temp_transformation(client, db_name, temp_table, transformation):
    """Best-effort removal of the temporary RAW table and transformation."""
    cleanup_steps = [
        (
            "temp table",
            lambda: client.raw.tables.delete(db_name=db_name, name=temp_table),
        ),
        (
            "transformation",
            lambda: client.transformations.delete(id=transformation.id),
        ),
    ]
    # Each step is attempted on its own so that a failure to drop the table
    # does not leave the transformation behind (and vice versa).
    for label, step in cleanup_steps:
        try:
            step()
        except Exception as cleanup_error:
            # Cleanup is deliberately non-fatal: report and carry on.
            print(
                f" ! [Transformation] Cleanup of {label} failed: {cleanup_error}",
                flush=True,
            )


def fetch_raw_table_via_transformation(
    client: CogniteClient,
    db_name: str,
    table_name: str,
    calculation_date: Optional[str] = None,
    columns: Optional[List[str]] = None,
    source_oidc_credentials: Optional["OidcCredentials"] = None,
    destination_oidc_credentials: Optional["OidcCredentials"] = None,
) -> pd.DataFrame:
    """
    Fetch RAW data by running a temporary CDF Transformation.

    The function builds a SQL query against ``db_name.table_name``, creates a
    temporary transformation whose destination is a temporary RAW table in the
    same database, runs it and waits for completion, reads the temporary table
    back into a DataFrame, and finally deletes the temporary table and the
    transformation.

    Note: a transformation needs credentials for both reading (source) and
    writing (destination). If the API rejects the job with
    "Source credentials are not configured / Destination credentials are not
    configured", pass ``source_oidc_credentials`` and
    ``destination_oidc_credentials`` explicitly.

    Args:
        client: CogniteClient instance
        db_name: Name of the RAW database
        table_name: Name of the RAW table
        calculation_date: Optional date filter (YYYY-MM-DD), compared against
            the ``DATE`` column
        columns: Optional list of columns to select (all columns when omitted)
        source_oidc_credentials: Optional OIDC credentials the transformation
            uses to read; forwarded to ``Transformation`` only when given
        destination_oidc_credentials: Optional OIDC credentials the
            transformation uses to write; forwarded only when given

    Returns:
        DataFrame with the query results; an empty DataFrame when the job
        finishes with a status other than "Completed". On any exception the
        result of ``fetch_raw_table`` (standard fetch) is returned instead.
    """
    import uuid  # local import: only needed to make the temporary names unique

    try:
        table_start = time.time()
        print(f" [Transformation] Running Spark query on {db_name}.{table_name}...", flush=True)

        # Build Spark SQL query.
        # NOTE(review): columns and calculation_date are interpolated into the
        # SQL text unescaped; only pass trusted values here.
        select_cols = ', '.join(columns) if columns else '*'
        where_clause = f"WHERE DATE = '{calculation_date}'" if calculation_date else ""
        sql_query = f"""
        SELECT {select_cols}
        FROM `{db_name}`.`{table_name}`
        {where_clause}
        """

        # A timestamp alone collides when two calls land in the same second,
        # so a random suffix is appended to both temporary names.
        unique_suffix = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
        temp_table = f"temp_query_result_{unique_suffix}"
        transformation_id = f"temp_query_{table_name}_{unique_suffix}"

        print(" Creating and running transformation...", flush=True)

        transformation_kwargs = {
            "external_id": transformation_id,
            "name": f"Temp Query: {table_name}",
            "query": sql_query,
            "destination": TransformationDestination.raw(
                database=db_name,
                table=temp_table,
            ),
            "ignore_null_fields": False,
            "is_public": True,
        }
        # Only forward credentials that were actually supplied so that the
        # default behaviour (SDK/session-derived credentials) is unchanged.
        if source_oidc_credentials is not None:
            transformation_kwargs["source_oidc_credentials"] = source_oidc_credentials
        if destination_oidc_credentials is not None:
            transformation_kwargs["destination_oidc_credentials"] = destination_oidc_credentials

        transformation = client.transformations.create(
            Transformation(**transformation_kwargs)
        )
        try:
            # Run the transformation job and block until it finishes.
            job = client.transformations.run(
                transformation_id=transformation.id,
                wait=True,
            )
            if job.status != "Completed":
                print(f" ✗ [Transformation] Job failed: {job.status}")
                return pd.DataFrame()

            # Read results from the temporary table.
            result_rows = client.raw.rows.list(
                db_name=db_name, table_name=temp_table, limit=None
            )
            df = pd.DataFrame([row.columns for row in result_rows])
        finally:
            # Runs on success, on a failed job and on exceptions, so the
            # temporary transformation/table are never leaked.
            _cleanup_temp_transformation(client, db_name, temp_table, transformation)

        elapsed = time.time() - table_start
        print(f" ✓ [Transformation] {len(df):,} rows in {elapsed:.1f}s")
        return df
    except Exception as e:
        print(f" ✗ Transformation Error: {str(e)}, falling back to standard fetch")
        return fetch_raw_table(client, db_name, table_name, calculation_date, columns)
Error:
✗ Transformation Error: Transform config is invalid: Source credentials are not configured.
Destination credentials are not configured. | code: 400 | X-Request-ID: e12703cc-8e36-99de-9b24-aece108a1ece

Check the
documentation
Ask the
Community
Take a look
at
Academy
Cognite
Status
Page
Contact
Cognite Support