Skip to main content

Welcome to the Time Series Subscriptions community

  • August 22, 2023
  • 14 replies
  • 325 views

Glen Sykes
Seasoned Practitioner

Welcome to the Time Series Subscriptions community group for Early Adopters!

Time series subscriptions are aimed at data engineers who need to monitor changes to Time Series datapoints / write only the updated datapoints to their application storage etc.  This feature negates the need for consumers to download an entire time series again, just to get the changes.  The feature is now available in public beta (including Python SDK support), and customers are encouraged to try it out.

This group is staffed by the product team and we are available for Q&A, providing support and receiving feedback on the feature.

To get started, please take a look at the user guide, the API specification.

We look forward to supporting you!

14 replies

  • New
  • October 10, 2023

If I want to read timeseries datapoints outside CDF, is using subscription the best way to do it? I feel the subscription is mostly used for capturing datapoints changes instead of retrieving datapoints. Let me know. 

If using subscription is better, could you show me both C# examples and Python examples?

Thank you


Glen Sykes
Seasoned Practitioner
  • Author
  • Seasoned Practitioner
  • October 11, 2023

Hi @johnpiger 

Subscriptions only retrieve changes to data points from the time after the subscription was created.  So if you’re looking to get all the datapoints, and then ensure your system remains updated with the changes, then I’d recommend two steps. 

Firstly, create the subscription to your time series, and secondly, using either the list or retrieve API endpoint, get all the data points you want that were created prior to the subscription.

I cannot offer you any sample C# code, but the Python SDK has been updated to support the Subscriptions API already.  Here’s a link to the changelog https://github.com/cognitedata/cognite-sdk-python/blob/master/CHANGELOG.md#added-24

 


Hi Glen,

First, thank you for this new api!

I am writing to seek assistance regarding a recurring issue. I am facing while working with this new API. I have been encountering frequent timeout errors (cognite.client.exceptions.CogniteReadTimeout) that are hindering my ability to efficiently utilize the service. I use the same code as documentation example :
 

for batch in c.time_series.subscriptions.iterate_data("my_subscription", "Now"):
print(f"Added {len(batch.subscription_changes.added)} timeseries")
print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
print(f"Changed timeseries data in {len(batch.updates)} updates")
if not batch.has_next:
time.sleep(1)

 

Thank you for your time and support. I am looking forward to your prompt response. If there are any further details or information required from my end, please let me know, and I will provide them promptly.-

Best regards,

Oussama

 


I also wanted to know what the conditions are for this error to occur:  requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


Glen Sykes
Seasoned Practitioner
  • Author
  • Seasoned Practitioner
  • November 27, 2023

Thanks Oussama, one of our developers will be along soon to help out!


  • Practitioner
  • November 27, 2023

Hi Oussama!

 

We will look into this issue, but we might need some more info to speed up the investigation.

Could you share some more details on your use of subscriptions?

In which CDF project is this happening?

What proportion of calls are timing out?

Is this the only subscription on your project, or do you have many?

Are you using the default timeout of 30 seconds?

How much data is typically present in each response?

 

Best regards,

Lars Hagen


Thank you Lars, I sent you a private message !


Forum|alt.badge.img+1

@Lars Hagen 

I work with Celanese and we are trying to work on creating Power BI dashboards for streaming Data (Timeseries).  I am following below solution for doing the same. 


I am facing same “timeout” error while creating Subscriptions. I am using below code. 
my_filter = f.Prefix(
    property=p.external_id,
    value="EVE"
)
sub = DataPointSubscriptionCreate(
    external_id="powerbi-subscription-test",
    name="PowerBI Subscription Test",
    description="Subscription for PowerBI",
    partition_count=1,
    filter=my_filter
)
client.time_series.subscriptions.create(sub)


I see this post is almost 2 years old and nobody else has mentioned the issue I’m seeing so I assume I’m missing something, but whenever I try to call iterate_data() on a subscription, it returns nothing.

 

Subscription created about a week ago, with 2 time-series members added at this time. I’ve confirmed the subscription exists and contains the 2 time-series IDs as linked members.
Using the Python SDK, I’ve tried init the “start” variable with “1d-ago” and leaving it empty, but no matter how I call this endpoint, it always returns an empty data set. There is at least 1 datapoint per hour for both of these time-series.

 

