Question

Hosted Event Hub Extractor Needs Multiplexing / Ability to Send to Multiple Datasets



 

 Within our implementation we have an existing Hosted Extractor reading data from an IoT Hub that contains multiple sites worth of data.

 

Our Hosted Extractor Mapping Template filters for events that have a particular deviceId on it, representative of the location these events are coming from.

 

In an effort to ingest another site's data feed, I wanted to extend the template with an ELSE IF condition containing the mapping rules for the other location. These are almost identical to the first, except for the target datasets, which I’ve come to realize are set in the Sink section of the Extractor Configuration.

 

The net result here is needing to create redundant Hosted Extractor configurations that change only a filter, rather than having a cascading ELSE IF ruleset that applies to the full stream.

 

For example, here is a pseudo-template of our existing hosted extractor configuration for one site:

 

if (context.messageAnnotations.`iothub-connection-device-id` == "SITE_A") {
    input.map(record_unpack => {
        "type": "raw_row",
        "table": "tb_iot_test",
        "database": "db_iot_testing",
        "key": concat("TS_CO_", record_unpack.NAME),
        "deviceId": context.messageAnnotations.`iothub-connection-device-id`,
        "TAG": record_unpack.NAME,
        "IP_INPUT_VALUE": record_unpack.IP_INPUT_VALUE,
        "IP_INPUT_TIME": record_unpack.IP_INPUT_TIME,
        "IP_INPUT_QUALITY": record_unpack.IP_INPUT_QUALITY
    }).filter(item => item.IP_INPUT_QUALITY == "Good").flatmap(item => [
        {
            "type": "time_series",
            "name": item.TAG,
            "externalId": item.key,
            "metadata": {
                "IP21_DEVICE_ID": item.deviceId
            },
            "isString": false,
            "isStep": false
        },
        {
            "type": "datapoint",
            "externalId": item.key,
            "value": try_float(trim_whitespace(item.IP_INPUT_VALUE), null),
            "timestamp": to_unix_timestamp(item.IP_INPUT_TIME, "%Y-%m-%dT%H:%M:%S.%6fZ")
        }
    ])
} else {
    []
}

 

Ideally, I want to add an ELSE IF branch in which the only change is that the iothub-connection-device-id equals SITE_B.

 

While the record_unpack step is able to route different locations to different staging tables, I was unable to find a way to specify an alternative target for the time_series and datapoint items, short of creating a whole new configuration with a different Sink setting for this extractor.

 

 

We have multiple sites all sharing one IoT Hub.

 

IoT Hub limits the number of consumers that can be connected, and our number of sites is larger than that limit.

 

We handle routing in the consumer logic of this stream; however, Cognite does not appear to provide this mechanism in the templating language, and instead specifies the target in the Sink.

 

 

This presents a configuration-limit issue for us, given our source data and the constraint that the Cognite Hosted Extractor specifies the target in the Sink.

 

 

Am I missing something and is there a way to achieve what we’re looking for, or will this require a secondary hosted extractor configuration to be implemented?

 

If we must go down the path of redundant hosted extractors with modified logic and a different Sink, we’re going to hit an Azure IoT limit prior to our full scale out, and would like to understand if this is a feature that can be provided, or if we should be planning otherwise.

2 replies


Hi,

As the Product Manager for this space, I’m decidedly _not_ an authority on mapping statements for our hosted extractors!

I ran a couple of aspects of your post by one of our lead engineers, and he suggested omitting the wrapper IF statement and instead using a case() statement for the dataSetId property to select the correct data set ID based on your iothub-connection-device-id information, i.e.:

"dataSetId": case(context.messageAnnotations.`iothub-connection-device-id`, "SITE_A", <dataset_id_a>, "SITE_B", <dataset_id_b>, "SITE_C", <dataset_id_c>, null)

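As a rough, untested sketch of where that property could sit in the template from the question: this assumes the time_series output object accepts a dataSetId field, as suggested above, and that `//` comments are allowed in the mapping. The numeric IDs 1111 and 2222 are made-up placeholders for the real data set IDs, and the raw_row fields are left out for brevity.

```
// Sketch only: 1111 and 2222 are hypothetical data set IDs, and support for
// "dataSetId" on the time_series object is assumed, not confirmed here.
input.map(record_unpack => {
    "key": concat("TS_CO_", record_unpack.NAME),
    "deviceId": context.messageAnnotations.`iothub-connection-device-id`,
    "TAG": record_unpack.NAME,
    "IP_INPUT_VALUE": record_unpack.IP_INPUT_VALUE,
    "IP_INPUT_TIME": record_unpack.IP_INPUT_TIME,
    "IP_INPUT_QUALITY": record_unpack.IP_INPUT_QUALITY
}).filter(item => item.IP_INPUT_QUALITY == "Good").flatmap(item => [
    {
        "type": "time_series",
        "name": item.TAG,
        "externalId": item.key,
        "dataSetId": case(item.deviceId, "SITE_A", 1111, "SITE_B", 2222, null),
        "metadata": {
            "IP21_DEVICE_ID": item.deviceId
        },
        "isString": false,
        "isStep": false
    },
    {
        "type": "datapoint",
        "externalId": item.key,
        "value": try_float(trim_whitespace(item.IP_INPUT_VALUE), null),
        "timestamp": to_unix_timestamp(item.IP_INPUT_TIME, "%Y-%m-%dT%H:%M:%S.%6fZ")
    }
])
```

With the wrapper IF removed, events from devices not listed in case() are still mapped and get a null dataSetId. Whether a null value falls back to the data set configured in the Sink is not something this sketch establishes, so an extra filter on deviceId may be needed to drop unknown sites.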
 


  • Author
  • Seasoned
  • 12 replies
  • March 11, 2025
Thomas Sjølshagen wrote:

Hi,

As the Product Manager for this space, I’m decidedly _not_ an authority on mapping statements for our hosted extractors!

I ran a couple of aspects of your post by one of our lead engineers, and he suggested omitting the wrapper IF statement and instead using a case() statement for the dataSetId property to select the correct data set ID based on your iothub-connection-device-id information, i.e.:

"dataSetId": case(context.messageAnnotations.`iothub-connection-device-id`, "SITE_A", <dataset_id_a>, "SITE_B", <dataset_id_b>, "SITE_C", <dataset_id_c>, null)

 

Hi Thomas,

Thanks for your reply.

I might be missing something, but where are we setting the dataset_id in this mapping? We set the database for the raw records, but the dataset itself is set in the Sink of the configuration, not in the mapping, isn’t it?

