Hello,
Can anyone check this out?
Hello,
When it comes to setting `max_workers` for the SDK, you have a couple of options:
Global Configuration: You can set global configuration options that will apply to any instance of `CogniteClient` created after they have been set. Here's how you can set the `GlobalConfig`:
from cognite.client import global_config, ClientConfig
from cognite.client.credentials import Token
global_config.default_client_config = ClientConfig(
    client_name="my-client", project="myproj", credentials=Token("verysecret")
)
global_config.max_workers = <desired_value> # Set the desired number for max_workers
Remember, any configurations set in the `global_config` should be made prior to instantiating a `CogniteClient` to ensure they take effect.
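Once the global default config is set, a client created without an explicit config will pick it up (a minimal sketch, assuming the `global_config` above has already been set):
from cognite.client import CogniteClient

client = CogniteClient()  # picks up global_config.default_client_config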
Directly in the CogniteClient Constructor: The documentation suggests that you can also pass a `ClientConfig` directly when you create a new `CogniteClient`. So, if you only want to set `max_workers` for a specific instance, you can do so like this:
config = ClientConfig(max_workers=<desired_value>, ...<other_params>)
client = CogniteClient(config)
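As a minimal, hedged sketch reusing the placeholder values from the global example above (the client name, project, token and worker count are purely illustrative):
from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import Token

config = ClientConfig(
    client_name="my-client", project="myproj", credentials=Token("verysecret"),
    max_workers=5,  # illustrative value
)
client = CogniteClient(config)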
To address your other questions:
- If you do not set the `GlobalConfig`, then yes, the SDK will use the default values for workers, retries, and other parameters.
- As for passing the `GlobalConfig` to `ClientConfig`, you don't need to pass it directly. Instead, the `ClientConfig` is set as an attribute of `global_config` (as shown in the example above).
Lastly, if you're working with multiple `CogniteClient` instances and making many concurrent requests, consider increasing the `max_connection_pool_size` in the `global_config` to efficiently reuse connections.
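For example (32 is just an illustrative value, not a recommendation):
global_config.max_connection_pool_size = 32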
I hope this helps!
Let me know if you have any more questions or need further clarification.
Hi @Neerajkumar Bhatewara, did the above help you?
Hello,
We tried the suggested approaches and found that there is not much difference in performance when fetching data from CDF.
For capturing the numbers, we used the following data model configuration:
- Project: slb-test
- Model: QA-PERF-DATA (QA_SPACE)
- Version: 1
- View: TimeseriesData
- Properties: externalId, timeseriesId, propertyName, entity, frequency
- Time Range: 15-11-2023 00:00 to 17-11-2023 23:59
- Filters:
- propertyName: TARGET-WATER-INJECTION-RATE, YEP-GAS-PRODUCTION-RATE, METER-OPENING-GAS-VOLUME, UC-CONTRACTOR, SAND-PRODUCTION, UNIT-INJECTION-COST, WATER-PRODUCTION-VOLUME, GAS-SALES-VOLUME, GAS-PRODUCTION-RATE, DIFFERENTIAL-PRESSURE-HOLDING-TRIGGERED-VALUE
- frequency: 15M
The time taken by the code that uses direct API calls to fetch data from CDF is 25.95 seconds.
Details about both the approaches tried and their results are as follows:
Approach 1: Setting max_workers in Global Configuration
Code change:
global_config.default_client_config = ClientConfig(
    credentials=OAuthClientCredentials(
        token_url=token_url, client_id=client_id,
        client_secret=client_secret, scopes=scopes,
    ),
    project=cognite_project,
    base_url=f"https://{cognite_cluster}.cognitedata.com",
    client_name=client_name,
    debug=False,
)
global_config.max_workers = 1  # client_config.max_workers = 1
global_config.max_connection_pool_size = 10
return global_config.default_client_config
Here we return the `default_client_config` object to the Python files, and each of them creates a client object by passing it to `CogniteClient`.
The performance numbers captured are:
| max connection pool size | max workers | time to fetch data (in seconds) |
|---|---|---|
| 10 | 10 | 185.9922695 |
| 10 | 16 | 219.8081455 |
| 10 | 32 | 196.185699 |
| 10 | 1 | 250.9608867 |
| 32 | 10 | 166.0820322 |
| 32 | 16 | 188.2735438 |
| 32 | 32 | 162.2813222 |
Approach 2: Setting max_workers directly in the ClientConfig:
Code change:
client_config = ClientConfig(
    credentials=OAuthClientCredentials(
        token_url=token_url, client_id=client_id,
        client_secret=client_secret, scopes=scopes,
    ),
    project=cognite_project,
    base_url=f"https://{cognite_cluster}.cognitedata.com",
    client_name=client_name,
    debug=False,
)
client_config.max_workers = 10
global_config.max_connection_pool_size = 32
return client_config
Here we return the `client_config` object to the Python files, and each of them creates a client object by passing it to `CogniteClient`.
Performance numbers are:
| max connection pool size | max workers | time to fetch data (in seconds) |
|---|---|---|
| 10 | 10 | 170.77724 |
| 10 | 16 | 221.9572637 |
| 10 | 32 | 201.0603657 |
| 10 | 1 | 178.7179568 |
| 32 | 10 | 240.3109298 |
| 32 | 16 | 203.6003859 |
| 32 | 32 | 211.1403549 |
So we can see that increasing `max_workers` has no effect on the time taken to fetch the data.
Could you please let us know how things could be done differently to get the job done in a lesser amount of time?
Hi @Gargi Bhosale, could you share the exact fetching code (logic) you used to perform the test (besides setting the ClientConfig)?
Hi @Everton Colling, sure. Sharing the code for fetching data.
if aggregates:
    dps = client.time_series.data.retrieve(
        external_id=external_id, start=int(startParam), end=int(endParam),
        aggregates=aggregates, granularity=granularity, limit=limit,
    )
elif is_latestvalue:
    dps = client.time_series.data.retrieve_latest(external_id=external_id)
else:
    dps = client.time_series.data.retrieve(
        external_id=external_id, start=int(startParam), end=int(endParam), limit=limit,
    )
Thanks for sharing your benchmark code. I have a few comments that may help shed some light on performance.
- The default number of max workers in the SDK, 10, is already quite a high value. For comparison, for other resource types (which are less performant than datapoints), the API will ignore a partitioning count higher than 10. I would suggest you do performance testing with 1, 2, 5, and 10, perhaps? ...and I don't think you'll need to touch the connection pool size at all.
- Whenever you are calling the API, you have a "burst budget" and a "sustained budget"; these are quite different. By the way, this is referred to as rate limiting and is a vital component in protecting the API. After the initial burst budget is spent, the API will force you to slow down by replying with status code 429. Because these are automatically retried by the SDK after a self-imposed timeout (you may want to read up on the exponential backoff strategy with smearing, which we use), you won't notice this except as lower throughput.
- When performance testing datapoints, you should thus add quite significant sleeps between test runs, or run with different credentials, since the rate limiting will for sure mess up your results and/or make them non-deterministic.
- Datapoints from rarely retrieved time series are put into cold storage (slower) and take more time (initially) to retrieve. Thus, if you run two tests and compare the results, the first query might fetch from a physical disk while the next is served straight from memory. This can easily give the false impression that the first was much slower than the second, although if you repeat the tests another day in reverse order, the effect is exactly the opposite.
- In your benchmark code I see that you pass the limit argument. If you pass a finite limit, do know that the SDK is unable to parallelise as efficiently as for unlimited queries. I added a small performance guide to the SDK documentation a while back that you may want to check out: https://cognite-sdk-python.readthedocs-hosted.com/en/latest/core_data_model.html#retrieve-datapoints . I'll also add the main points here:
  - For best speed, and significantly lower memory usage, consider using `retrieve_arrays(...)`, which uses `numpy.ndarray`s for data storage.
  - Only unlimited queries (`limit=None`) are always fetched in parallel,* so specifying a large finite `limit`, like 1 million, comes with a severe performance penalty as data is fetched serially (see the short sketch after this list).
    *For limited queries, concurrency is still used when datapoints from multiple time series are requested, but each time series is still fetched serially.
  - Try to avoid specifying start and end to be very far from the actual data: if you have data from 2000 to 2015, don't set `start=0` (1970).
- Although comparing the time taken for each query is of interest, I would urge you to convert that to datapoints/second instead as it makes it easier to reason about in my opinion.
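To illustrate the point about limits, here is a small sketch (the external ID is hypothetical): an unlimited query is fetched in parallel, while a large finite limit forces serial fetching for that time series.
# Unlimited query: fetched in parallel; retrieve_arrays also keeps memory usage low.
arr = client.time_series.data.retrieve_arrays(external_id="my-ts", limit=None)

# Large finite limit: datapoints for this time series are fetched serially.
dps = client.time_series.data.retrieve(external_id="my-ts", limit=1_000_000)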
I will write up some suggested test code and what results I’m getting myself (for you to compare with) in my next post here (hopefully some time tomorrow!)
Time for some actual benchmark code!
It is surprisingly hard to do right, as we use a singleton pattern in the SDK for the thread pool executor that executes all the parallel calls. Hence, in order to resize the number of threads it has available, we need to explicitly re-create it. This probably explains why in your previous tests you saw little to no effect after altering various configuration options.
The code below assumes a function exists that can provide you with a `CogniteClient`; replace that with your own. For the tests, I use a previously created time series that has 5 million datapoints randomly distributed in time between 1970 and now.
Note: it also assumes that you are testing performance for raw (numeric) datapoints, i.e. no aggregates. For aggregates, you need to decide how to count, as fetching all 10 different aggregates is not that much slower than fetching just a single one.
import time
from timeit import default_timer as timer

import cognite.client.utils._concurrency as conc
from cognite.client.data_classes import Datapoints, DatapointsArray

client = setup_cog_client("forge-sandbox", "greenfield")
ts = client.time_series.retrieve(
    external_id="benchmark:9-dense-5mill-rand-dist-#1/1"
)
max_workers = [1, 2, 3, 5, 10]


def benchmark_raw_stats(res, t0, t1, client):
    if isinstance(res, (Datapoints, DatapointsArray)):
        res = [res]
    if any(dps.value is None for dps in res):
        raise ValueError("no raw dps found, did you fetch aggregates?")
    n_dps = sum(map(len, res))
    t = t1 - t0
    print(
        f"Dps/sec={n_dps/t:.0f}, (across {len(res)} ts) ~t: {t:.4f} "
        f"sec, N dps: {n_dps}, max_workers={client.config.max_workers}"
    )


for i, mw in enumerate(max_workers, 1):
    if i == 1:
        # A small warm-up meant for the API to give us peak performance:
        client.time_series.data.retrieve_arrays(id=ts.id, limit=1_000_000)
        time.sleep(10)
    # Overriding max_workers isn't enough, as the SDK reuses the threadpool
    # (singleton pattern), so after the first API call has been made, the
    # threadpool size is fixed. We delete it to force re-init:
    client.config.max_workers = mw
    del conc._THREAD_POOL_EXECUTOR_SINGLETON

    t0 = timer()
    res = client.time_series.data.retrieve_arrays(id=ts.id)
    benchmark_raw_stats(res, t0, timer(), client)

    # Make rate limiting less likely...
    if i != len(max_workers):  # skip after last test
        time.sleep(15)
So, what kind of performance do I get on my Mac laptop on Wi-Fi (5 GHz)?
Dps/sec=299473, (across 1 ts) ~t: 16.696 sec, N dps: 5000000, max_workers=1
Dps/sec=692511, (across 1 ts) ~t: 7.2201 sec, N dps: 5000000, max_workers=2
Dps/sec=961558, (across 1 ts) ~t: 5.1999 sec, N dps: 5000000, max_workers=3
Dps/sec=1557154,(across 1 ts) ~t: 3.2110 sec, N dps: 5000000, max_workers=5
Dps/sec=2086234,(across 1 ts) ~t: 2.3967 sec, N dps: 5000000, max_workers=10
@Neerajkumar Bhatewara @Gargi Bhosale, was Hakon able to answer your queries? Are there any outstanding questions?
Hi @Håkon V. Treider, @Jason Dressel and Anders Daljord Morken,
We tried the above solution for fetching data points and we were able to achieve the expected performance number (around 20 seconds) for 1 max worker.
Can you please explain what you meant by fetching aggregates of numeric data points? Assuming we have around 6000 time series and half a million data points, what should max_workers be to attain a performance of ~20 sec for a full fetch?
(...)
Can you please explain what you meant by fetching aggregates of numeric data points? Assuming we have around 6000 time series and half a million data points, what should max_workers be to attain a performance of ~20 sec for a full fetch?
You should be able to run basically the same test as above to figure that out.
My previous comment was about how to measure the performance of fetching aggregates: There are a total of 10 different aggregates you may fetch:
- average
- continuous_variance
- count
- discrete_variance
- interpolation
- max
- min
- step_interpolation
- sum
- total_variation
→ The time it takes to fetch a single aggregate is about the same as fetching all.
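To make that concrete, here is a small sketch (the external ID, granularity and time range are purely illustrative): fetching a single aggregate costs roughly the same as fetching several at once.
# One aggregate for a time series over an illustrative time range:
single = client.time_series.data.retrieve_arrays(
    external_id="my-ts", aggregates=["average"],
    granularity="15m", start="30d-ago", end="now",
)
# Several aggregates for the same range; roughly the same cost as above:
several = client.time_series.data.retrieve_arrays(
    external_id="my-ts", aggregates=["average", "min", "max", "count", "sum"],
    granularity="15m", start="30d-ago", end="now",
)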
Hello @Håkon V. Treider,
We tried implementing the given solution in our code as mentioned above. We have existing multi-threading logic in our code; when we use del conc._THREAD_POOL_EXECUTOR_SINGLETON together with it, we get the expected performance numbers. But when we remove the multi-threading logic, the performance does not improve.
We are using concurrent.futures.ThreadPoolExecutor for implementing multi-threading in our own code.
Please suggest a way in which we can use the SDK calls alone with your solution, so that we can get the ideal performance.
Since the SDK already parallelizes its calls for you, I think you should not wrap it inside of another thread pool executor. Maybe you can elaborate on why you use this pattern? If there is a need for this, please share some code and we’ll figure out how to make it as performant as you need together!
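To illustrate (a sketch with hypothetical external IDs and time range, not your actual code): instead of submitting one SDK call per time series to your own ThreadPoolExecutor, pass the whole list to a single call and let the SDK's internal thread pool parallelise it.
external_ids = [f"my-ts-{i}" for i in range(6000)]  # hypothetical IDs

# One call for all time series; the SDK fans out across its max_workers threads.
dps_list = client.time_series.data.retrieve_arrays(
    external_id=external_ids, start="52w-ago", end="now", limit=None,
)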