Hi Developers! :)
I have a use case where I need to retrieve a large number of Time Series data points from CDF. The final DataFrame I am after has a shape of approximately (1000, 400000), so roughly 400 million data points plus the index. I also want the retrieval to be fast… :)
So instead of retrieving everything in a single call to `client.time_series.data.retrieve_dataframe`, I thought I would use multithreading to run the retrievals in an approximately parallel fashion and then concatenate the resulting DataFrames, hoping for large runtime gains. However, I observe that the results from multithreading do not equal the results from the single call. :(
Specifically:
I split the data retrieval on Time Series IDs and concatenate along axis=1. I observe that the data from the “first two” threads have a lot of missing values, while the data from the last two threads are 100% correct. I have also tried splitting the DatetimeIndex and multithreading along that dimension instead, but I see exactly the same kind of result: some of the “splits” contain correct data and some do not.
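For reference, the ID-splitting variant looks roughly like this (a simplified sketch, not the exact code I run; it reuses retrieve_single, ThreadPool and the retrieve_kws dict from the working example below, and the thread count of 4 is arbitrary):

def retrieve_concurrent_by_id(client, start, end, kwargs, n_threads=4):
    ids = kwargs["id"]
    # Chunk the id list into n_threads roughly equal slices
    chunk_size = -(-len(ids) // n_threads)  # ceiling division
    id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]
    # One retrieve call per id chunk, each covering the full date range
    args = [(client, start, end, {**kwargs, "id": chunk}) for chunk in id_chunks]
    with ThreadPool(n_threads) as pool:
        results = pool.starmap(retrieve_single, args)
    # All results share the same uniform index, so concatenate column-wise
    return pd.concat(results, axis=1)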
Here is a working example (somewhat reduced in data size) that reproduces the observed discrepancies:
#######
# Setup
#######
import pandas as pd
import numpy as np
from cognite.client import CogniteClient
from multiprocess.pool import ThreadPool
client = CogniteClient()
ids = [151613618059, 1084354269001, 4066648994349, 4278941460069, 4709909452530, 5042390272421, 6066084198436, 8037054137423, 9102266532608, 10454740690987, 11198241701932, 11442771394712, 11924255376671, 12205298840213, 12450015417880, 13102387455282, 13867954881929, 14302640993401, 15260857832022, 16362008262827, 17197578322170, 17418680798200, 17647258485390, 18690562773274, 22110867945088, 22604477330101, 22803842097478, 24743043888447, 28003968041505, 28705019938259, 32124743715415, 32174031561617, 32628253858149, 32800502091827, 33092491779293]
retrieve_kws = {
    "id": ids,
    "granularity": "1min",
    "aggregates": ["average"],
    "limit": None,
    "uniform_index": True,
    "include_aggregate_name": False,
    "column_names": "id",
}
###########################
# Define retrieve functions
###########################
def split_date_range(dr: pd.DatetimeIndex, n: int) -> list:
    """Split a date range into n subsets.

    End exclusive in each split.

    Args:
        dr (pd.DatetimeIndex): Date range
        n (int): Number of subsets

    Returns:
        list: List of tuples with start and end times
    """
    splits = np.array_split(dr, n)
    result = []
    for split in splits:
        result.append((split[0], split[-1] + pd.Timedelta(dr.freq)))
    return result
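# Quick sanity check of split_date_range (illustrative only, not part of the retrieval flow):
# splitting a 6-point, 1-minute range in two should give two contiguous, end-exclusive windows.
_demo = split_date_range(pd.date_range("2024-01-01", periods=6, freq="1min"), 2)
assert _demo == [
    (pd.Timestamp("2024-01-01 00:00"), pd.Timestamp("2024-01-01 00:03")),
    (pd.Timestamp("2024-01-01 00:03"), pd.Timestamp("2024-01-01 00:06")),
]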
def retrieve_single(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data in a single call to the API.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data
    """
    return client.time_series.data.retrieve_dataframe(
        start=start,
        end=end,
        **kwargs,
    )
def retrieve_concurrent(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data concurrently with multiple threads.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data
    """
    dr = pd.date_range(start, end, freq=kwargs["granularity"], inclusive="left")
    splits = split_date_range(dr, 4)
    args = [(client, start, end, kwargs) for start, end in splits]
    with ThreadPool(4) as pool:
        results = pool.starmap(retrieve_single, args)
    return pd.concat(results, axis=0)
###############
# Retrieve data
###############
end = pd.Timestamp.now().floor("1d")
start = end - pd.Timedelta(weeks=30)
df1 = retrieve_single(client=client, start=start, end=end, kwargs=retrieve_kws)
df2 = retrieve_concurrent(client=client, start=start, end=end, kwargs=retrieve_kws)
Sample output:
############################
# Data from normal retrieval
############################
151613618059 1084354269001 4066648994349 4278941460069 4709909452530
2024-04-04 00:00:00 NaN 75.687863 0.0 0.609659 NaN
2024-04-04 00:01:00 NaN 75.675496 0.0 0.625718 NaN
2024-04-04 00:02:00 NaN 75.667475 0.0 0.587738 NaN
2024-04-04 00:03:00 NaN 75.693901 0.0 0.615769 NaN
2024-04-04 00:04:00 NaN 75.681177 0.0 0.622529 NaN
... ... ... ... ... ...
2024-10-30 23:55:00 NaN 77.970664 0.0 0.000000 NaN
2024-10-30 23:56:00 NaN 78.110861 0.0 0.000000 NaN
2024-10-30 23:57:00 NaN 78.313353 0.0 0.000000 NaN
2024-10-30 23:58:00 NaN 78.445857 0.0 0.000000 NaN
2024-10-30 23:59:00 NaN 78.362344 0.0 0.000000 NaN
[302400 rows x 5 columns]
###################################
# Data from multithreaded retrieval
###################################
151613618059 1084354269001 4066648994349 4278941460069 4709909452530
2024-04-04 00:00:00 NaN 75.687863 0.0 0.609659 NaN
2024-04-04 00:01:00 NaN 75.675496 0.0 0.625718 NaN
2024-04-04 00:02:00 NaN 75.667475 0.0 0.587738 NaN
2024-04-04 00:03:00 NaN 75.693901 0.0 0.615769 NaN
2024-04-04 00:04:00 NaN 75.681177 0.0 0.622529 NaN
... ... ... ... ... ...
2024-10-30 23:55:00 NaN NaN NaN NaN NaN
2024-10-30 23:56:00 NaN NaN NaN NaN NaN
2024-10-30 23:57:00 NaN NaN NaN NaN NaN
2024-10-30 23:58:00 NaN NaN NaN NaN NaN
2024-10-30 23:59:00 NaN NaN NaN NaN NaN
[302400 rows x 5 columns]
Another thing I observe is that if I reduce the amount of data to be retrieved (by limiting the number of time series or narrowing the date range), I get a 100% match between the normal and the multithreaded method.
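For what it's worth, this is roughly how I check whether the two results match (just an illustrative comparison, not part of the retrieval itself):

# Strict element-wise comparison (NaNs in the same positions count as equal)
print(df1.equals(df2))
# Per-column difference in NaN counts, to see which columns lost data
print(df2.isna().sum() - df1.isna().sum())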
I can already anticipate the reply: “We do not recommend retrieving data from the API in this manner.” :P If so, what would be the best practice for speeding up large data retrieval?
Thanks for the help!
Anders