Skip to content

Использование udf в spark сессии DQ

Настройка метрики с udf

Добавление своих UDF происходит через конфиг.

Сущность UDF  в конфиге выглядит следующим образом

udf:
  - name: custom_udf
    url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar'  
    package: 'example' # опционально

У него есть:

  • name - уникальное имя UDF, нужно чтобы привязать его к метрике

  • url - ссылка, где лежит jar файл, чтобы DQ мог его скачать. При обновлении версии пакета этот атрибут также необходимо актуализировать, в противном случае DQ будет использовать старую версию

  • package - опциональный параметр, название scala пакета, если он задан в коде, то тогда его необходимо задать и здесь.

UDF привязывается к нужной метрике

metrics:
  - name: custom_udf_test
    type: custom_sql
    check_object: spark_test
    parameters:
      sql: 'select addVal(1)'
    udf: custom_udf

Пример полного конфига с UDF:

sources:
  - type: spark
    name: test_spark_udf
    parameters:
      credentials:
        type: vault
        path: "platform/dq/spark_cloud"
        user: "tech_user_2"
      spark_conf_parameters:
        spark.yarn.queue: 'default'
      cluster: test-cluster-name
    owners: ['user1'] 


udf:
  - name: custom_udf
    url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar' 
    package: 'example' # опционально

check_objects:
  - name: spark_test
    source: test_spark_udf
    table: metric_type

metrics:
  - name: custom_udf_test
    type: custom_sql
    check_object: spark_test
    parameters:
      sql: 'select addVal(1)'
    udf: custom_udf

compares:
  - name: compare_custom_udf_test
    type: compare_with_static_values
    parameters:
      min_value: 1
      max_value: 2
    metric: custom_udf_test

groups:
  - name: test_spark_udf_group
    compares:
      - compare_custom_udf_test

Чтобы UDF корректно отработал в процессе DQ, должны быть соблюдены правила по наименовании объекта и у объекта должен быть метод registerUdf, более подробно см. ниже.

Добавление Scala UDF

scala udf

Пример UDF, которая просто увеличивает число на единицу:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

package com.example.udf // опционально

object dq {

  val addVal = udf((x: Int) => x+1)

  def registerUdf: Unit = {
    val spark = SparkSession.builder().getOrCreate()
    spark.udf.register("addVal", addVal)
  }

}

Имя объекта обязательно должно быть dq и у него должен быть метод для регистрации registerUdf, можно его просто скопировать из примера заменив название udf в spark.udf.register("<имя_udf>", <имя_udf>).

package является опциональным и если он указан, то в конфиге в атрибуте package сущности udf необходимо его указать, пример:

udf:
  - name: custom_udf
    url: 'https://artifactory-url:443/path_to_jar/jar_name-0.1.1.jar'
    package: 'com.example.udf'

Как использовать UDF в метрике

Вызвать UDF можно в sql запросе в метрике custom_sql, пример:

select addVal(1)