Skip to content

Источники данных (SOURCES)

Блок sources содержит описание источников данных.

Каждый источник имеет следующие поля:

  • type (строка) - тип источника. Поддерживаемые значения:

    • aerospike, cassandra, clickhouse, hive, mongodb, mssql, mysql, oracle, postgres(этот же тип указывается для подключения к greenplum), spark, teradata, dataframe, trino
  • name (строка) - уникальное наименование источника, рекомендуем для именования использовать Naming convention и структура хранения конфигураций

  • external_ids (словарь) - идентификатор источника данных в Каталоге данных - MWS Data Cat или Alation (опционально).

  • tags (список строк) - список тэгов (меток) для источника (опционально)

  • parameters (словарь) - параметры для подключения к источнику (host, port и т.д.), параметры Vault (credentials), параметры для настройки ресурсов для spark-задач (spark_conf_parameters) и т.д.

    Более подробно про параметры для различных типов источников в параметры источников.

  • owners (список строк) - список овнеров (см. Ролевая модель), может быть название учетной записи и/или имя группы в ldap.

  • emails_for_failed_task_alert (список почтовых адресов) - список почтовых адресов для уведомнения о падении запуска проверки.

  • jira_project - название проекта в jira, в котором будет создана задача при срабатывании алерта.

Если необходимо добавить несколько источников, то в поле sources передается список словарей:

sources:
  - type: spark
    name: spark_cloud
    parameters:
      spark_conf_parameters:
        spark.executor.memory: "1g"
        spark.driver.memory: "1g"
        spark.executor.instances: "1"
        spark.yarn.queue: "example_queue"
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "username"
      cluster: "test-cluster-name"
    owners: ['user1', 'user2']

  - type: spark
    name: spark_cloud_2
    parameters:
      spark_conf_parameters:
        spark.executor.memory: "2g"
        spark.driver.memory: "2g"
        spark.executor.instances: "1"
        spark.yarn.queue: "another_queue"
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "tech_user_2"
      cluster: "test-cluster-name"
    owners: ['user1', 'user2']
    jira_project: 'DA'

Параметры источников

Параметры для аутентификации на источнике

Для подключения к источникам данных DQ использует пару логин/пароль:

  • Логин - указывается в параметрах Source

  • Пароль - может храниться в одной из 2х систем:

    • HashiCorp Vault - внешнее хранилище секретов.

    • Internal Storage - внутренне хранилище секретов DQ. Можно использовать в отсутвие доступа к Vault

Параметры указываются в блоке credentials

Инструкция по работе с HashiCorp Vault

Пример параметров:

sources:
  - type: spark
    name: spark_cloud
    parameters:
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "username"
      ...

