Источники данных (SOURCES)
Блок sources содержит описание источников данных.
Каждый источник имеет следующие поля:
-
type (строка) - тип источника. Поддерживаемые значения:
- aerospike, cassandra, clickhouse, hive, mongodb, mssql, mysql, oracle, postgres(этот же тип указывается для подключения к greenplum), spark, teradata, dataframe, trino
-
name (строка) - уникальное наименование источника, рекомендуем для именования использовать Naming convention и структура хранения конфигураций
-
external_ids (словарь) - идентификатор источника данных в Каталоге данных - MWS Data Cat или Alation (опционально).
- Подробнее см. ниже про Интеграцию с Каталогом данных
-
tags (список строк) - список тэгов (меток) для источника (опционально)
-
parameters (словарь) - параметры для подключения к источнику (host, port и т.д.), параметры Vault (credentials), параметры для настройки ресурсов для spark-задач (spark_conf_parameters) и т.д.
Более подробно про параметры для различных типов источников в параметры источников.
-
owners (список строк) - список овнеров (см. Ролевая модель), может быть название учетной записи и/или имя группы в ldap.
-
emails_for_failed_task_alert (список почтовых адресов) - список почтовых адресов для уведомнения о падении запуска проверки.
-
jira_project - название проекта в jira, в котором будет создана задача при срабатывании алерта.
Если необходимо добавить несколько источников, то в поле sources передается список словарей:
sources:
- type: spark
name: spark_cloud
parameters:
spark_conf_parameters:
spark.executor.memory: "1g"
spark.driver.memory: "1g"
spark.executor.instances: "1"
spark.yarn.queue: "example_queue"
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "username"
cluster: "test-cluster-name"
owners: ['user1', 'user2']
- type: spark
name: spark_cloud_2
parameters:
spark_conf_parameters:
spark.executor.memory: "2g"
spark.driver.memory: "2g"
spark.executor.instances: "1"
spark.yarn.queue: "another_queue"
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "tech_user_2"
cluster: "test-cluster-name"
owners: ['user1', 'user2']
jira_project: 'DA'
Параметры источников
Параметры для аутентификации на источнике
Для подключения к источникам данных DQ использует пару логин/пароль:
-
Логин - указывается в параметрах Source
-
Пароль - может храниться в одной из 2х систем:
-
HashiCorp Vault - внешнее хранилище секретов.
-
Internal Storage - внутренне хранилище секретов DQ. Можно использовать в отсутвие доступа к Vault
-
Параметры указываются в блоке credentials
Инструкция по работе с HashiCorp Vault
Пример параметров:
sources:
- type: spark
name: spark_cloud
parameters:
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "username"
...
Параметр | Описание | Обязательный |
---|---|---|
type (строка) |
Тип хранилища. Возможные значения: |
|
user (строка) |
Имя пользователя, под которым DQ подключится к источнику данных | |
path (строка) |
Путь где хранится пароль в Vault. |
В самом Vault внутри path должен быть создан key: value
-
key - это имя пользователя (должно совпадать с user из конфига)
-
value - это пароль
Инструкция по работе с внутренним хранилищем
-
Коротко об Internal Storage
- Пароли и другие чувствительные данные хранятся в зашифрованном виде.
- Прочитать значение может только бэкенд DQ. У конечного пользователя нет возможности получить значения, сохраненные в хранилище, даже в зашифрованном виде.
- У контейнера в хранилище, как и Source, есть владельцы (owners). Использовать данные из хранилища можно только в тех Source, где owner также является владельцем секрета в хранилище
Пример параметров:
sources:
- type: spark
name: spark_cloud
parameters:
credentials:
type: intenral
credential_id: "b045b68b-ff80-4193-8d41-5a9b54e4883c"
user: "username"
...
Параметр | Описание | Обязательный |
---|---|---|
type (строка) |
Тип хранилища. Для Internal storage указывается значение "internal" | |
user (строка) |
Имя пользователя, под которым DQ подключится к источнику данных | |
credential_id (строка) |
Идентификатор контейнера внутреннего хранилища с секретами, в котором хранится пароль |
Типовой сценарий с использованием Swagger
Шаг 1. Сохраняем данные от источника отправляя запрос на /v4/internal_storage_credentials
В теле запроса пользователь указывает пользователей в
owners
и учетные данные источник вdata
. В примере, вowners
передаем одного пользователяuser_1
и вdata
передаем данные от тестового источника, где ключом являетсяuser_name
, а значениемpassword
, это и есть имя пользователя и пароль от источника данных.
Json, который мы сформировали для POST метода
{
"owners": [
"user_1"
],
"data": {
"user_name": "password"
}
}
Далее передаем в поля X-username и X-Password данные от УЗ DQ или token и нажимаем кнопку "Execute".
Если вы все сделали правильно, то в ответе увидите сгенерированный
credential_uuid
Шаг 2. Добавляем credential_uuid в yaml-config с вашим source
.
Для этого в
sources
передаемcredentials
. Далее, значение для ключаtype
указываемinternal
, для ключаcredential_id
указываем ранее полученныйcredential_uuid
и для ключаuser
передаем имя учетной записи DQ.
sources:
- type: spark
name: spark_cloud
parameters:
spark_conf_parameters:
spark.executor.memory: "1g"
spark.driver.memory: "1g"
spark.executor.instances: "1"
spark.yarn.queue: "example_queue"
credentials:
type: internal
credential_id: "ee4cadac-8194-4dec-93ac-d68952cc30ee"
user: "username"
cluster: "test-cluster-name"
owners: ['user_1']
product: DQ
owner
и допустим, а также решили добавить еще один источник данных.
Для этого нам потребуется эндпоинт /v4/internal_storage_credentials/{credential_uuid}.
В
owners
добавим еще одного пользователя, а также добавим новые креды для источника данных.
Добавим новых owners и один дополнительный источник
{
"owners": [
"user_1",
"user_2"
],
"data": {
"user_2": "password_2"
}
}
ВАЖНО: В
owners
необходимо указывать и новых пользователя(-ей), и старых. В случае сdata
можно передавать данные для других источников не указывая предыдущие.Далее нам необходимо указать наши
username
иpassword
, как мы делали в первом шаге, и указать полученный ранееcredential_uuid
в полеcredential_uuid
, после чего нажать кнопку "Execute".
Если вы все сделали правильно, то получите сообщение об успешном обновлении вашего контейнера.
Пример работы через python и curl
-
Разберем тело запроса ниже
В
owners
передаются учетные записи владельца и тех, кто будет запускать или редактировать проверки.Также стоить отметить, что пользователь, который отправляет запрос, также записывается в owners по-умолчанию.
В
data
прередаются пара логин/пароль от источника(-ов), на данных которого(-ых) будут выполняться DQ проверки.Важно: Если вы хотите использовать auth token для формирования запроса, ознакомьтесь с документацией.
Тело запроса
{
"owners": [
"user_1",
"user_2",
"user_3"
],
"data": {
"user_name": "password",
}
}
Примеры запросов
from requests import post
url = 'https://{base_url}/api/v4/internal_storage_credentials'
data = {
"owners": [
"user_1",
"user_2",
"user_3"
],
"data": {
"user_name": "password",
}
}
headers = {
"x-username": "dq_user",
"x-password": "dq_pass"
}
internal_cred_request = post(url=url, json=data, headers=headers)
print(internal_cred_request.json())
curl -X 'POST' \
'https://{base_url}/api/v4/internal_storage_credentials' \
-H 'accept: application/json' \
-H 'X-Username: dq_user' \
-H 'X-Password: dq_pass' \
-H 'Content-Type: application/json' \
-d '{
"owners": [
"user_1",
"user_2",
"user_3"
],
"data": {
"user_name": "password"
}
}'
Пример ответа
{
"response": "Internal storage credential object created.",
"credential_uuid": "credential_uuid_value"
}
-
Также имеется возможность внесения изменений в
storage
.
Например, если необходимо добавить или удалить пользователей в
owners
или удалить УЗ из спискаdata
.Для этого необходимо отправить PUT запрос на эндпоинт /v4/internal_storage_credentials/{credential_uuid}
Примеры запросов
from requests import put
credential_uuid = 'credential_uuid_value'
url = 'https://{base_url}/api/v4/internal_storage_credentials/{credential_uuid}'
data = {
"owners": [
"new_user",
"user_2"
],
"data": {
"user_name": "password",
"new_user": "pass"
}
}
headers = {
"x-username": "dq_user",
"x-password": "dq_pass"
}
cred_update = put(url=url, json=data, headers=headers)
print(cred_update.json())
curl -X 'PUT' \
'https://{base_url}/v4/internal_storage_credentials/c045b68b-ff80-4193-8d41-5a9b54e4883a' \
-H 'accept: application/json' \
-H 'X-Username: dq_user' \
-H 'X-Password: dq_pass' \
-H 'Content-Type: application/yaml' \
-d '{
"owners": [
"new_user",
"user_2"
],
"data": {
"user_name": "password",
"new_user": "pass"
}
}'
Пример ответа
{
"response": "Internal storage credential object updated.",
"credential_uuid": "credential_uuid_value"
}
Далее остается только передать параметр credential_id
и значение internal
для параметра type
в наш source.
Пример source с internal credential
sources:
- type: spark
name: spark_cloud
parameters:
spark_conf_parameters:
spark.executor.memory: "1g"
spark.driver.memory: "1g"
spark.executor.instances: "1"
spark.yarn.queue: "example_queue"
credentials:
type: internal
credential_id: "credential_id_value"
user: "username"
cluster: "test-cluster-name"
owners: ['new_user', 'user_2']
product: DQ
Параметры Spark-источника
-
spark_conf_parameters (словарь) - Содержит параметры для конфигурации spark сессии (см. https://spark.apache.org/docs/latest/configuration.html), такие как spark.executor.memory, spark.driver.memory, spark.executor.instances и т.д, в поле parameters необходимо передать словарь spark_conf_parameters.
-
cluster (строка | Enum) - Название кластера
-
executor_threads (число, по-умолчанию = 1) - кол-во потоков (параллельно выполняемых jobs) на spark executors. Если задано больше 1, spark jobs будут выполняться одновременно, в результате ресурсы кластера могут использоваться более эффективно. При увеличении числа потоков также может потребоваться поднять память выделяемую на executors
Пример параметров для spark
parameters:
spark_conf_parameters:
spark.executor.memory: "2g"
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "username"
cluster: "test-cluster-name"
Переопределение таймзоны в spark сессии
По-умолчанию spark сессия будет создана с таймзоной UTC (spark.sql.session.timeZone).
Таймзону имеет смысл переопределить, если данные в таблице с типом timestamp хранятся с учетом другого часового пояса, например Europe/Moscow.
Это можно осуществить, добавив следующие spark_conf_parameters:
- spark.sql.session.timeZone - название часового пояса
Пример:
spark_conf_parameters:
spark.executor.memory: "1g"
spark.driver.memory: "1g"
spark.executor.instances: "1"
spark.sql.session.timeZone: "Europe/Moscow"
Параметры Hive-источника
-
user (строка) - Имя пользователя в Hive.
-
hosts (строка) - Имя name node с HiveServer в формате hostname:port. Допускается запись нескольких значений через запятую.
-
zooKeeper (словарь) - Переопределяет hosts.
-
hosts(строка) - адрес zookeeper в формате hostname:port
-
service(строка) - имя сервиса в zookeeper (по-умолчанию, "hiveserver2")
-
-
configuration (словарь) - Содержит параметры конфигурации для Hive, которая будет применена при подключении
-
kerberos_service_name (строка) - имя principal (указывается только для кластеров с kerberos)
-
database (строка) - База данных, по умолчанию 'default'
Пример hive параметров
parameters:
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "username"
kerberos_service_name: "hive"
zooKeeper:
hosts: zookeeper-address:2181
service: "hiveserver2"
configuration:
tez.queue.name: 'dq'
Рекомендуется указывать адрес ZooKeeper, а не HiveServer напрямую
Параметры Oracle источника
-
host (строка) - Имя сервера с бд
-
port (строка) - Порт на котором запущен
-
sid (строка) - oracle system identifier (имя инстанса oracle)
-
service_name (строка) - oracle service name
-
extra_params - дополнительные (опциональные) параметры для настройки сессии. Параметры передаются в формате key: value и затем подставляются в запрос ALTER SESSION SET key = value. Например это можно использовать для изменения таймзоны: TIME_ZONE = '+03:00'.
Для oracle нужно обязательно указать один из параметров: sid или service_name
Пример параметров Oracle
parameters:
credentials:
type: vault
path: "platform/dq/oracle"
user: "username"
host: "localhost"
port: "15210"
sid: "ORCLCDB.localdomain"
extra_params:
TIME_ZONE: "'+03:00'"
Параметры Postgres(Greenplum), MySQL, MongoDB, MSSQL источников
-
host (строка) - Имя сервера с бд
-
port (строка) - Порт на котором запущен
-
database (строка) - Имя бд
Пример параметров Postgres
parameters:
credentials:
type: vault
path: "platform/dq/postgres_path"
user: "username"
host: postgres-domain.com
port: "5438"
database: "cloud_dq_test"
Пример параметров MongoDB
parameters:
credentials:
type: vault
path: "platform/dq/mongo_path"
user: "dq_user"
host: "mongo"
port: 27017
database: "dq_db"
Параметры Teradata источника
-
host (строка) - Имя сервера с бд
-
port (строка) - Порт на котором запущен, опциональный параметр (значение по-умолчанию = 1025)
-
database (строка) - Имя бд, опциональный параметр. Необходимо указать если требуется LDAP аутентификация (Добавляется параметр LOGMECH=LDAP в url)
Пример параметров Teradata
parameters:
credentials:
type: vault
path: "platform/dq/teradata_path"
user: "username"
host: teradata
port: "1025"
Параметры Aerospike источника
-
host (строка) - Имя сервера с бд
-
port (строка) - Порт на котором запущен, опциональный параметр (значение по-умолчанию = 3000)
Пример параметров Aerospike
parameters:
credentials:
type: vault
path: "platform/dq/aerospike_path"
user: "username"
host: aerospike
port: "3000"
Параметры ClickHouse источника
-
host (строка) - Имя сервера с бд
-
port (строка) - TCP порт на котором запущен clickhouse, опциональный параметр (значение по-умолчанию = "9000")
- DQ умеет подключаться к Clickhouse только по TCP протоколу (HTTP сейчас не поддерживается)
-
database(строка) - (значение по-умолчанию='default')
-
extra_params - дополнительные (опциональные) параметры для коннектора Кликхауса. В частности используются для передачи параметров secure и verify, необходимых для защищенного подключения по порту 9440 (как в примере ниже).
Пример параметров ClickHouse
parameters:
credentials:
type: vault
path: "platform/dq/click_path"
user: "username"
host: "dq_clickhouse"
port: "9440"
database: "dq_db"
extra_params:
secure: true
verify: false
Параметры Cassandra источника
-
keyspace (строка) - Пространство ключей
-
host (строка) - Имя сервера на котором запущена Cassandra
-
port (строка) - Порт, на котором запущена cassandra
Важные ограничения:
В текущей реализации группировка (group by) в параметрах метрики не реализована.
keyspace задается на уровне source, если нужны проверки в разных keyspace, потребуется создать еще один source
Пример параметров Cassandra
parameters:
host: cassandra
port: "9042"
keyspace: "test_cassandra_keyspace"
credentials:
type: vault
path: "platform/dq/cassandra_path"
user: "test_user"
Параметры Dataframe-источника
У dataframe источника отсутствуют параметры.
Важные ограничения:
В текущей реализации группы, сравнения и метрики связанные с источником
dataframe нельзя запускать.
Параметры Trino-источника
Данный тип источника используется для создания кросс проверок с построчным сравнением данных между разными системами - Перекрестные проверки между источниками
-
host (строка) - имя или ip адрес Trino сервера
-
port (число) - порт, на котором запущен трино
-
extra_params (словарь) - дополнительные (опциональные) параметры для коннектора.
Пример параметров Trino
parameters:
credentials:
type: vault
path: "platform/dq/trino"
user: "username"
host: trino
port: "8443"
extra_params:
timezone: "Europe/Moscow"
Интеграция с Каталогом данных
DQ поддерживает интеграцию с Каталогами данных.
-
Alation
- DQ отправляет агрегированный статус проверок по данному источнику в alation в виде светофора. Светофор будет проставляться для отдельных таблиц в рамках источника.
-
Datahub aka MWS Data Cat
- Статистика по проверкам отображается в MWS Data Cat на
странице с Объектом и на страницах с поисковой выдачей.
Статистика включает в себя:
- кол-во проверок на объект и отдельный атрибут
- наличие алертов в ITSM систему на объект
- таблица со списком проверок по Объекту с результатами последнего запуска
- Статистика по проверкам отображается в MWS Data Cat на
странице с Объектом и на страницах с поисковой выдачей.
Статистика включает в себя:
Для настройки интеграции в Source требуется добавить атрибут external_ids.
Атрибут представляет собой словарь с одним или двумя ключами обозначающими тип Каталога данных, с которым устанавливается интеграция:
sources:
- name: test_source
type: spark
external_ids:
alation: 404
datahub: "284_RMS"
...
parameters:
...
Значение, которое нужно прописать в атрибуте - это ID источника данных в Каталоге данных:
-
alation - ID Data source в Alation
-
datahub - значение "platform instance"