Solved

Concurrent API calls with multithreading. I don't get the same result as from one large call

  • October 31, 2024
  • 3 replies
  • 52 views

Anders Brakestad
Seasoned

Hi Developers! :)

I have a use case where I need to retrieve a large number of Time Series data points from CDF. We are after a final DataFrame with a shape of approximately (1000, 400000), so a total of roughly 400 million data points plus the index. I also want the data retrieval to be fast… :)

 

So instead of retrieving everything in a single call to `client.time_series.data.retrieve_dataframe`, I thought I would use multithreading to start the retrievals in an approximately parallel fashion and then concatenate the resulting dataframes, at huge runtime gains! However, I observe that the results from multithreading do not equal the results from the single call. :(

 

Specifically:

I split the data retrieval on Time Series IDs and concatenate along axis=1. I observe that data from the “first two” threads have a lot of missing data, while data from the last two threads are 100% correct. I have also tried to split the DatetimeIndex and multithread along this dimension, but I see exactly the same result, in that some of the “splits” show correct data and some do not.

 

Here is a working example (somewhat reduced in data size) that reproduces the observed discrepancies:

#######
# Setup
#######
import pandas as pd
import numpy as np

from cognite.client import CogniteClient
from multiprocess.pool import ThreadPool


client = CogniteClient()

ids = [151613618059, 1084354269001, 4066648994349, 4278941460069, 4709909452530, 5042390272421, 6066084198436, 8037054137423, 9102266532608, 10454740690987, 11198241701932, 11442771394712, 11924255376671, 12205298840213, 12450015417880, 13102387455282, 13867954881929, 14302640993401, 15260857832022, 16362008262827, 17197578322170, 17418680798200, 17647258485390, 18690562773274, 22110867945088, 22604477330101, 22803842097478, 24743043888447, 28003968041505, 28705019938259, 32124743715415, 32174031561617, 32628253858149, 32800502091827, 33092491779293]

retrieve_kws = {
    "id": ids,
    "granularity": "1min",
    "aggregates": ["average"],
    "limit": None,
    "uniform_index": True,
    "include_aggregate_name": False,
    "column_names": "id",
}


###########################
# Define retrieve functions
###########################
def split_date_range(dr: pd.DatetimeIndex, n: int) -> list:
    """Split a date range into n subsets.

    End exclusive in each split.

    Args:
        dr (pd.DatetimeIndex): Date range
        n (int): Number of subsets

    Returns:
        list: List of tuples with start and end times

    """
    splits = np.array_split(dr, n)
    result = []
    for split in splits:
        result.append((split[0], split[-1] + pd.Timedelta(dr.freq)))
    return result


def retrieve_single(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data in a single call to the API.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data

    """
    return client.time_series.data.retrieve_dataframe(
        start=start,
        end=end,
        **kwargs,
    )


def retrieve_concurrent(client: CogniteClient, start: pd.Timestamp, end: pd.Timestamp, kwargs) -> pd.DataFrame:
    """Retrieve time series data concurrently with multiple threads.

    Args:
        client (CogniteClient): Cognite client
        start (pd.Timestamp): Start time
        end (pd.Timestamp): End time
        kwargs (dict): Keyword arguments for the retrieve method

    Returns:
        pd.DataFrame: DataFrame with time series data

    """
    dr = pd.date_range(start, end, freq=kwargs["granularity"], inclusive="left")
    splits = split_date_range(dr, 4)
    args = [(client, start, end, kwargs) for start, end in splits]
    with ThreadPool(4) as pool:
        results = pool.starmap(
            retrieve_single, args
        )
    return pd.concat(results, axis=0)



###############
# Retrieve data
###############
end = pd.Timestamp.now().floor("1d")
start = end - pd.Timedelta(weeks=30)

df1 = retrieve_single(client=client, start=start, end=end, kwargs=retrieve_kws)
df2 = retrieve_concurrent(client=client, start=start, end=end, kwargs=retrieve_kws)

 

Sample output:

############################
# Data from normal retrieval
############################

                     151613618059  1084354269001  4066648994349  4278941460069  4709909452530
2024-04-04 00:00:00           NaN      75.687863            0.0       0.609659            NaN
2024-04-04 00:01:00           NaN      75.675496            0.0       0.625718            NaN
2024-04-04 00:02:00           NaN      75.667475            0.0       0.587738            NaN
2024-04-04 00:03:00           NaN      75.693901            0.0       0.615769            NaN
2024-04-04 00:04:00           NaN      75.681177            0.0       0.622529            NaN
...                           ...            ...            ...            ...            ...
2024-10-30 23:55:00           NaN      77.970664            0.0       0.000000            NaN
2024-10-30 23:56:00           NaN      78.110861            0.0       0.000000            NaN
2024-10-30 23:57:00           NaN      78.313353            0.0       0.000000            NaN
2024-10-30 23:58:00           NaN      78.445857            0.0       0.000000            NaN
2024-10-30 23:59:00           NaN      78.362344            0.0       0.000000            NaN

[302400 rows x 5 columns]



###################################
# Data from multithreaded retrieval
###################################
                     151613618059  1084354269001  4066648994349  4278941460069  4709909452530
2024-04-04 00:00:00           NaN      75.687863            0.0       0.609659            NaN
2024-04-04 00:01:00           NaN      75.675496            0.0       0.625718            NaN
2024-04-04 00:02:00           NaN      75.667475            0.0       0.587738            NaN
2024-04-04 00:03:00           NaN      75.693901            0.0       0.615769            NaN
2024-04-04 00:04:00           NaN      75.681177            0.0       0.622529            NaN
...                           ...            ...            ...            ...            ...
2024-10-30 23:55:00           NaN            NaN            NaN            NaN            NaN
2024-10-30 23:56:00           NaN            NaN            NaN            NaN            NaN
2024-10-30 23:57:00           NaN            NaN            NaN            NaN            NaN
2024-10-30 23:58:00           NaN            NaN            NaN            NaN            NaN
2024-10-30 23:59:00           NaN            NaN            NaN            NaN            NaN

[302400 rows x 5 columns]

 

Another thing I observe is that if I reduce the amount of data to be retrieved (by limiting the number of time series or the date range), I get a 100% match between the normal and multithreaded methods.

 

I can already anticipate the reply: “We do not recommend retrieving data from the API in this manner.” :P If so, what would be the best practice for speeding up large data retrieval?

Thanks for the help!

Anders

Best answer by Håkon V. Treider

The short answer as to why retrieve_concurrent is no faster is that the CogniteClient uses a singleton thread pool executor (TPE), and each of these tasks probably saturates it on its own. Or, on the off chance they don't, the API is probably 429’ing you and telling you to slow down.

There are roughly three or four approaches here, given that you really need to speed up this retrieval process:

  1. A lot of time is spent combining numpy arrays into the final dataframe. Use retrieve_arrays instead and write your own concatenation logic (the SDK simply uses pd.concat); see the sketch after this list.
    1. If you don’t need all time series in the final dataframe, you can save a ton of time by concatenating smaller subsets.
  2. Patch the SDK and replace the singleton TPE with “a new TPE on each method call invocation” so that each of your 4 threads calls into a pool of its own.
    1. This will probably lead to excessive 429s (you are basically DoS-ing the API). To solve this, each client should be a separately instantiated principal with its own request budget towards the API.
  3. Don’t split the time domain; split on the number of time series instead.
  4. Play around with max_workers. Note that due to the nature of the singleton TPE, the pool size won’t change after the first method call, so you need to restart e.g. your notebook to test with a different value.
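
To make points 1 and 3 concrete, here is a minimal sketch of chunking the request by time series ID and assembling the final DataFrame yourself from retrieve_arrays results. The helper names (chunked, retrieve_by_id_chunks) are made up for illustration, and the sketch assumes retrieve_arrays accepts the same core keyword arguments as retrieve_dataframe (the dataframe-only options such as uniform_index and column_names are dropped) and that each returned DatapointsArray exposes id, timestamp and average arrays; verify against your SDK version.

#################################################################
# Sketch: retrieve_arrays per ID chunk, manual DataFrame assembly
#################################################################
import pandas as pd

from cognite.client import CogniteClient

client = CogniteClient()


def chunked(seq: list, size: int) -> list:
    """Split a list into consecutive chunks of at most `size` items."""
    return [seq[i : i + size] for i in range(0, len(seq), size)]


def retrieve_by_id_chunks(client: CogniteClient, ids: list, start, end, chunk_size: int = 10) -> pd.DataFrame:
    """Retrieve 1-minute averages one ID chunk at a time and build the DataFrame manually."""
    columns = {}
    for id_chunk in chunked(ids, chunk_size):
        arrays = client.time_series.data.retrieve_arrays(
            id=id_chunk,
            start=start,
            end=end,
            granularity="1min",
            aggregates=["average"],
            limit=None,
        )
        for arr in arrays:
            # arr.timestamp holds the aggregate timestamps, arr.average the values
            columns[arr.id] = pd.Series(arr.average, index=pd.DatetimeIndex(arr.timestamp))
    # Building the frame from Series aligns the columns on the union of timestamps
    return pd.DataFrame(columns)

If you also want to experiment with max_workers (point 4), it can typically be set when constructing the client, e.g. ClientConfig(..., max_workers=20) if your SDK version exposes it, and only in a fresh process so the singleton TPE is rebuilt.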

And lastly, we do not recommend retrieving data from the API in this manner 😉


3 replies


  • Backend developer
  • 36 replies
  • October 31, 2024

I’m not an expert in the Python SDK, but in general, if you want to retrieve time series in parallel, I would let the SDK deal with time ranges and paging, and only do parallelization across time series. That should also be the fastest.

Edit: Too slow, and Håkon had a better answer anyway :)
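
For reference, a minimal sketch of that approach, reusing client, ids, start, end and retrieve_kws from the question: each call gets the full date range so the SDK handles the time chunking and paging internally, and only the ID list is split. The calls are issued sequentially here, since (per the accepted answer) they would share the SDK's singleton thread pool anyway.

#####################################################################
# Sketch: split on time series IDs, keep the full time range per call
#####################################################################
id_chunks = [ids[i::4] for i in range(4)]  # four roughly equal ID groups

frames = []
for id_chunk in id_chunks:
    chunk_kwargs = {**retrieve_kws, "id": id_chunk}
    frames.append(
        client.time_series.data.retrieve_dataframe(start=start, end=end, **chunk_kwargs)
    )

# Concatenate along the column axis; with uniform_index=True each call builds its
# own uniform index, so pd.concat aligns the chunks on the union of timestamps
df3 = pd.concat(frames, axis=1)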


Anders Brakestad
Seasoned

Thanks for the insights!

It looks like the effort to make multithreading work is not worth it. I might play around with retrieving numpy arrays, but I’ll probably settle for a bit of wait time.



