We are trying to implement automatic monitoring of our transformations in CDF. We do this by running a transformation and a monitoring function in the same workflow, where the monitoring function depends on the transformation having finished.
Then we retrieve the sum of updated and created events like this:
from collections import defaultdict

def _created_and_updated_sum(self, transformation_external_id: str) -> int:
    """
    Get the sum of the number of created and updated events in the last
    transformation job.
    If there are no finished jobs, return 0 instead.
    """
    transformation = self.cdf_client.transformations.retrieve(
        external_id=transformation_external_id,
    )
    if transformation is None:
        raise ValueError(f"Transformation {transformation_external_id} not found")
    if transformation.last_finished_job is None:
        DUMMY = 0
        self.logger.info(f"Transformation {transformation_external_id} has no finished jobs. Setting sum to dummy value {DUMMY}")
        return DUMMY

    metrics = transformation.last_finished_job.metrics()
    # Keep the highest count seen per metric name; skip incomplete entries.
    counts: defaultdict[str, int] = defaultdict(int)
    for metric in metrics:
        if metric.name is None or metric.count is None:
            continue
        counts[metric.name] = max(counts[metric.name], metric.count)
    return counts['events.created'] + counts['events.updated']
The issue is that this only works some of the time. The transformation job is marked as complete, but the metrics returned from the API are not yet accurate.
I saw the same behaviour in the Cognite GUI: the transformation was marked as complete, and then the count of updated/created events slowly kept increasing.
The only solution we can think of right now is to put the monitoring function to sleep for an arbitrary number of minutes and hope the metrics are accurate by then. We want to avoid this if possible.
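A slightly less arbitrary variant of the sleep would be to poll until the sum stops changing. The sketch below is only a workaround under assumptions, not a Cognite feature: `wait_for_stable_value` and its parameters (`poll_interval_s`, `stable_reads`, `timeout_s`) are hypothetical names, and the default values are guesses rather than measured numbers.

```python
import time
from typing import Callable


def wait_for_stable_value(
    read_value: Callable[[], int],
    poll_interval_s: float = 30.0,   # assumed interval, tune as needed
    stable_reads: int = 3,           # assumed number of identical reads
    timeout_s: float = 600.0,        # assumed upper bound on waiting
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll read_value until it returns the same result
    `stable_reads` times in a row, then return that result.

    Raises TimeoutError if the value has not settled within timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    last = read_value()
    unchanged = 1  # the first read counts as one observation
    while unchanged < stable_reads:
        if time.monotonic() >= deadline:
            raise TimeoutError("value did not stabilise before the timeout")
        sleep(poll_interval_s)
        current = read_value()
        if current == last:
            unchanged += 1
        else:
            # The value moved, so restart the stability count.
            last = current
            unchanged = 1
    return last
```

In our monitoring function this would be called as `wait_for_stable_value(lambda: self._created_and_updated_sum(external_id))`. It is still a form of waiting, and a sum that lags at 0 for several polls would be treated as stable, so it reduces the guesswork without removing it.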
Is there a better solution? Maybe some sort of event to subscribe to for when the metrics are accurately counted?