This write up describes a basic set up of writing data points to Cognite Data Fusion using Apache NiFi. The source data is power consumption readings from the HAN interface of a power meter in a residential fuse box:
The starting point here is an existing MQTT broker that receives data from an existing MQTT client device. We will use Apache NiFi to consume the MQTT messages, extract the readings and write continuously to CDF.
The end goal is a live updated time series within CDF with power readings every 2.5 seconds that can be used for analysis or automation within CDF or simply visualization:
NiFi flow overview
The picture above shows the complete NiFi flow. From left to right, consuming MQTT messages, transforming and writing to Cognite Data Fusion. Approximately 120 data points per five minutes, which corresponds to the power meter outputting one reading every 2.5 second.
The NiFi flow makes use of the following Processors:
Connects to the MQTT broker and subscribes to the topic used for the readings. Successfully parsed messages are passed on, one message per flow file.
Passes the incoming flow file to stdin and calls a supplied command.
The command is a purpose-built small command line program that returns a json string containing the data points that will be written to CDF:
Forms a POST request passing the json string as the body to Cognite’s endpoint for inserting data points.
Controller service (and therefore not visible in the screen picture)
Performs authentication against Azure AD by client credentials flow using a supplied client id and secret.
Provides an access token to processors that needs access to CDF.
The service principal is a member of security group with required capabilities as described in Cognite’s documentation
The Cognite endpoint for inserting data points accepts up to 10,000 data points per request, while NiFi is now writing one data point per request. NiFi can accumulate flow files and bigger requests executed less frequently if some amount of latency is acceptable.
An additional InvokeHTTP processor should be setup to report running status to Cognite Extraction pipelines to provide email notifications should the incoming data flow stop