for batch in cognite_client.time_series.subscriptions.iterate_data(external_id="MySubscription", start="1d-ago"):
# `updates` is always empty
print("# Updates: {len(batch.updates)}")

# `has_next` is always false
if not batch.has_next:
break

 

I’ve left a test polling this for a number of hours, and nothing ever comes through despite new data hitting the cloud and showing up in DataExplorer on the Cognite web platform.

Any clues what likely obvious thing I’m missing?

 

PS: None of the links in the original post work anymore.


Sigurd  Holsen
Practitioner
  • Practitioner
  • February 18, 2026

Hi!

I tested your example code with a toy subscribtion, and it worked for me. Just had to add an `f` string prefix to print out the actual number of updates.

Are you sure the subscription contains the expected time series?

If you run the following snippet, it should print the members of your subscription.

cognite_client.time_series.subscriptions.list_member_time_series(
external_id="MySubscription",
)

 


Hi ​@Sigurd Holsen,

Your suggested check is what I meant by:

I’ve confirmed the subscription exists and contains the 2 time-series IDs as linked members.

 

However just for absolute clarity, a notebook snippet of:

subbed_ts = cognite_client.time_series.subscriptions.list_member_time_series(external_id = SUB_NAME)
[ts.id for ts in subbed_ts]

Returns:

[5795765301272620, 7303489502581795]

 

Good to know that the toy example is working for you though (with the missing “f” formatter).


Sigurd  Holsen
Practitioner
  • Practitioner
  • February 19, 2026

Ah, sorry, I read that a bit fast, but good to have confirmation.

Could you create a new subscription with the same time series and check if that becomes empty as well?


Here’s the wider snippet I wrote which creates the subscription if it doesn’t exist. For the sake of this test I’ve just updates the SUB_NAME.

SUB_NAME = "MySubscription2"

sub = cognite_client.time_series.subscriptions.retrieve(SUB_NAME)
if sub is None:
sub = DataPointSubscriptionWrite(
external_id = SUB_NAME,
name = "Subscribe to power factor time-series and create events on low PF",
partition_count = 10,
time_series_ids = sub_to_ext_ids
)
print("Subscription created")
sub = cognite_client.time_series.subscriptions.create(sub)
else:
print("Subscription fetched")


subbed_ts = cognite_client.time_series.subscriptions.list_member_time_series(external_id = SUB_NAME)
[ts.id for ts in subbed_ts]

Yields:

Subscription created
[5795765301272620, 7303489502581795]

 

But I’m afraid I’m still seeing no data when attempting to fetch with:

batch_num = 0
for batch in cognite_client.time_series.subscriptions.iterate_data(
external_id=SUB_NAME,
start="1d-ago"
):
batch_num += 1

if len(batch.updates) == 0:
print(f"No updates in batch {batch_num}")

for update in updates:
# Analyze changes in timeseries
...

if not batch.has_next:
break

This still yields: “No updates in batch 1”

 

EDIT: After your first message where you confirmed it was working for you, I did go to double check the application permissions, I can confirm they look correct and contain read/write for both TimeSeries and TimeSeriesSubscriptions. (if you saw my previous edit please ignore, I found the role that was already set and giving me access).


Sigurd  Holsen
Practitioner
  • Practitioner
  • February 19, 2026

I think the issue is with partitions. You create the subscription with 10 partitions, but you don’t specify the partition in iterate_data, so it defaults to partition 0.

The API allows you to specify multiple partitions in a single API call, giving you multiple cursors back, but for the SDK you need multiple calls to iterate_data.

Just to check if this is indeed the issue, could you try this:

batch_num = 0
for partition in range(10):
print("Fetching from partition", partition)
for batch in cognite_client.time_series.subscriptions.iterate_data(
external_id=SUB_NAME,
start="1d-ago",
partition=partition,
):
batch_num += 1

if len(batch.updates) == 0:
print(f"No updates in batch {batch_num}")

for update in updates:
# Analyze changes in timeseries
...

if not batch.has_next:
break