Параметр Описание Обязательный
type (строка) Тип хранилища. Возможные значения:
  • vault
  • vault_local
  • user (строка) Имя пользователя, под которым DQ подключится к источнику данных
    path (строка) Путь где хранится пароль в Vault.

    В самом Vault внутри path должен быть создан key: value

    • key - это имя пользователя (должно совпадать с user из конфига)

    • value - это пароль

    Инструкция по работе с внутренним хранилищем
    • Коротко об Internal Storage


      • Пароли и другие чувствительные данные хранятся в зашифрованном виде.
      • Прочитать значение может только бэкенд DQ. У конечного пользователя нет возможности получить значения, сохраненные в хранилище, даже в зашифрованном виде.
      • У контейнера в хранилище, как и Source, есть владельцы (owners). Использовать данные из хранилища можно только в тех Source, где owner также является владельцем секрета в хранилище

      alt text

    Пример параметров:

    sources:
      - type: spark
        name: spark_cloud
        parameters:
          credentials:
            type: intenral
            credential_id: "b045b68b-ff80-4193-8d41-5a9b54e4883c"
            user: "username"
          ...
    

    Параметр Описание Обязательный
    type (строка) Тип хранилища. Для Internal storage указывается значение "internal"
    user (строка) Имя пользователя, под которым DQ подключится к источнику данных
    credential_id (строка) Идентификатор контейнера внутреннего хранилища с секретами, в котором хранится пароль
    Типовой сценарий с использованием Swagger

    Шаг 1. Сохраняем данные от источника отправляя запрос на /v4/internal_storage_credentials

    В теле запроса пользователь указывает пользователей в owners и учетные данные источник в data. В примере, в owners передаем одного пользователя user_1 и в data передаем данные от тестового источника, где ключом является user_name, а значением password, это и есть имя пользователя и пароль от источника данных.

    Json, который мы сформировали для POST метода

    {
      "owners": [
        "user_1" 
      ],
      "data": {
        "user_name": "password"
      }
    }
    

    Далее передаем в поля X-username и X-Password данные от УЗ DQ или token и нажимаем кнопку "Execute".

    Если вы все сделали правильно, то в ответе увидите сгенерированный credential_uuid

    Шаг 2. Добавляем credential_uuid в yaml-config с вашим source.

    Для этого в sources передаем credentials. Далее, значение для ключа type указываем internal, для ключа credential_id указываем ранее полученный credential_uuid и для ключа user передаем имя учетной записи DQ.

    sources:
      - type: spark
        name: spark_cloud
        parameters:
          spark_conf_parameters:
            spark.executor.memory: "1g"
            spark.driver.memory: "1g"
            spark.executor.instances: "1"
            spark.yarn.queue: "example_queue"
          credentials: 
            type: internal
            credential_id: "ee4cadac-8194-4dec-93ac-d68952cc30ee"
            user: "username"
          cluster: "test-cluster-name"
        owners: ['user_1']
        product: DQ
    
    Шаг 3. Мы вспомнили, что забыли добавить еще одного owner и допустим, а также решили добавить еще один источник данных.

    Для этого нам потребуется эндпоинт /v4/internal_storage_credentials/{credential_uuid}.

    В owners добавим еще одного пользователя, а также добавим новые креды для источника данных.

    Добавим новых owners и один дополнительный источник

    {
      "owners": [
        "user_1",
        "user_2"
      ],
      "data": {
        "user_2": "password_2"
      }
    }
    

    ВАЖНО: В owners необходимо указывать и новых пользователя(-ей), и старых. В случае с data можно передавать данные для других источников не указывая предыдущие.

    Далее нам необходимо указать наши username и password, как мы делали в первом шаге, и указать полученный ранее credential_uuid в поле credential_uuid, после чего нажать кнопку "Execute".

    Если вы все сделали правильно, то получите сообщение об успешном обновлении вашего контейнера.

    Пример работы через python и curl
    • Разберем тело запроса ниже


      В owners передаются учетные записи владельца и тех, кто будет запускать или редактировать проверки.

      Также стоить отметить, что пользователь, который отправляет запрос, также записывается в owners по-умолчанию.

      В data прередаются пара логин/пароль от источника(-ов), на данных которого(-ых) будут выполняться DQ проверки.

      Важно: Если вы хотите использовать auth token для формирования запроса, ознакомьтесь с документацией.

    Тело запроса

    {
      "owners": [
        "user_1",
        "user_2",
        "user_3"
      ],
      "data": {
        "user_name": "password",
      }
    }
    


    Примеры запросов

    from requests import post 
    
    url = 'https://{base_url}/api/v4/internal_storage_credentials'
    
    data = {
        "owners": [
            "user_1",
            "user_2",
            "user_3"
      ],
      "data": {
        "user_name": "password",
      }
    }
    
    headers = {
        "x-username": "dq_user",
        "x-password": "dq_pass"
    }
    
    internal_cred_request = post(url=url, json=data, headers=headers)
    print(internal_cred_request.json())
    
    curl -X 'POST' \
      'https://{base_url}/api/v4/internal_storage_credentials' \
      -H 'accept: application/json' \
      -H 'X-Username: dq_user' \
      -H 'X-Password: dq_pass' \
      -H 'Content-Type: application/json' \
      -d '{
      "owners": [
        "user_1",
        "user_2",
        "user_3"
      ],
      "data": {
        "user_name": "password"
      }
    }'
    


    Пример ответа

    {
      "response": "Internal storage credential object created.",
      "credential_uuid": "credential_uuid_value"
    }
    


    • Также имеется возможность внесения изменений в storage.


      Например, если необходимо добавить или удалить пользователей в owners или удалить УЗ из списка data.

      Для этого необходимо отправить PUT запрос на эндпоинт /v4/internal_storage_credentials/{credential_uuid}


    Примеры запросов

    from requests import put 
    
    credential_uuid = 'credential_uuid_value'
    
    url = 'https://{base_url}/api/v4/internal_storage_credentials/{credential_uuid}'
    
    data = {
        "owners": [
            "new_user",
            "user_2"
      ],
      "data": {
        "user_name": "password",
        "new_user": "pass"
      }
    }
    
    headers = {
        "x-username": "dq_user",
        "x-password": "dq_pass"
    }
    
    cred_update = put(url=url, json=data, headers=headers)
    print(cred_update.json())
    
    curl -X 'PUT' \
      'https://{base_url}/v4/internal_storage_credentials/c045b68b-ff80-4193-8d41-5a9b54e4883a' \
      -H 'accept: application/json' \
      -H 'X-Username: dq_user' \
      -H 'X-Password: dq_pass' \
      -H 'Content-Type: application/yaml' \
      -d '{
      "owners": [
        "new_user",
        "user_2"
      ],
      "data": {
        "user_name": "password",
        "new_user": "pass"
      }
    }'
    

    Пример ответа

    {
      "response": "Internal storage credential object updated.",
      "credential_uuid": "credential_uuid_value"
    }
    

    Далее остается только передать параметр credential_id и значение internal для параметра type в наш source.

    Пример source с internal credential

    sources:
      - type: spark
        name: spark_cloud
        parameters:
          spark_conf_parameters:
            spark.executor.memory: "1g"
            spark.driver.memory: "1g"
            spark.executor.instances: "1"
            spark.yarn.queue: "example_queue"
          credentials: 
            type: internal
            credential_id: "credential_id_value"
            user: "username"
          cluster: "test-cluster-name"
        owners: ['new_user', 'user_2']
        product: DQ
    


    Параметры Spark-источника

    • spark_conf_parameters (словарь) - Содержит параметры для конфигурации spark сессии (см. https://spark.apache.org/docs/latest/configuration.html), такие как spark.executor.memory, spark.driver.memory, spark.executor.instances и т.д, в поле parameters необходимо передать словарь spark_conf_parameters.

    • cluster (строка | Enum) - Название кластера

    • executor_threads (число, по-умолчанию = 1) - кол-во потоков (параллельно выполняемых jobs) на spark executors. Если задано больше 1, spark jobs будут выполняться одновременно, в результате ресурсы кластера могут использоваться более эффективно. При увеличении числа потоков также может потребоваться поднять память выделяемую на executors

    Пример параметров для spark

    parameters:
      spark_conf_parameters:
        spark.executor.memory: "2g"
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "username"
      cluster: "test-cluster-name"
    
    Переопределение таймзоны в spark сессии

    По-умолчанию spark сессия будет создана с таймзоной UTC (spark.sql.session.timeZone).

    Таймзону имеет смысл переопределить, если данные в таблице с типом timestamp хранятся с учетом другого часового пояса, например Europe/Moscow.

    Это можно осуществить, добавив следующие spark_conf_parameters:

    • spark.sql.session.timeZone - название часового пояса

    Пример:

    spark_conf_parameters:
      spark.executor.memory: "1g"
      spark.driver.memory: "1g"
      spark.executor.instances: "1"
      spark.sql.session.timeZone: "Europe/Moscow"
    

    Параметры Hive-источника

    • user (строка) - Имя пользователя в Hive.

    • hosts (строка) - Имя name node с HiveServer в формате hostname:port. Допускается запись нескольких значений через запятую.

    • zooKeeper (словарь) -  Переопределяет hosts.

      • hosts(строка) - адрес zookeeper в формате  hostname:port

      • service(строка) - имя сервиса в zookeeper (по-умолчанию, "hiveserver2")

    • configuration (словарь) - Содержит параметры конфигурации для Hive, которая будет применена при подключении

    • kerberos_service_name (строка) - имя principal (указывается только для кластеров с kerberos)

    • database (строка) - База данных, по умолчанию 'default'

    Пример hive параметров

    parameters:
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "username"
      kerberos_service_name: "hive"
      zooKeeper:
        hosts: zookeeper-address:2181
        service: "hiveserver2"
      configuration:
        tez.queue.name: 'dq'
    

    Рекомендуется указывать адрес ZooKeeper, а не HiveServer напрямую

    Параметры Oracle источника

    • host (строка) - Имя сервера с бд

    • port (строка) - Порт на котором запущен

    • sid (строка) - oracle system identifier (имя инстанса oracle)

    • service_name (строка) - oracle service name

    • extra_params - дополнительные (опциональные) параметры для настройки сессии. Параметры передаются в формате key: value и затем подставляются в запрос ALTER SESSION SET key = value. Например это можно использовать для изменения таймзоны: TIME_ZONE = '+03:00'.

    Для oracle нужно обязательно указать один из параметров: sid или service_name

    Пример параметров Oracle

    parameters:
      credentials:
        type: vault
        path: "platform/dq/oracle"
        user: "username"
      host: "localhost"
      port: "15210"
      sid: "ORCLCDB.localdomain"
      extra_params:
        TIME_ZONE: "'+03:00'"
    

    Параметры Postgres(Greenplum), MySQL, MongoDB, MSSQL источников

    • host (строка) - Имя сервера с бд

    • port (строка) - Порт на котором запущен

    • database (строка) - Имя бд

    Пример параметров Postgres

    parameters:
      credentials:
        type: vault
        path: "platform/dq/postgres_path"
        user: "username"
      host: postgres-domain.com
      port: "5438"
      database: "cloud_dq_test"
    

    Пример параметров MongoDB

    parameters:
      credentials:
        type: vault
        path: "platform/dq/mongo_path"
        user: "dq_user"
      host: "mongo"
      port: 27017
      database: "dq_db"
    

    Параметры Teradata источника

    • host (строка) - Имя сервера с бд

    • port (строка) - Порт на котором запущен, опциональный параметр (значение по-умолчанию = 1025)

    • database (строка) - Имя бд, опциональный параметр. Необходимо указать если требуется LDAP аутентификация (Добавляется параметр LOGMECH=LDAP в url)

    Пример параметров Teradata

    parameters:
      credentials:
        type: vault
        path: "platform/dq/teradata_path"
        user: "username"
      host: teradata
      port: "1025"
    

    Параметры Aerospike источника

    • host (строка) - Имя сервера с бд

    • port (строка) - Порт на котором запущен, опциональный параметр (значение по-умолчанию = 3000)

    Пример параметров Aerospike

    parameters:
      credentials:
        type: vault
        path: "platform/dq/aerospike_path"
        user: "username"
      host: aerospike
      port: "3000"
    

    Параметры ClickHouse источника

    • host (строка) - Имя сервера с бд

    • port (строка) - TCP порт на котором запущен clickhouse, опциональный параметр (значение по-умолчанию = "9000")

      • DQ умеет подключаться к Clickhouse только по TCP протоколу (HTTP сейчас не поддерживается)
    • database(строка) - (значение по-умолчанию='default')

    • extra_params - дополнительные (опциональные) параметры для коннектора Кликхауса. В частности используются для передачи параметров secure и verify, необходимых для защищенного подключения по порту 9440 (как в примере ниже).

    Пример параметров ClickHouse

    parameters:
      credentials:
        type: vault
        path: "platform/dq/click_path"
        user: "username"
      host: "dq_clickhouse"
      port: "9440"
      database: "dq_db"
      extra_params:
        secure: true
        verify: false
    

    Параметры Cassandra источника

    • keyspace (строка) - Пространство ключей

    • host (строка) - Имя сервера на котором запущена Cassandra

    • port (строка) - Порт, на котором запущена cassandra

    Важные ограничения:

    В текущей реализации группировка (group by) в параметрах метрики не реализована.

    keyspace задается на уровне source, если нужны проверки в разных keyspace, потребуется создать еще один source

    Пример параметров Cassandra

    parameters:
      host: cassandra
      port: "9042"
      keyspace: "test_cassandra_keyspace"
      credentials:
        type: vault
        path: "platform/dq/cassandra_path"
        user: "test_user"
    

    Параметры Dataframe-источника

    У dataframe источника отсутствуют параметры.

    Важные ограничения:

    В текущей реализации группы, сравнения и метрики связанные с источником dataframe нельзя запускать.

    Параметры Trino-источника

    Данный тип источника используется для создания кросс проверок с построчным сравнением данных между разными системами - Перекрестные проверки между источниками

    • host (строка) - имя или ip адрес Trino сервера

    • port (число) - порт, на котором запущен трино

    • extra_params (словарь) - дополнительные (опциональные) параметры для коннектора.

    Пример параметров Trino

    parameters:
      credentials:
        type: vault
        path: "platform/dq/trino"
        user: "username"
      host: trino
      port: "8443"
      extra_params:
        timezone: "Europe/Moscow"
    

    Интеграция с Каталогом данных

    DQ поддерживает интеграцию с Каталогами данных.

    • Alation

      • DQ отправляет агрегированный статус проверок по данному источнику в alation в виде светофора. Светофор будет проставляться для отдельных таблиц в рамках источника.
    • Datahub aka MWS Data Cat

      • Статистика по проверкам отображается в MWS Data Cat на странице с Объектом и на страницах с поисковой выдачей. Статистика включает в себя:
        • кол-во проверок на объект и отдельный атрибут
        • наличие алертов в ITSM систему на объект
        • таблица со списком проверок по Объекту с результатами последнего запуска

    Для настройки интеграции в Source требуется добавить атрибут external_ids.

    Атрибут представляет собой словарь с одним или двумя ключами обозначающими тип Каталога данных, с которым устанавливается интеграция:

    sources:
      - name: test_source
        type: spark
        external_ids:
          alation: 404
          datahub: "284_RMS"
        ...
        parameters:
          ...
    

    Значение, которое нужно прописать в атрибуте - это ID источника данных в Каталоге данных:

    • alation - ID Data source в Alation

    • datahub - значение "platform instance"