This guide will take you through the steps of setting up a data streaming job on the MQTT ingestion service using the Pluto API.
Throughout the guide, we provide the paths and bodies for the HTTP requests you will make, but we also have an OpenAPI spec you can download and import into your tool of choice (for example Postman) to make writing these requests easier.
Core concepts
There are three main object types when using the Pluto service:
- A source, which models a source of data outside of CDF. This will represent your MQTT broker. The source holds data such as the hostname (or address) of your broker, credentials for authentication, etc.
- A destination, which tells Pluto where to put the data output - for example CDF events or data points in a CDF time series. The destination also holds the credentials used when writing to CDF, which is going to be a CDF authentication session (more on that later).
- A job, which describes the actual job for Pluto to do. It links a source to a destination. It tells Pluto which topic filter on the source to subscribe to, and what format to expect the data in.
Setting up your first Pluto job
Before we start setting up the job, we need to make sure we can access the Pluto API.
When we write something like {{baseUrl}}/api/v1/projects/{{project}}/pluto/sources/, replace {{baseUrl}} and {{project}} with the URL and project name for your CDF project - for example https://westeurope-1.cognitedata.com and my-company.
For every HTTP request you make, you need to set the cdf-version header to alpha for the Pluto API to work.
You also need to set the Authorization header to a valid token for your project, and the token needs the experimentAcl capability with the streamIngestionApiExperiment scope. A CDF permission group with the correct capability can be set up by calling the create groups endpoint with the following body:
{
"items": [
{
"name": "Pluto access",
"sourceId": "b7c9a5a4-99c2-4785-bed3-5e6ad9a78603",
"capabilities": [
{
"experimentAcl": {
"actions": [
"USE"
],
"scope": {
"experimentscope": {
"experiments": [
"streamIngestionApiExperiment"
]
}
}
}
}
]
}
]
}
where sourceId is the id of the group in your AAD tenant where group membership is defined.
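If you prefer scripting these calls, here is a minimal Python sketch of the group creation, using the third-party requests library. The base URL, project, token, and AAD group id are placeholder values you must replace, and the path assumes the standard CDF create groups endpoint.

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # your CDF cluster URL
PROJECT = "my-company"                             # your CDF project name
TOKEN = "<valid-token-for-your-project>"           # placeholder

# These headers are needed on every request in this guide.
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "cdf-version": "alpha",
}

group_body = {
    "items": [
        {
            "name": "Pluto access",
            "sourceId": "<your-aad-group-id>",  # placeholder
            "capabilities": [
                {
                    "experimentAcl": {
                        "actions": ["USE"],
                        "scope": {
                            "experimentscope": {
                                "experiments": ["streamIngestionApiExperiment"]
                            }
                        },
                    }
                }
            ],
        }
    ]
}

response = requests.post(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/groups",
    json=group_body,
    headers=HEADERS,
)
response.raise_for_status()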
Connecting to the broker
First, we will configure the source. To do this, we will send a POST request to {{baseUrl}}/api/v1/projects/{{project}}/pluto/sources/ with the following body:
{
"items": [
{
"externalId": "my-broker",
"type": "mqtt5",
"username": "my-username",
"password": "secret_p@s$word123",
"host": "mqtt-broker.company.com",
"port": "1883"
}
]
}
where
- externalId is a unique identifier you give to your source
- type is the MQTT version to use (either mqtt5 or mqtt3)
- username, password and host are the credentials and location for your broker
- port is the port your broker is running on, typically 1883 or 8883.
The API should respond with a body similar to your create request, but with a created and last updated timestamp (and without the password).
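As a scripted alternative to a REST client, a rough Python sketch of the same request could look like this (requests library; base URL, project, and token are placeholders as before):

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

source_body = {
    "items": [
        {
            "externalId": "my-broker",
            "type": "mqtt5",
            "username": "my-username",
            "password": "secret_p@s$word123",
            "host": "mqtt-broker.company.com",
            "port": "1883",
        }
    ]
}

response = requests.post(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/sources/",
    json=source_body,
    headers=HEADERS,
)
response.raise_for_status()
# The echoed source includes timestamps and omits the password.
print(response.json())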
Creating a destination
Before making the Pluto destination, we need to create credentials for it to use. To do this, we will use CDF Sessions.
Create a session by sending a POST request to {{baseUrl}}/api/v1/projects/{{project}}/sessions with the following body:
{
"items": [
{
"clientId": "string",
"clientSecret": "string"
}
]
}
Replace these with the client credentials you wish to use.
The API responds with
{
"items": [
{
"id": 0,
"type": "CLIENT_CREDENTIALS",
"status": "READY",
"nonce": "string",
"clientId": "string"
}
]
}
The nonce in this response is the secret we will need to provide to Pluto. Be aware that these nonces are short-lived, so we have to move fast. We recommend having the next request ready to go before creating the session, just to be sure.
Now, let's create the Pluto destination. In our example, we will be writing data points, so we will set the type to datapoints. Send a POST request to {{baseUrl}}/api/v1/projects/{{project}}/pluto/destinations/ with the following body:
{
"items": [
{
"externalId": "my-datapoints-destination",
"type": "datapoints",
"credentials": {
"nonce": "<string>"
}
}
]
}
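Since the nonce expires quickly, one way to stay safe is to create the session and the destination back to back in a single script, so the nonce is used immediately. A rough Python sketch, with placeholder credentials:

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

# 1. Create the session; the returned nonce is short-lived.
session_response = requests.post(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/sessions",
    json={"items": [{"clientId": "<client-id>", "clientSecret": "<client-secret>"}]},
    headers=HEADERS,
)
session_response.raise_for_status()
nonce = session_response.json()["items"][0]["nonce"]

# 2. Hand the nonce to Pluto immediately by creating the destination.
destination_response = requests.post(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/destinations/",
    json={
        "items": [
            {
                "externalId": "my-datapoints-destination",
                "type": "datapoints",
                "credentials": {"nonce": nonce},
            }
        ]
    },
    headers=HEADERS,
)
destination_response.raise_for_status()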
Create the job
Now, let's tell Pluto to subscribe to data. To create a job, send a POST to {{baseUrl}}/api/v1/projects/{{project}}/pluto/jobs/ with the body:
{
"items": [
{
"sourceId": "my-broker",
"destinationId": "my-datapoints-destination",
"externalId": "my-job-1",
"format": {
"type": "rockwell"
},
"topicFilter": "/plant/machine1/temperatures/#"
}
]
}
where sourceId and destinationId are the externalIds you gave to your source and destination, respectively. externalId is a unique id given to your job. The format/type is the schema for the data we are expecting. The current options are rockwell (for data coming from Rockwell FactoryTalk) and cognite (for data following the syntax of our public APIs; this format is not yet regarded as stable, and the expected schema might change).
The topicFilter is the topic filter Pluto will subscribe to on the broker, and it can contain wildcards. Be aware that a single job runs synchronously, so a topic filter should not be too wide, or a single job might be overwhelmed by too much data. Splitting the stream into several jobs gives Pluto a chance to balance the load. (In other words, subscribing to something like /# might be a bad idea, depending on the data rate of your setup.)
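A Python sketch of this request, with the same placeholder conventions as before:

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

job_body = {
    "items": [
        {
            "sourceId": "my-broker",
            "destinationId": "my-datapoints-destination",
            "externalId": "my-job-1",
            "format": {"type": "rockwell"},
            # Keep the filter narrow; a single job handles its topics synchronously.
            "topicFilter": "/plant/machine1/temperatures/#",
        }
    ]
}

response = requests.post(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/jobs/",
    json=job_body,
    headers=HEADERS,
)
response.raise_for_status()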
Further actions
Here we will go through a few other things that might be useful:
Pausing and resuming jobs
Pausing and resuming can be achieved by changing the targetStatus of a job. Send a POST to {{baseUrl}}/api/v1/projects/{{project}}/pluto/jobs/update with
{
"items": [
{
"externalId": "my-job-1",
"update": {
"targetStatus": {
"set": "paused"
}
}
}
]
}
to pause, and with "set": "running" to resume.
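A small Python helper can wrap this update call, for example (placeholders as before):

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

def set_job_status(external_id: str, status: str) -> None:
    """Set a job's targetStatus to 'paused' or 'running'."""
    response = requests.post(
        f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/jobs/update",
        json={
            "items": [
                {"externalId": external_id, "update": {"targetStatus": {"set": status}}}
            ]
        },
        headers=HEADERS,
    )
    response.raise_for_status()

set_job_status("my-job-1", "paused")   # pause the job
set_job_status("my-job-1", "running")  # resume it again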
Listing resources
To list resources, send a GET to
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/sources/
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/jobs/
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/destinations/
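For example, a short Python sketch that lists all three resource types (placeholders as before):

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

for resource in ("sources", "jobs", "destinations"):
    response = requests.get(
        f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/{resource}/", headers=HEADERS
    )
    response.raise_for_status()
    print(resource, response.json())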
Deleting
To delete resources, send a POST to
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/sources/delete
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/jobs/delete
- {{baseUrl}}/api/v1/projects/{{project}}/pluto/destinations/delete
with body
{
"items": [
{
"externalId": "<string>"
},
{
"externalId": "<string>"
}
],
"ignoreUnknownIds": "<boolean>"
}
You can't delete resources that are used by other resources. For example, if a source is used by a job, you need to delete the job first and the source second.
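For example, a Python sketch that tears down the example setup from this guide, deleting the job before the source and destination it depends on (placeholders as before):

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

def delete(resource: str, external_id: str) -> None:
    response = requests.post(
        f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/{resource}/delete",
        # ignoreUnknownIds set to False so a missing resource is an error.
        json={"items": [{"externalId": external_id}], "ignoreUnknownIds": False},
        headers=HEADERS,
    )
    response.raise_for_status()

# The job must go first, since it uses the source and the destination.
delete("jobs", "my-job-1")
delete("sources", "my-broker")
delete("destinations", "my-datapoints-destination")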
Checking the logs for a job
To list logs from Pluto, send a GET to {{baseUrl}}/api/v1/projects/{{project}}/pluto/jobs/logs?job=my-job-1
The job=my-job-1 query parameter is optional, and lets you filter logs by the externalId of the job you are interested in.
Pluto will not log anything other than an "OK" message if everything is running fine, so don't worry if you're not seeing much in the logs - that's a good sign! Instead, verify that data is actually flowing into your configured destinations. We are also adding API endpoints exposing a set of metrics soon.
If your job fails to start or crashes for some reason, you will see a message in the logs with a description of the error.
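A short Python sketch for fetching the logs of a single job (placeholders as before):

import requests

BASE_URL = "https://westeurope-1.cognitedata.com"  # placeholder
PROJECT = "my-company"                             # placeholder
HEADERS = {"Authorization": "Bearer <token>", "cdf-version": "alpha"}

response = requests.get(
    f"{BASE_URL}/api/v1/projects/{PROJECT}/pluto/jobs/logs",
    params={"job": "my-job-1"},  # optional; omit to get logs for all jobs
    headers=HEADERS,
)
response.raise_for_status()
print(response.json())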