Solved

SDK - Questions around Global Config

  • 11 October 2023
  • 13 replies
  • 227 views

Userlevel 1

Hello,

  1. What are the best practices for setting max-workers for the SDK?
  2. If I do not set the Global Config, all the default values for workers and retries will be picked up, correct?
  3. Can you give a reference on how to set GlobalConfig? I did not see any way to pass GlobalConfig to ClientConfig.

Best answer by Ivar Stangeby 18 October 2023, 14:04



Userlevel 1

Hello,
Can anyone check this out?
 

Hello,

When it comes to setting the `max-workers` for an SDK, you have a couple of options:

 

Global Configuration: You can set global configurations that will apply to any instance of `CogniteClient` that is created after the global configurations have been set. Here's how you can set the `GlobalConfig`: 

from cognite.client import global_config, ClientConfig
from cognite.client.credentials import Token

global_config.default_client_config = ClientConfig(
    client_name="my-client", project="myproj", credentials=Token("verysecret")
)
global_config.max_workers = 5  # example value; set your desired number of max_workers

Remember, any configurations set in the `global_config` should be made prior to instantiating a `CogniteClient` to ensure they take effect.

Directly in the CogniteClient Constructor: The documentation suggests that you can also pass a `ClientConfig` directly when you create a new `CogniteClient`. So, if you only want to set the `max_workers` for a specific instance, you can do so as:

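from cognite.client import ClientConfig, CogniteClient
from cognite.client.credentials import Token
config = ClientConfig(client_name="my-client", project="myproj", credentials=Token("verysecret"), max_workers=5)  # 5 is an example value
client = CogniteClient(config)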

To address your other questions:

  • If you do not set the `GlobalConfig`, then yes, the SDK would utilize the default values for workers, retries, and other parameters.
  • As for passing the `GlobalConfig` to `ClientConfig`, you don't need to pass it directly. Instead, the `ClientConfig` is set as an attribute of `global_config` (as shown in the example above).

Lastly, if you're working with multiple `CogniteClient` instances and making many concurrent requests, consider increasing the `max_connection_pool_size` in the `global_config` to efficiently reuse connections.

I hope this helps!
Let me know if you have any more questions or need further clarification.

Userlevel 3

Hi @Neerajkumar Bhatewara, did the above help you?

Userlevel 1

Hello,

We tried the approaches suggested and found out that there is not much difference in performance for fetching data from CDF.

For capturing the numbers, we used the following data model configuration:

 

  • Project: slb-test
  • Model: QA-PERF-DATA (QA_SPACE)
  • Version: 1
  • View: TimeseriesData
  • Properties: externalId, timeseriesId, propertyName, entity, frequency
  • Time Range: 15-11-2023 00:00 to 17-11-2023 23:59
  • Filters:
    • propertyName: TARGET-WATER-INJECTION-RATE, YEP-GAS-PRODUCTION-RATE, METER-OPENING-GAS-VOLUME, UC-CONTRACTOR, SAND-PRODUCTION, UNIT-INJECTION-COST, WATER-PRODUCTION-VOLUME, GAS-SALES-VOLUME, GAS-PRODUCTION-RATE, DIFFERENTIAL-PRESSURE-HOLDING-TRIGGERED-VALUE
    • frequency: 15M

 

 

The code that fetches the data from CDF through an API call takes 25.945018529891968 seconds.

 

Details about both the approaches tried and their results are as follows:

Approach 1: Setting max_workers in Global Configuration

Code change:

 

    global_config.default_client_config = ClientConfig(
        credentials=OAuthClientCredentials(
            token_url=token_url,
            client_id=client_id,
            client_secret=client_secret,
            scopes=scopes,
        ),
        project=cognite_project,
        base_url=f"https://{cognite_cluster}.cognitedata.com",
        client_name=client_name,
        debug=False,
    )
    global_config.max_workers = 1
    # client_config.max_workers = 1
    global_config.max_connection_pool_size = 10

    return global_config.default_client_config

Here we return the default_client_config object to the Python files, and each of them creates a client object by passing it to CogniteClient.

 

The performance numbers captured are:

 

| Max connection pool size | Max workers | Time to fetch data (seconds) |
|---|---|---|
| 10 | 10 | 185.9922695 |
| 10 | 16 | 219.8081455 |
| 10 | 32 | 196.185699 |
| 10 | 1 | 250.9608867 |
| 32 | 10 | 166.0820322 |
| 32 | 16 | 188.2735438 |
| 32 | 32 | 162.2813222 |

 

Approach 2: Setting max_workers directly in the ClientConfig:

 

Code change:

 

    client_config = ClientConfig(
        credentials=OAuthClientCredentials(
            token_url=token_url,
            client_id=client_id,
            client_secret=client_secret,
            scopes=scopes,
        ),
        project=cognite_project,
        base_url=f"https://{cognite_cluster}.cognitedata.com",
        client_name=client_name,
        debug=False,
    )

    client_config.max_workers = 10
    global_config.max_connection_pool_size = 32

    return client_config

Here we return the client_config object to the Python files, and each of them creates a client object by passing it to CogniteClient.

 

Performance numbers are:

| Max connection pool size | Max workers | Time to fetch data (seconds) |
|---|---|---|
| 10 | 10 | 170.77724 |
| 10 | 16 | 221.9572637 |
| 10 | 32 | 201.0603657 |
| 10 | 1 | 178.7179568 |
| 32 | 10 | 240.3109298 |
| 32 | 16 | 203.6003859 |
| 32 | 32 | 211.1403549 |

 

So, we can see that there is no effect of increasing the max_workers on the time duration of fetching the data.

Could you please let us know what we could do differently to fetch the data in less time?

Userlevel 4

Hi @Gargi Bhosale, could you share the exact fetching code (logic) you used to perform the test (besides setting the ClientConfig)?

Userlevel 1

Hi @Everton Colling sure. Sharing the code for fetching data.

        if aggregates:
            dps = client.time_series.data.retrieve(
                external_id=external_id,
                start=int(startParam),
                end=int(endParam),
                aggregates=aggregates,
                granularity=granularity,
                limit=limit,
            )
        elif is_latestvalue:
            dps = client.time_series.data.retrieve_latest(external_id=external_id)
        else:
            dps = client.time_series.data.retrieve(
                external_id=external_id,
                start=int(startParam),
                end=int(endParam),
                limit=limit,
            )

Userlevel 4

Thanks for sharing your benchmark code. I have a few comments that may help shed some light on performance.

  1. The default number of max workers in the SDK, 10, is already quite high. For comparison, for other resource types (which are less performant than datapoints), the API ignores a partitioning count higher than 10. I would suggest you do performance testing with 1, 2, 5, and 10, perhaps? I also don’t think you’ll need to touch the connection pool size at all.
  2. Whenever you call the API, you have a “burst budget” and a “sustained budget”; these are quite different. This is referred to as rate limiting, and it is a vital part of protecting the API. After the initial burst budget is spent, the API will force you to slow down by replying with status code 429. Because these responses are automatically retried by the SDK after a self-imposed timeout (you may want to read up on the exponential backoff strategy with smearing, which we use), you won’t notice this except as lower throughput.
  3. When performance testing datapoints, you should therefore add quite significant sleeps between test runs, or run with different credentials, since rate limiting will otherwise distort your results and/or make them non-deterministic.
  4. Datapoints from rarely retrieved time series are put into cold storage (slower) and take more time (initially) to retrieve. Thus, if you do two tests and compare the results, the first query might fetch from a physical disk, while the next is retrieved straight from memory. This may very well give the false impression that the first was much slower than the second, although if you repeat the tests another day in reverse order, the effect is exactly the opposite.
  5. In your benchmark code I see that you pass the limit argument. If you pass a finite limit, be aware that the SDK cannot parallelise as efficiently as it does for unlimited queries. I added a small performance guide to the SDK documentation a while back that you may want to check out: https://cognite-sdk-python.readthedocs-hosted.com/en/latest/core_data_model.html#retrieve-datapoints . I’ll also add the main points here:
    1. For best speed, and significantly lower memory usage, consider using retrieve_arrays(...) which uses numpy.ndarrays for data storage.

    2. Only unlimited queries (limit=None) are always fetched in parallel*, so specifying a large finite limit, like 1 million, comes with a severe performance penalty as data is fetched serially.
      *For limited queries, concurrency is still used when datapoints from multiple time series are requested, but each is still fetched serially.

    3. Try to avoid specifying start and end to be very far from the actual data: If you have data from 2000 to 2015, don’t set start=0 (1970).

  6. Although comparing the time taken for each query is of interest, I would urge you to convert that to datapoints/second instead as it makes it easier to reason about in my opinion.
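To make the retry behaviour from point 2 a bit more concrete, here is a minimal sketch of exponential backoff with random jitter ("smearing"). It is not the SDK's actual implementation: the function name `backoff_delays` and the `base` and `cap` values are assumptions chosen purely for illustration.

```python
import random


def backoff_delays(max_retries, base=0.5, cap=30.0, rng=random.random):
    """Yield one randomized delay (in seconds) per retry attempt.

    The upper bound doubles on every attempt until it reaches `cap`;
    the actual delay is a random fraction of that bound, so that many
    throttled clients do not all retry at the same instant.
    """
    for attempt in range(max_retries):
        ceiling = min(cap, base * 2 ** attempt)
        yield rng() * ceiling


# Example: the waits a client might apply after five consecutive 429 replies.
for delay in backoff_delays(5):
    print(f"would sleep {delay:.2f} s before retrying")
```

With a scheme like this, throttling never surfaces as an error in your code; it only shows up as extra waiting time, which is why back-to-back benchmark runs can look slower than they should.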

 

I will write up some suggested test code and what results I’m getting myself (for you to compare with) in my next post here (hopefully some time tomorrow!) 😊

Userlevel 4

Time for some actual benchmark code!

It is surprisingly hard to do right, as we use a singleton pattern in the SDK for the thread pool executor that executes all the parallel calls. Hence, in order to resize the number of threads it has available, we need to explicitly re-create it. This probably explains why in your previous tests you saw little to no effect after altering various configuration options.
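As a simplified model of why this happens (not the SDK's own code; `_EXECUTOR`, `get_executor` and `reset_executor` are hypothetical names), a singleton thread pool only looks at `max_workers` the first time it is created:

```python
from concurrent.futures import ThreadPoolExecutor

_EXECUTOR = None  # hypothetical module-level singleton


def get_executor(max_workers):
    """Return the shared pool, creating it on first use only."""
    global _EXECUTOR
    if _EXECUTOR is None:
        _EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
    return _EXECUTOR  # later max_workers values are silently ignored


def reset_executor():
    """Drop the shared pool so that the next call builds a new one."""
    global _EXECUTOR
    if _EXECUTOR is not None:
        _EXECUTOR.shutdown(wait=True)
    _EXECUTOR = None


first = get_executor(max_workers=1)
second = get_executor(max_workers=10)  # still the pool created with 1 worker
reset_executor()
third = get_executor(max_workers=10)   # a fresh pool with 10 workers
```

In this model, `first` and `second` are the same object, so changing the setting between calls does nothing until the pool is reset.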

The code below assumes a function exists that can provide you with a CogniteClient; replace that with your own. For the tests, I use a previously created time series that has 5 million datapoints randomly distributed in time between 1970 and now.

Note: It also assumes that you are testing performance for raw, (numeric) datapoints i.e. no aggregates. For aggregates, you need to decide how to count as fetching the 10 different aggregates is not that much slower than just fetching a single aggregate.

import time
from timeit import default_timer as timer

import cognite.client.utils._concurrency as conc
from cognite.client.data_classes import Datapoints, DatapointsArray

# setup_cog_client is your own helper that returns a CogniteClient:
client = setup_cog_client("forge-sandbox", "greenfield")
ts = client.time_series.retrieve(
    external_id="benchmark:9-dense-5mill-rand-dist-#1/1"
)
max_workers = [1, 2, 3, 5, 10]


def benchmark_raw_stats(res, t0, t1, client):
    if isinstance(res, (Datapoints, DatapointsArray)):
        res = [res]
    if any(dps.value is None for dps in res):
        raise ValueError("no raw dps found, did you fetch aggregates?")

    n_dps = sum(map(len, res))
    t = t1 - t0
    print(
        f"Dps/sec={n_dps/t:.0f}, (across {len(res)} ts) ~t: {t:.4f} "
        f"sec, N dps: {n_dps}, max_workers={client.config.max_workers}"
    )


for i, mw in enumerate(max_workers, 1):
    if i == 1:
        # A small warm-up meant for the API to give us peak performance:
        client.time_series.data.retrieve_arrays(id=ts.id, limit=1_000_000)
        time.sleep(10)

    # Overriding max_workers isn't enough, as the SDK reuses the thread pool
    # (singleton pattern), so after the first API call has been made, the
    # thread pool size is fixed. We delete it to force re-init:
    client.config.max_workers = mw
    del conc._THREAD_POOL_EXECUTOR_SINGLETON

    t0 = timer()
    res = client.time_series.data.retrieve_arrays(id=ts.id)
    benchmark_raw_stats(res, t0, timer(), client)

    # Make rate limiting less likely...
    if i != len(max_workers):  # skip after last test
        time.sleep(15)

So, what kind of performance do I get on my Mac laptop on Wi-Fi (5 GHz)?

Dps/sec=299473, (across 1 ts) ~t: 16.696 sec, N dps: 5000000, max_workers=1
Dps/sec=692511, (across 1 ts) ~t: 7.2201 sec, N dps: 5000000, max_workers=2
Dps/sec=961558, (across 1 ts) ~t: 5.1999 sec, N dps: 5000000, max_workers=3
Dps/sec=1557154,(across 1 ts) ~t: 3.2110 sec, N dps: 5000000, max_workers=5
Dps/sec=2086234,(across 1 ts) ~t: 2.3967 sec, N dps: 5000000, max_workers=10
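To read these numbers as scaling rather than raw throughput, one option is to compute the speed-up relative to the single-worker run. The snippet below is only an illustrative sketch; the helper name `speedups` is made up here, and the rates are copied from the output above.

```python
# (max_workers, datapoints per second) copied from the benchmark output above
results = [(1, 299473), (2, 692511), (3, 961558), (5, 1557154), (10, 2086234)]


def speedups(rows):
    """Return (workers, speed-up vs. the first row, speed-up per worker)."""
    baseline = rows[0][1]
    return [(w, rate / baseline, rate / baseline / w) for w, rate in rows]


for workers, speedup, efficiency in speedups(results):
    print(f"max_workers={workers:>2}: {speedup:.2f}x, efficiency {efficiency:.0%}")
```

On these figures, 10 workers give roughly 7 times the single-worker throughput, so the gain per added worker shrinks at the top end.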

 

Userlevel 4

@Neerajkumar Bhatewara @Gargi Bhosale, was Hakon able to answer your queries?  Are there any outstanding questions?

Userlevel 1

Hi @Håkon V. Treider, @Jason Dressel and Anders Daljord Morken,

We tried the above solution for fetching data points and we were able to achieve the expected performance number (around 20 seconds) for 1 max worker.

Can you please explain what you meant by fetching aggregates of numeric data points? Assuming we have around 6000 time-series and half a million data points what should be the max workers to attain a performance of ~20 sec for full fetch?

Userlevel 4

You should be able to run basically the same test as above to figure that out.

My previous comment was about how to measure the performance of fetching aggregates: There are a total of 10 different aggregates you may fetch:

  1. average
  2. continuous_variance
  3. count
  4. discrete_variance
  5. interpolation
  6. max
  7. min
  8. step_interpolation
  9. sum
  10. total_variation

→ The time it takes to fetch a single aggregate is about the same as fetching all.

Userlevel 1

Hello @Håkon V. Treider ,

We tried implementing the given solution in our code as mentioned above. We have existing multi-threading logic in our code; when we use del conc._THREAD_POOL_EXECUTOR_SINGLETON together with it, we get the expected performance numbers. But when we remove the multi-threading logic, the performance does not improve.

We are using concurrent.futures.ThreadPoolExecutor for implementing multi-threading in our own code.

Please suggest a way in which we can use the SDK calls alone with your solution, so that we can get the ideal performance.

Userlevel 4

Since the SDK already parallelizes its calls for you, I think you should not wrap it inside of another thread pool executor. Maybe you can elaborate on why you use this pattern? If there is a need for this, please share some code and we’ll figure out how to make it as performant as you need together!
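As a rough sketch of what "SDK calls alone" could look like: pass all the external IDs in a single call with limit=None and let the SDK do the parallelisation, rather than submitting one call per time series to your own executor. The helper name `fetch_raw_datapoints` is hypothetical, `client` is assumed to be a ready-made CogniteClient, and `retrieve_arrays` requires numpy to be installed.

```python
def fetch_raw_datapoints(client, external_ids, start, end):
    """Fetch raw datapoints for many time series in one SDK call.

    No extra thread pool is used here; the assumption is that the SDK
    parallelises the request internally, bounded by its max_workers setting.
    """
    return client.time_series.data.retrieve_arrays(
        external_id=list(external_ids),
        start=start,
        end=end,
        limit=None,  # unlimited queries are the ones fetched in parallel
    )
```

Whether a single call is fast enough for your 6000 time series is something only a benchmark like the one above can tell.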
