Solved

How to run a remote-type extractor and provide its configuration via an extraction pipeline

  • 25 July 2023
  • 4 replies
  • 76 views


I want to pass the configuration and queries from the extraction pipeline to the extractor (Cognite DB Extractor), which I am running locally, but I am not able to start the extractor.

I am currently referring to this doc: Configure extractors remotely | Cognite Documentation,

but it has no sample configuration file.

Or is there a way to pass queries for data extraction through the extraction pipeline?


Best answer by Kanchana Premachandra 27 July 2023, 09:14


4 replies


Hi @Amit Rane ,

Are you already using a configuration similar to the one below?


# List of databases
databases:
  - type: odbc
    # User-given name for DB
    name: my-postgres
    # Connection string
    connection-string: "driver={PostgreSQL Unicode};SERVER=127.0.0.1;DATABASE=testdb;PORT=5432;UID=testuser;PWD=testpassword"

    # (Optional) Timeout for connections/queries. Be aware that not all ODBC
    # drivers fully support timeouts; the extractor will log warnings if the
    # timeout could not be set properly. Set to 0 to disable timeouts.
    #timeout: 60

  - type: odbc
    # User-given name for DB
    name: oracle-db
    # Connection string
    connection-string: "DRIVER={Oracle 19.3};DBQ=localhost:1521/XE;UID=SYSTEM;PWD=oracle"

    # (Optional) Timeout for connections/queries. Be aware that not all ODBC
    # drivers fully support timeouts; the extractor will log warnings if the
    # timeout could not be set properly. Set to 0 to disable timeouts.
    #timeout: 60

  - type: odbc
    # User-given name for DB
    name: dsn-postgres
    # Connection string
    connection-string: "DSN={MyPostgresDsn}"

    # (Optional) Timeout for connections/queries. Be aware that not all ODBC
    # drivers fully support timeouts; the extractor will log warnings if the
    # timeout could not be set properly. Set to 0 to disable timeouts.
    #timeout: 60

# List of queries
queries:
  - # RAW example

    # User-given name for query
    name: test-my-postgres
    # Name of database to use (as specified in the databases section)
    database: my-postgres

    # Query to execute. Supports interpolation with {incremental_field} and
    # {start_at} to facilitate incremental load between runs
    query: >
      SELECT
        *
      FROM
        mytable
      WHERE
        {incremental_field} >= '{start_at}'
      ORDER BY
        {incremental_field} ASC
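
As a side note, here is a minimal Python sketch of what that placeholder interpolation produces at runtime. The extractor performs this substitution internally between runs; the column name updated_at and the timestamp below are made-up illustrative values:

# Illustration only: the DB extractor substitutes {incremental_field} and
# {start_at} itself between runs; this sketch just shows the rendered SQL.
query_template = (
    "SELECT * FROM mytable "
    "WHERE {incremental_field} >= '{start_at}' "
    "ORDER BY {incremental_field} ASC"
)

# Hypothetical values: "updated_at" is an assumed column name, and the
# timestamp stands in for the state saved after the previous run.
rendered = query_template.format(
    incremental_field="updated_at",
    start_at="2023-07-25 00:00:00",
)
print(rendered)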



Hi @Kanchana Premachandra 

Yes, I am using a configuration similar to the one above.

For reference:

extractor:
  # (Optional) Number of rows to fetch from sources before uploading
  upload-queue-size: 10000

  # (Optional) Where to store extraction states (progress) between runs.
  # Required for incremental load to work.
  state-store:
    # Uncomment to use a local json file for state storage
    #local:
    #  path: C:\Users\ARane3\Desktop\odf\ingestion\cognite-db-extractor\state.json
    #  # (Optional) Save interval (in seconds) for intermediate file saves.
    #  # A final save will also be made on extractor shutdown.
    #  save-interval: 30

    # Uncomment to use a RAW table for state storage
    raw:
      # RAW database and table to use
      database: "state"
      table: "state:table"
      # (Optional) Upload interval (in seconds) for intermediate uploads.
      # A final upload will also be made on extractor shutdown.
      upload-interval: 300

  #upload-queue-size: 2

databases:
  - name: CDF_SOURCE_DATA
    connection-string: "DSN=dsn;Uid=uid;Pwd=pwd"
    timeout: 0

queries:
  - name: WELL_READ_STRING
    # Name of database to use (as specified in the databases section)
    database: CDF_SOURCE_DATA  # same as the database defined above

    # Query to execute. Supports interpolation with {incremental_field}
    # and {start_at} to facilitate incremental load between runs
    query: >
      SELECT DISTINCT
        CONCAT(ITEM_ID, ':', ITEM_TYPE, ':', PROPERTY COLLATE DATABASE_DEFAULT) AS externalId,
        START_DATETIME AS timestamp,
        PROPERTY_VALUE AS value
      FROM
        (SELECT T1.ITEM_ID, START_DATETIME, LAST_UPDT_DATE, T1.ITEM_TYPE, ITEM_NAME
         FROM VT_WELL_READ_en_US AS T1
         INNER JOIN
           (SELECT I_P.ITEM_ID, I_L.FROM_ITEM_ID, I_L.TO_ITEM_ID,
                   I_P.ITEM_TYPE AS ITEM_TYPE, I_P.PROPERTY_STRING AS NAME
            FROM ITEM_PROPERTY AS I_P
            INNER JOIN ITEM_LINK AS I_L ON I_P.ITEM_ID = I_L.FROM_ITEM_ID
            WHERE I_L.LINK_TYPE IN ('BATTERY_ITEM', 'ORG_ITEM')) AS T2
         ON T1.ITEM_ID = T2.ITEM_ID
         WHERE ITEM_NAME IS NOT NULL) AS WELL_PROP_DATA
      UNPIVOT
        (PROPERTY_VALUE FOR PROPERTY IN (ITEM_NAME)) AS UNPIVOT_PROPERTY

    destination-type: TIME_SERIES
    incremental-field: timestamp
    schedule: "*/1 * * * *"
    initial-start: '1900-01-01'
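
To make the query's output concrete, here is a small Python sketch of the external id the CONCAT builds and the row shape each result maps to; all values below are made up for illustration:

# The CONCAT in the query above joins three columns with ':' to form the
# time series externalId. These values are illustrative only.
item_id, item_type, prop = "W-1001", "WELL", "ITEM_NAME"
external_id = f"{item_id}:{item_type}:{prop}"  # -> "W-1001:WELL:ITEM_NAME"

# Each result row then becomes one datapoint on that time series:
row = {"externalId": external_id, "timestamp": "2023-07-25 00:00:00", "value": "Well 1001"}
print(row)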


and the local extractor config (the remote type) as below:


version: 2
type: remote
cognite:
  host: "https://westeurope-1.cognitedata.com"
  project: ${project}
  # (Either this or api-key is required) OIDC setup for CDF auth
  idp-authentication:
    # OIDC client ID
    client-id: ${COGNITE_CLIENT_ID}
    # OIDC client secret
    secret: ${COGNITE_CLIENT_SECRET}
    # URL to fetch OIDC tokens from
    token-url: ${token_url}
    # List of OIDC scopes to request
    scopes:
      #- "https://api.cognitedata.com/.default"
      - "https://westeurope-1.cognitedata.com/.default"
  extraction-pipeline:
    external-id: extractor-pipeline-external-id


Hi @Amit Rane ,

Please feel free to refer to the sample config files (the local one and the one that needs to be placed on CDF) sent via the support ticket, and to the documentation links mentioned there.
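
For completeness, here is a minimal sketch of how the CDF-hosted part of the configuration (the extractor/databases/queries sections shared above) could be uploaded to the extraction pipeline with the Python SDK, so that a locally running extractor started with the small type: remote file can fetch it. This assumes the cognite-sdk-python extraction pipeline config API; the angle-bracket values and the file name remote-config.yml are placeholders for your own setup:

from cognite.client import CogniteClient, ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes import ExtractionPipelineConfig

# Placeholders throughout: tenant, client id/secret, and project must match
# the IdP and CDF project used in the configs above.
credentials = OAuthClientCredentials(
    token_url="https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token",
    client_id="<client-id>",
    client_secret="<client-secret>",
    scopes=["https://westeurope-1.cognitedata.com/.default"],
)
client = CogniteClient(
    ClientConfig(
        client_name="remote-config-upload",
        base_url="https://westeurope-1.cognitedata.com",
        project="<project>",
        credentials=credentials,
    )
)

# The remote part of the config is stored as plain text on the extraction
# pipeline; the local extractor with `type: remote` fetches it from there.
with open("remote-config.yml") as f:
    remote_config = f.read()

client.extraction_pipelines.config.create(
    ExtractionPipelineConfig(
        external_id="extractor-pipeline-external-id",
        config=remote_config,
    )
)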

In case you come across any issues, it would also be helpful to set the logging level to DEBUG:

logger:
  # Logging to console/terminal. Remove or comment out to disable terminal
  # logging
  console:
    level: DEBUG

  # Logging to file. Remove or comment out to disable file logging
  file:
    level: DEBUG
    path: log.txt

Feel free to write back via the support ticket.

Thanks.


Hi @Kanchana Premachandra 

I have referred to the provided files, and the extractor is now working smoothly.

I have updated the support ticket.


Thanks,

Amit
