Hi,

I am working on a project where I am using the MQTT extractor to ingest data into CDF. Could you please advise the best step-by-step approach to integrate and transform this data in CDF?

Note that it is Sparkplug B-formatted MQTT data.

Kindly advise.

Regards,

Amol S Pawar

Hi @Amol Pawar 

If you are not familiar with it, I would recommend starting with the guide: https://docs.cognite.com/de/cdf/integration/guides/extraction/mqtt 

You will find the hosted extractors in your CDF project. Start by setting up the extractor with a connection to your MQTT broker.

To use the hosted MQTT extractor, your MQTT broker must be reachable from CDF (either directly or by opening your firewall for it).

Once the connection to your broker is established, you need to define your MQTT job. The detailed documentation for this is on your right-hand side (Setup Guide).

Since you are using Sparkplug B-formatted MQTT data, you should not have to define your own message parser; just add the topic filter (as explained in the documentation) to locate your values in the MQTT message.

Select Sparkplug B as your message format.

The sink holds the credentials used to write the data points to CDF.

Once your job is defined and data is available on your MQTT broker, the job should start processing data.

 


Hi @Jan Inge Bergseth,

Thanks for your reply. I have partially configured the hosted MQTT extractor per the instructions in the link. I need more guidance on the points below:

  1. How to configure sink for receiving data?
  2. How to transform or map this data to source solution model?

Thanks,

Amol S Pawar


  1. The sink is the connection details to CDF, so you can use your own user or a service user. To start, I recommend testing with your own user. When moving to a production setup you will need a service account.
  2. To write to a data model you need to create your own parsing that matches the data model, as described here: https://docs.cognite.com/de/cdf/integration/guides/extraction/hosted_extractors/hosted_extractors_custom_mappings#data-models 

 

An alternative for #2 is to create a parser that writes the data to a CDF RAW database and then use a transformation that writes the data to the data model.

The recommendation, however, is to write directly to the data model.
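If you do go the RAW route, the two shapes involved can be sketched in Python; note that the topic layout, field names, and space name below are made up purely for illustration, not taken from any Cognite API:

```python
import json

def mqtt_to_raw_row(topic: str, payload: bytes) -> tuple[str, dict]:
    """Step 1 (parser side): shape one MQTT message as a RAW row (key, columns).
    Key scheme and fields are hypothetical."""
    body = json.loads(payload)
    key = f"{topic}:{body['timestamp']}"
    return key, {"topic": topic, **body}

def raw_row_to_node(columns: dict) -> dict:
    """Step 2 (transformation side): what the transformation would emit
    as one data-model node. Space and property names are made up."""
    return {
        "type": "node",
        "externalId": columns["topic"].replace("/", ":"),
        "space": "mqtt_staging",
        "properties": {
            "value": columns["value"],
            "timestamp": columns["timestamp"],
        },
    }

# Example: one simple single-value message on topic "jib"
key, cols = mqtt_to_raw_row("jib", b'{"timestamp": 1700000000, "value": 11}')
node = raw_row_to_node(cols)
```

The point of the intermediate RAW table is that you can iterate on the transformation logic without touching the extractor job.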


@Jan Inge Bergseth Thanks for your response; it is very helpful.

Can you provide any training videos or visuals that would help me connect the dots?

I have gone through the documentation, but I am struggling to connect all the pieces.

Regards,

Amol S Pawar


What I did to get started with the hosted MQTT extractor was to begin with something very simple that I knew would work.

  1. As I did not have an MQTT broker to test with, I just signed up for a free one at https://www.hivemq.com/
  2. With this free MQTT broker you get a URL to connect to, using TLS / MQTT protocol 5 with username & password.
  3. Then you can start testing simple messages based on a single value and a topic (example screenshot not shown).
  4. Set up the hosted MQTT extractor on the Cognite side with the same topic (example screenshot not shown).
  5. This should then create a time series (in my case called jib) in CDF with a data point with value 11.
  6. Then start posting more advanced messages (as JSON) using the message formats supported out of the box, and then potentially test your own message parser writing directly to a data model, following the documentation's guidelines for how to do this.
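Step 3 above (posting a simple single-value message) can be sketched with the paho-mqtt client. The broker URL, credentials, and topic below are placeholders standing in for the details you get from the free HiveMQ signup:

```python
import json

# Placeholders from the free HiveMQ Cloud signup -- replace with your own.
BROKER = "your-cluster.s1.eu.hivemq.cloud"
USERNAME = "USER"
PASSWORD = "PASSWORD"
TOPIC = "jib"

def build_payload(value: float) -> str:
    """A minimal single-value JSON message, as in step 3."""
    return json.dumps({"value": value})

def publish_once(value: float) -> None:
    """Connect over TLS with MQTT protocol 5 and publish one message."""
    import paho.mqtt.client as mqtt  # pip install paho-mqtt

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
    client.username_pw_set(USERNAME, PASSWORD)
    client.tls_set()  # HiveMQ Cloud requires TLS (port 8883)
    client.connect(BROKER, 8883)
    client.loop_start()
    client.publish(TOPIC, build_payload(value), qos=1).wait_for_publish()
    client.loop_stop()
    client.disconnect()

# publish_once(11)  # run once your broker details are filled in; the
# extractor job listening on the same topic should then create the
# "jib" time series with a data point of 11, as in step 5.
```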

Here you can find a mapping/transformation example from a REST endpoint (SAP OData) to the CogniteAsset core data model that you can use as a starting point to map your MQTT message and write it to your view in the data model:

 

input.d.results.map(
  row => {
    "type": "node",
    "externalId": row.FunctionalLocation,
    "space": "SAP2CDF",
    "view": {
      "externalId": "CogniteAsset",
      "space": "cdf_cdm",
      "version": "v1"
    },
    "properties": {
      "parent": {
        "externalId": if (row.SuperiorFunctionalLocation != "", row.SuperiorFunctionalLocation, "null"),
        "space": "SAP2CDF",
      },
      "sourceCreatedTime": format_timestamp(
        int(row.CreationDate.replace("/Date(", "").replace("+0000", "").replace(")/", "")),
        "%Y-%m-%dT%H:%M:%S"
      ),
      "sourceUpdatedTime": if (row.LastChangeDateTime is not null,
        format_timestamp(
          int(row.LastChangeDateTime.replace("/Date(", "").replace("+0000", "").replace(")/", "")),
          "%Y-%m-%dT%H:%M:%S"
        ),
        null
      ),
      "sourceCreatedUser": row.CreatedByUser,
      "name": row.FunctionalLocationName,
    }
  }
)
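As a side note on the mapping above: the chained `replace` calls strip the legacy SAP OData `/Date(...)/` wrapper down to milliseconds since the epoch before formatting. The same conversion in Python (assuming the timestamps are UTC) would look like this:

```python
from datetime import datetime, timezone

def odata_to_iso(odata_date: str) -> str:
    """Strip the legacy SAP OData "/Date(1466553600000)/" wrapper
    (milliseconds since epoch, UTC) and format it like the
    "%Y-%m-%dT%H:%M:%S" pattern used in the mapping above."""
    ms = int(
        odata_date.replace("/Date(", "").replace("+0000", "").replace(")/", "")
    )
    dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S")

# odata_to_iso("/Date(1466553600000)/") -> "2016-06-22T00:00:00"
```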

 

