Solved

Timeseries creation - how to make it faster

  • 7 August 2023
  • 7 replies
  • 141 views


Hi 

I have a final computed dataframe that looks like this (a sample of 15 rows is listed below).

 

There are 570 records in total. I need to create a timeseries for the sumprod and derivedYields columns for each value in Yieldcode. I have written the code below; it runs and the timeseries are created, but it is slow. How can I make it run faster?

Is there a better method? Please advise.

 

          sumprod  Yieldcode  Rank  Group  derivedYields  MBBLD Vol  LP-Class
 1  532  0.009636    VBALFG1     1   VBAL       0.009636   3.524946       FG1
 2  533  0           VBALLP1     2   VBAL       0          0              LP1
 3  534  0.210269    VBALFN1     3   VBAL       0.218807   80.0382        FN1
 4  535  0.037774    VBALLW1     4   VBAL       0          0              LW1
 5  536  0.048263    VBALSK1     5   VBAL       0.094867   34.70158       SK1
 6  537  0.017368    VBALHK1     6   VBAL       0          0              HK1
 7  538  0.128149    VBALSR1     7   VBAL       0.142624   52.17094       SR1
 8  539  0.014722    VBALDS1     8   VBAL       0          0              DS1
 9  540  0.034342    VBALAG1     9   VBAL       0.034588   12.65213       AG1
10  541  0.00965     VBALLV1    10   VBAL       0.00965    3.529802       LV1
11  542  0.12273     VBALMV1    11   VBAL       0.151158   55.29262       MV1
12  543  0.028428    VBALSV1    12   VBAL       0          0              SV1
13  544  0.042119    VBALHV1    13   VBAL       0.077783   28.45268       HV1
14  545  0.035664    VBALXV1    14   VBAL       0          0              XV1
15  546  0.260887    VBALVR1    15   VBAL       0.260887   95.43076       VR1

 

 

 


# Function to create or update timeseries and datapoints
def create_or_update_timeseries_and_datapoint(yieldcode, column_name, value):
    timeseries_external_id = f"ts-YT-LP-{yieldcode}-{column_name.lower()}"
    existing_timeseries = client.time_series.retrieve(external_id=timeseries_external_id)

    if not existing_timeseries:
        # Create the timeseries if it doesn't exist
        ts = TimeSeries(
            asset_id=1186599902844586,
            external_id=timeseries_external_id,
            name=f"ts-YT-LP-{yieldcode}-{column_name}",
            is_string=False,
        )
        timeseries = client.time_series.create(time_series=ts)
    else:
        timeseries = existing_timeseries

    # Prepare datapoint
    datapoint = [
        (datetime.datetime.utcnow(), value),
    ]

    # Store the datapoint
    client.time_series.data.insert(id=timeseries.id, datapoints=datapoint)


# Loop through the DataFrame and create/update timeseries and datapoints
for index, row in yld_lp_final.iterrows():
    yieldcode = row['Yieldcode']
    create_or_update_timeseries_and_datapoint(yieldcode, 'vol_sprod', row['sumprod'])
    create_or_update_timeseries_and_datapoint(yieldcode, 'derived_yld', row['derivedYields'])

 


Best answer by HaydenH 9 August 2023, 11:01


7 replies

Hi @eashwar11, it is great to see that your use case is progressing along nicely. Here are some tips I can offer when developing on top of CDF:

  1. Try to minimise the number of calls to CDF where possible. One way to do this is to make use of a try-except block instead. You might have heard of this principle as "it is easier to ask forgiveness than permission". Your current if/else block checks whether the timeseries exists every single time before it creates anything. Instead, put the uploading part in the try block, catch the appropriate error (make sure not to use a plain except Exception), and put the creation step in the except block.
  2. Where possible, upload the whole dataframe, containing multiple datapoints for multiple time series objects, in one go. Furthermore, iterating through a dataframe with iterrows is not very efficient; it is best to perform vectorised operations where possible. Just make sure you have the external IDs in the columns of the dataframe and set the external_id_headers argument here to True. So the structure of your code logic might look more like this pseudocode:
def upsert(...):
    try:
        upload_data(...)
    except CogniteNotFoundError as e:
        create_ts_objects(...)
        upload_data(...)


output_df = vectorised_function(yld_lp_final)
upsert(output_df)
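
To make that a bit more concrete, here is a minimal sketch of the pattern, assuming the datapoints have already been arranged into a dataframe df with a timestamp index and one column per time series external ID (the helper names are only illustrative, and the asset_id is taken from your original snippet):

from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries
from cognite.client.exceptions import CogniteNotFoundError

client = CogniteClient()


def create_ts_objects(client, external_ids):
    # Build one TimeSeries object per external ID and create them in a single call.
    # The asset_id below is a placeholder taken from the original code -- adjust to your own.
    ts_objects = [
        TimeSeries(external_id=xid, name=xid, is_string=False, asset_id=1186599902844586)
        for xid in external_ids
    ]
    client.time_series.create(ts_objects)


def upsert(client, df):
    try:
        # Upload all datapoints for all time series in one call
        client.time_series.data.insert_dataframe(df, external_id_headers=True)
    except CogniteNotFoundError:
        # Some of the time series do not exist yet: create them, then retry the upload
        create_ts_objects(client, list(df.columns))
        client.time_series.data.insert_dataframe(df, external_id_headers=True)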

Regarding the vectorisation, you might find this blog post helpful. I hope this helps. :)

EDIT: You can also make use of the upsert function, which is a convenience method to create/update TS objects in one go.


Thanks @HaydenH. I will adopt the vectorization approach and implement it for creating the timeseries objects for each row in my final dataframe.

I can try to vectorize and create one list that contains all the external IDs for 'volc_prod', and another list with the datapoints for those corresponding external IDs,

so like 

list_volc_prod = [ts_<value1>_volc_prod , ts_<value2>_volc_prod , ………..]

list_datapoints_volc_prod = [<value-for-prod1>, <val-for-prod2>,……..]

I am not sure how to upsert them all in one go when I have to create the objects or update them (if they already exist in CDF) using the above two lists.

Also, the docs (with examples) only list individual timeseries objects, and I am not able to follow how to do a bulk upsert.

TimeSeriesAPI.upsert(item: TimeSeries | Sequence[TimeSeries], mode: Literal[('patch', 'replace')] = 'patch') → TimeSeries | TimeSeriesList

 

Also, I need to upsert datapoints for those corresponding timeseries objects as well.

 


No worries, happy to help. Regarding your current questions:

  1. Regarding the docs/examples, if you take a look at the type annotation, you will see that you can pass either a TimeSeries or a Sequence[TimeSeries] (I have bolded this in your quoted reply). NB: the vertical line between them (in bold) is equivalent to a logical OR. This means you can pass any kind of sequence of TimeSeries into the upsert function, as well as a single TimeSeries. A nice article on sequences in Python can be found here. In short, I would recommend upserting a List[TimeSeries] if you want to update/create TS objects in CDF in one go; however, creating a time series and uploading datapoints is still a two-step operation (sorry for any confusion caused).
  2. Inserting datapoints has to be done separately, but note that this is actually an upsert operation too. So if you have a datapoint at time t and then insert a new datapoint at time t, the new value will overwrite the existing one.
  3. Yes, you could upload a list of datapoints; however, my preference would be to use a DataFrame. Here is an example of what this might look like:

If you have a DF that looks like this (let us call it df):

timestamp    external_id0    external_id1
t_0          value_00        value_01
t_1          value_10        value_11

Then this gives you two benefits:

  1. The upsert operation is going to upload everything in one go, as long as the time series already exist in CDF, with:
    client.time_series.data.insert_dataframe(df, external_id_headers=True)
  2. Pandas is a library that makes many vector operations readily available. For example, let us assume you want to multiply one of your columns by a constant K. A vectorised version would simply be df["new_col"] = df["external_id0"] * K (compare this with how long it takes to do the same thing in a loop with iterrows, and the benefits really show up on large datasets). A sketch putting both pieces together follows below.
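
For instance, a hypothetical way to reshape yld_lp_final into that wide layout (the external ID pattern follows your original snippet, and writing everything at a single timestamp is just an assumption) could be:

import datetime

import pandas as pd

# Build the external IDs as whole columns -- no iterrows needed
sprod_ids = "ts-YT-LP-" + yld_lp_final["Yieldcode"] + "-vol_sprod"
yld_ids = "ts-YT-LP-" + yld_lp_final["Yieldcode"] + "-derived_yld"

# One row (a single timestamp) with one column per time series external ID
wide = pd.DataFrame(
    {**dict(zip(sprod_ids, yld_lp_final["sumprod"])),
     **dict(zip(yld_ids, yld_lp_final["derivedYields"]))},
    index=[datetime.datetime.utcnow()],
)

# Uploads all datapoints for all time series in one call (assuming they already exist in CDF)
client.time_series.data.insert_dataframe(wide, external_id_headers=True)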

Note that you still need to handle the case where some time series referenced in the DF do not yet exist in CDF, as I mentioned in my first post, i.e., with a try-except clause. You can get the items that failed to be processed from the custom exceptions (e.g., here). I hope that helps out. :)


Hi @HaydenH 

I have tried to create the dataframe, and the attached csv is the final df. (It has all the external IDs that are to be created in CDF, along with a row of the corresponding values; there is also a timestamp column.)

 

Before I run this line to insert the datapoints:

client.time_series.data.insert_dataframe(df,external_id_headers=True)

I need to create the timeseries objects using the list of external IDs that are in this df.

Can I pass the list of externalIDs to the upsert operation? TimeSeriesAPI.upsert()

Also, each of the timeseries has to be linked to an asset. Can I pass that as a list into the upsert function?

Please advise. 

So, you could perform the upsert on the TimeSeries objects first. This means you need to have all the external IDs, and if you need to contextualise them (such as attaching them to an asset), you should have that data available to supply as well. This approach is safe and should not need any error handling, but it will be slower. If the bottleneck you experienced before came mostly from the lack of vectorisation, then this might be fine for your solution. Note that the example here shows how you can update an existing TS object and create a new one in a single call.

from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries
c = CogniteClient()
existing_time_series = c.time_series.retrieve(id=1)
existing_time_series.description = "New description"
new_time_series = TimeSeries(external_id="new_timeSeries", description="New timeSeries")
res = c.time_series.upsert([existing_time_series, new_time_series], mode="replace")

So, as you can see in the last line, there is a list of two different TimeSeries objects; this is what you can upsert. It does not matter whether they already exist or not: the upsert will create the new ones and update the old ones. Notice how the new_time_series variable is defined; you need to use the TimeSeries class. Now, if you look at the page for the TimeSeries class here, you can see the parameters that are accepted, which include things like metadata (a dictionary/JSON structure to put additional info into), unit (e.g., a physical unit like kg), and, as you asked, the asset_id. So if you were creating/updating a list of TimeSeries with assets, descriptions, metadata, etc., it might look like this:

ts_list = [
    TimeSeries(
        external_id=...,
        name=...,
        description=...,
        metadata={"key1": "val1", "key2": numerical_val},
        asset_id=...,
    ),
    ...
]

Note that this list should probably be generated by a function that can build the time series object given the external ID. Then you can use the try-except clause as I mentioned, which will be quicker overall. So then you have a structure like this:

try:
    # upload data
    client.time_series.data.insert_dataframe(df)
except CogniteNotFoundError as e:
    # get list of failed items as per https://cognite-sdk-python.readthedocs-hosted.com/en/latest/exceptions.html#cognitenotfounderror
    missing_xids = e.failed
    create_ts_objects(client, missing_xids, other_params)
    # upload remaining TS objects
    client.time_series.data.insert_dataframe(df[missing_xids])

The ts_list would be what is created inside the create_ts_objects function (don’t forget to create with the client inside though). Note that you may need to pass other variables in as well to create TS objects that can be contextualised. Does this make more sense?
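
For completeness, a hypothetical sketch of such a create_ts_objects helper (the naming scheme and the asset_id_by_xid lookup are placeholders you would replace with your own context) might look like:

from cognite.client.data_classes import TimeSeries


def create_ts_objects(client, missing_xids, asset_id_by_xid):
    # asset_id_by_xid: a mapping from external ID to the asset each time series should be linked to
    ts_list = [
        TimeSeries(
            external_id=xid,
            name=xid,
            is_string=False,
            asset_id=asset_id_by_xid.get(xid),
        )
        for xid in missing_xids
    ]
    # A single call creates all the missing time series objects in CDF
    client.time_series.create(ts_list)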

Also, just to clarify: does the example csv file only have one row because you wanted to show an example, or because you are only getting one row of data at a time? If neither, it is best to upload as many rows as possible in one go (subject to limits).


Hi @HaydenH 

I completed the code and ran the code block. It created all the necessary timeseries objects, and I was also able to link these objects to their corresponding assets.

I have one further question: this does the job when I take one record at a time and perform a series of computations to create the necessary timeseries objects.

When I am given a range, I will now have to pick all the records in that range (say, 300+ rows) and then perform the same series of computations for each row. After successfully processing and deriving the datapoints for the timeseries objects, I will have to move to the next row and continue until I finish the range. 

In this case, I cannot get away from loops and have to iterate through each row to derive the datapoints. Each processing job takes around 90-100 seconds.

 

Please advise if this approach is okay in CDF.

Great to know that you have successfully created and linked them @eashwar11. 😀 Just as an aside, in case you were not aware: another good practice I can advise is to turn the code you have created into a "bootstrap" script/program, so that you can replicate the process later if necessary. You may have both a "dev" and a "prod" environment (or perhaps more), so it is important to be able to replicate the setup across different projects/tenants to make sure your solutions scale and bugs are not pushed to prod (or at least that fewer bugs are pushed).

Now answering your actual question, well it all depends. Depending on whether the cloud service being used in your project is Google or Microsoft, you will have either a 10 or 15 minute time limit (with inverted CPU resources, i.e., lower time limit has higher resources) for your Cognite Function. So if you only have a few iterations to do, then you should be fine. You can of course also set multiple schedules if there is a way to partition the jobs, however this obviously increases resource usage.

However, without knowing the details of your calculation, I cannot say for certain whether you can vectorise the operations or not. Sometimes it can be done, and sometimes not. You could potentially make use of the groupby method in pandas and group by a column in your DF, or maybe create a categorical column and group by that. That might break your DF up into chunks where you can apply the same operation across a whole chunk and then loop over the chunks, which is better than iterating over every row (see the sketch below). If you are doing "rolling" calculations, then you could make use of the rolling functionality in pandas. As you can see in the examples in the docs, there are numerous functions you could use (sum, mean, etc.).
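
As a rough illustration of the groupby idea (the grouping column and the calculation are only placeholders for your real logic):

import pandas as pd

results = []
for group_name, chunk in yld_lp_final.groupby("Group"):
    # One vectorised operation over the whole chunk replaces a per-row loop
    chunk = chunk.assign(derived=chunk["sumprod"] * chunk["MBBLD Vol"])
    results.append(chunk)

processed = pd.concat(results)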

Hope that helps. 😊

Reply