Использование udf в spark сессии DQ
Настройка метрики с udf
Добавление своих UDF происходит через конфиг.
Сущность UDF в конфиге выглядит следующим образом
udf:
- name: custom_udf
url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar'
package: 'example' # опционально
У него есть:
-
name - уникальное имя UDF, нужно чтобы привязать его к метрике
-
url - ссылка, где лежит jar файл, чтобы DQ мог его скачать. При обновлении версии пакета этот атрибут также необходимо актуализировать, в противном случае DQ будет использовать старую версию
-
package - опциональный параметр, название scala пакета, если он задан в коде, то тогда его необходимо задать и здесь.
UDF привязывается к нужной метрике
metrics:
- name: custom_udf_test
type: custom_sql
check_object: spark_test
parameters:
sql: 'select addVal(1)'
udf: custom_udf
Пример полного конфига с UDF:
sources:
- type: spark
name: test_spark_udf
parameters:
credentials:
type: vault
path: "platform/dq/spark_cloud"
user: "tech_user_2"
spark_conf_parameters:
spark.yarn.queue: 'default'
cluster: test-cluster-name
owners: ['user1']
udf:
- name: custom_udf
url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar'
package: 'example' # опционально
check_objects:
- name: spark_test
source: test_spark_udf
table: metric_type
metrics:
- name: custom_udf_test
type: custom_sql
check_object: spark_test
parameters:
sql: 'select addVal(1)'
udf: custom_udf
compares:
- name: compare_custom_udf_test
type: compare_with_static_values
parameters:
min_value: 1
max_value: 2
metric: custom_udf_test
groups:
- name: test_spark_udf_group
compares:
- compare_custom_udf_test
Чтобы UDF корректно отработал в процессе DQ, должны быть соблюдены правила по наименовании объекта и у объекта должен быть метод registerUdf, более подробно см. ниже.
Добавление Scala UDF
scala udf
Пример UDF, которая просто увеличивает число на единицу:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
package com.example.udf // опционально
object dq {
val addVal = udf((x: Int) => x+1)
def registerUdf: Unit = {
val spark = SparkSession.builder().getOrCreate()
spark.udf.register("addVal", addVal)
}
}
Имя объекта обязательно должно быть dq и у него должен быть метод для регистрации registerUdf, можно его просто скопировать из примера заменив название udf в spark.udf.register("<имя_udf>", <имя_udf>).
package является опциональным и если он указан, то в конфиге в атрибуте package сущности udf необходимо его указать, пример:
udf:
- name: custom_udf
url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar'
package: 'com.example.udf'
Как использовать UDF в метрике
Вызвать UDF можно в sql запросе в метрике custom_sql, пример:
select addVal(1)