
Concurrent API calls with multithreading. I don't get the same result as from one large call

  • October 31, 2024
  • 3 replies
  • 66 views

Anders Brakestad
Seasoned

Hi Developers! :)

I have a use case where I need to retrieve a large number of Time Series data points from CDF. We are looking at a final DataFrame with a shape of approximately (1000, 400000), so a total of 400 million data points plus the index. I also want the data retrieval to be fast…. :)

 

So instead of retrieving everything in a single call to `client.time_series.data.retrieve_dataframe`, I thought I would use multithreading to start the retrievals in an approximately parallel fashion, and then concatenate the resulting dataframes, for huge runtime gains! However, I observe that the results from multithreading do not equal the results from the single call. :(

 

Specifically:

I split the data retrieval on Time Series IDs and concatenate along axis=1. I observe that data from the “first two” threads have a lot of missing data, while data from the last two threads are 100% correct. I have also tried splitting the DatetimeIndex and multithreading along this dimension, but I see exactly the same result: some of the “splits” show correct data while others do not.

 

Here is a working example (although a bit reduced in data size) that produces the observed discrepancies:

#######
# Setup
#######
import pandas as pd
import numpy as np

from cognite.client import CogniteClient
from multiprocessing.pool import ThreadPool


client = CogniteClient()

ids = [151613618059, 1084354269001, 4066648994349, 4278941460069, 4709909452530, 5042390272421, 6066084198436, 8037054137423, 9102266532608, 10454740690987, 11198241701932, 11442771394712, 11924255376671, 12205298840213, 12450015417880, 13102387455282, 13867954881929, 14302640993401, 15260857832022, 16362008262827, 17197578322170, 17418680798200, 17647258485390, 18690562773274, 22110867945088, 22604477330101, 22803842097478, 24743043888447, 28003968041505, 28705019938259, 32124743715415, 32174031561617, 32628253858149, 32800502091827, 33092491779293]

retrieve_kws = {
    "id": ids,
    "granularity": "1min",
    "aggregates": ["average"],
    "limit": None,
    "uniform_index": True,
    "include_aggregate_name": False,
    "column_names": "id",
}


###########################
# Define retrieve functions
###########################
def split_date_range(dr: pd.DatetimeIndex, n: int) -> list:
    """Split a date range into n subsets.

    End exclusive in each split.

    Args:
        dr (pd.DatetimeIndex): Date range
        n (int): Number of subsets

    Returns:
        list: List of tuples with start and end times

    """
    splits = np.array_split(dr, n)
    result = []
    for split in splits:
        result.append((split[0], split[-1] + pd.Timedelta(dr.freq)))
    return result


def retrieve_single(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data in a single call to the API.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data

    """
    return client.time_series.data.retrieve_dataframe(
        start=start,
        end=end,
        **kwargs,
    )


def retrieve_concurrent(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data concurrently with multiple threads.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data

    """
    dr = pd.date_range(start, end, freq=kwargs["granularity"], inclusive="left")
    splits = split_date_range(dr, 4)
    args = [(client, s, e, kwargs) for s, e in splits]
    with ThreadPool(4) as pool:
        results = pool.starmap(retrieve_single, args)
    return pd.concat(results, axis=0)



###############
# Retrieve data
###############
end = pd.Timestamp.now().floor("1d")
start = end - pd.Timedelta(weeks=30)

df1 = retrieve_single(client=client, start=start, end=end, kwargs=retrieve_kws)
df2 = retrieve_concurrent(client=client, start=start, end=end, kwargs=retrieve_kws)

 

Sample output:

############################
# Data from normal retrieval
############################

151613618059 1084354269001 4066648994349 4278941460069 4709909452530
2024-04-04 00:00:00 NaN 75.687863 0.0 0.609659 NaN
2024-04-04 00:01:00 NaN 75.675496 0.0 0.625718 NaN
2024-04-04 00:02:00 NaN 75.667475 0.0 0.587738 NaN
2024-04-04 00:03:00 NaN 75.693901 0.0 0.615769 NaN
2024-04-04 00:04:00 NaN 75.681177 0.0 0.622529 NaN
... ... ... ... ... ...
2024-10-30 23:55:00 NaN 77.970664 0.0 0.000000 NaN
2024-10-30 23:56:00 NaN 78.110861 0.0 0.000000 NaN
2024-10-30 23:57:00 NaN 78.313353 0.0 0.000000 NaN
2024-10-30 23:58:00 NaN 78.445857 0.0 0.000000 NaN
2024-10-30 23:59:00 NaN 78.362344 0.0 0.000000 NaN

[302400 rows x 5 columns]



###################################
# Data from multithreaded retrieval
###################################
151613618059 1084354269001 4066648994349 4278941460069 4709909452530
2024-04-04 00:00:00 NaN 75.687863 0.0 0.609659 NaN
2024-04-04 00:01:00 NaN 75.675496 0.0 0.625718 NaN
2024-04-04 00:02:00 NaN 75.667475 0.0 0.587738 NaN
2024-04-04 00:03:00 NaN 75.693901 0.0 0.615769 NaN
2024-04-04 00:04:00 NaN 75.681177 0.0 0.622529 NaN
... ... ... ... ... ...
2024-10-30 23:55:00 NaN NaN NaN NaN NaN
2024-10-30 23:56:00 NaN NaN NaN NaN NaN
2024-10-30 23:57:00 NaN NaN NaN NaN NaN
2024-10-30 23:58:00 NaN NaN NaN NaN NaN
2024-10-30 23:59:00 NaN NaN NaN NaN NaN

[302400 rows x 5 columns]

 

Another thing I observe is that if I reduce the amount of data to be retrieved (by limiting the number of time series or the date range), I get a 100% match between the normal and multithreaded methods.
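For anyone reproducing this, a robust way to compare the two results is pandas' own equality helpers rather than element-wise `==`, since `NaN != NaN`. A small sketch with synthetic stand-ins for df1 and df2:

```python
import numpy as np
import pandas as pd

# Synthetic stand-ins for df1 (single call) and df2 (concatenated splits)
idx = pd.date_range("2024-04-04", periods=4, freq="1min")
df1 = pd.DataFrame({"a": [1.0, np.nan, 3.0, 4.0]}, index=idx)
df2 = df1.copy()

# DataFrame.equals treats NaNs in the same positions as equal,
# unlike element-wise ==, where NaN != NaN.
print(df1.equals(df2))           # True
print((df1 == df2).all().all())  # False, because of the shared NaN

# To locate where only one side is missing data:
df2.iloc[-1, 0] = np.nan
mismatch = df1.isna() ^ df2.isna()  # True where exactly one side is NaN
print(mismatch.sum().sum())      # 1
```

The xor of the two `isna()` masks is a quick way to count exactly how many data points one retrieval returned that the other did not.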

 

I can already anticipate the reply: “We do not recommend retrieving data from the API in this manner.” :P If so, what would be the best practice for speeding up large data retrieval?

Thanks for the help!

Anders

Best answer by Håkon V. Treider

The short answer as to why retrieve_concurrent is no faster is that the CogniteClient uses a singleton thread pool executor (TPE), and each of these tasks probably saturates it on its own - or, on the off chance they do not, the API is probably 429’ing you and telling you to slow down.

There are three or four approaches here, given that you really need to speed up this retrieval process:

  1. A lot of time is spent combining numpy arrays into the final dataframe. Use retrieve_arrays instead and write your own concatenator logic (the SDK simply uses pd.concat).
    1. If you don’t need all time series in the final dataframe, you can save a ton of time concatenating smaller subsets.
  2. Patch the SDK and replace the singleton TPE with “a new TPE on each method call invocation” so that your 4 threads each call into a pool of their own.
    1. This will probably lead to excessive 429s (as you are basically DoS’ing the API). To solve this, each client should be a separately instantiated principal with its own request budget towards the API.
  3. Don’t split the time domain, but split on the number of time series.
  4. Play around with max_workers. Note that due to the nature of the singleton TPE, the size won’t change after the first method call, so you need to restart e.g. your notebook to test with a different value.
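To illustrate the concatenation cost mentioned in point 1: building the final frame from a dict of numpy arrays in a single constructor call is typically much cheaper than concatenating many single-column frames one by one. The arrays below are synthetic stand-ins for what retrieve_arrays would hand back per series:

```python
import numpy as np
import pandas as pd

# Synthetic stand-ins: one numpy array of aggregate values per time series,
# keyed by time series id, plus a shared uniform index.
n = 1_000
index = pd.date_range("2024-04-04", periods=n, freq="1min")
series_values = {ts_id: np.random.rand(n) for ts_id in (151613618059, 1084354269001)}

# One constructor call assembles all columns at once, avoiding
# repeated pd.concat over many single-column frames.
df = pd.DataFrame(series_values, index=index)
print(df.shape)  # (1000, 2)
```

With a real retrieve_arrays result you would extract each series' aggregate array (and, without uniform_index, align the per-series timestamp arrays first), but the final assembly step stays the same.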

And lastly, we do not recommend retrieving data from the API in this manner 😉

3 replies


matiasholte
Practitioner
  • Backend developer
  • October 31, 2024

I’m not an expert in the Python SDK, but in general, if you want to retrieve time series in parallel, I would let the SDK deal with time ranges and paging, and only do parallelization across time series. That should also be the fastest.
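A minimal sketch of that split: chunk the id list, hand each chunk to one retrieve call (the SDK then handles time ranges and paging within it), and concatenate the resulting frames along axis=1 (columns), not axis=0. Only the chunking step is shown here, with a stand-in id list:

```python
import numpy as np

ids = list(range(35))  # stand-in for the 35 time series ids above

# Split the id list into 4 roughly equal chunks; each chunk becomes
# the `id` argument of one retrieve_dataframe call.
chunks = [list(chunk) for chunk in np.array_split(ids, 4)]
print([len(c) for c in chunks])  # [9, 9, 9, 8]
```

The four resulting frames share the same DatetimeIndex (with uniform_index=True), so joining them column-wise reproduces the single-call result without any time-domain splitting.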

Edit: Too slow, and Håkon had a better answer anyway :)


Anders Brakestad
Seasoned

Thanks for the insights!

It looks like the effort to make multithreading work is not worth it. I might play around with retrieving numpy arrays, but I’ll probably settle for a bit of wait time.