Skip to content

Установка dq-sampler-worker

Процедура настройки Sampler worker через docker-compose

Требования

  • Сетевой доступ между DQ Sample Worker и RabbitMQ в контуре DQ Neo.
  • Версии docker >= 28.1.1, docker-compose >= 2.36.2.
  • Решение о БД Postgres: установить отдельный экземпляр БД, в которую будут сохраняться сэмлпы, или подключить существующую БД.
  • Установленный dq-core.
  • Созданные в RabbitMQ: DQ_QUEUE_NAME и DQ_EXCHANGE_NAME.

Настройка docker-compose

  1. Создайте файл docker-compose.yml:
    version: "3"
    services:
      sampler-worker:
        image: gitea.demo.dataops.mts.ru/mws-data-test/dq-sampler-worker:latest
        restart: always
        command: ["worker", "1"]
        depends_on:
          - db
        env_file:
          - var.env
        volumes:
          # Директория в которой хранятся логи
          - ./logs:/code/logs
        netw.
          - sampler_worker__net
    
      db:
        image: gitea.demo.dataops.mts.ru/mws-data-test/dq-sampler-worker/postgres:latest
        restart: always
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
        volumes:
          - ./postgres_data:/var/lib/postgresql/data
        networks:
          - sampler_worker__net
    
      filebeat:
        image: filebeat:latest
        hostname: <your-dq-host>  # Укажите актуальный хост для вашего окружения
        restart: always
        volumes:
          # Директория в которой хранятся логи
          - ./logs:/data/dq-sampler-worker/logs:ro
          # Конфиг для filebeat
          - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
          # Сертификат если используется SSL при отправке логов
          - ./class2root.crt:/data/dq-sampler-worker/class2root.crt:ro
        networks:
          - sampler_worker__net
    
    volumes:
      postgres_data:
    
    networks:
      sampler_worker__net:
    
  2. Создайте файл var.env:

    # RabbitMQ
    DQ_RABBIT_HOST=amqp://user:password@rabbit:5676/core
    DQ_QUEUE_NAME=sampler_worker_queue
    DQ_EXCHANGE_NAME=dq_core.sampler_worker
    DQ_ROUTING_KEY=sampler_worker_test
    WORKERS_COUNT=4
    
    # PostgreSQL
    POSTGRES_USER=postgres
    POSTGRES_PASS=postgres
    POSTGRES_HOST=db
    POSTGRES_PORT=5432
    POSTGRES_DB=sampler_worker__db
    
    # Логирование
    LOG_FORMAT=[%(asctime)s] %(pathname)s %(lineno)s %(source)s %(run_id)s %(task_id)s %(task_name)s %(entity_name)s %(levelname)s %(message)s
    LOGFILE_PATH=logs/sampler_worker.log
    LOG_LEVEL=INFO
    
    # Дополнительно
    LIMIT=1000  # Ограничение строк в sample (по умолчанию)
    

  3. (Опционально) Настройте filebeat.yml:

    filebeat.inputs:
      - type: log
        paths:
          - /data/dq-sampler-worker/logs/*.log
        ignore_older: 15m
        close_inactive: 1m
        fields:
          indexName: dq-sampler-worker-test
          solObjID: 2310
          serviceName: "dq-sampler-worker"
          fields_under_root: true
    
    processors:
      - convert:
          fields:
            - {from: "host.name", to: "host", type: "string"}
      - timestamp:
          field: "@timestamp"
          target_field: "logDate"
          layouts:
            - '2006-01-02T15:04:05Z'
    
    output.logstash:
      hosts: 'beats.obs.mts.ru:5044'
      ssl:
        certificate_authorities: "/data/dq-sampler-worker/class2root.crt"
    

Запуск

docker-compose -f docker-compose.yml up -d

Обновление метаданных

Сделайте POST запрос в на добавление sampler воркера в DQ API api/v4/sampler_worker

sampler_worker_to_hostname:
  - worker: "sampler-worker-name"          # Название sampler воркера
    hostname: "source.example.com"         # Хост по которому источник данных подключен к DQ
    rabbitmq_queue: "dq_worker_queue"      # Значение переменной DQ_QUEUE_NAME на sampler-worker
    routing_key: "dq_routing_key"          # Значение переменной DQ_ROUTING_KEY на sampler-worker

Примечания

  • Контейнеры filebeat и db не являются обязательными.
  • Для использования своей БД Postgres укажите соответствующие переменные в var.env.

Развертывание Sample Worker с поддержкой Trino

1. Настройка переменных окружения

Для работы с Trino добавьте в конфигурацию Sample Worker следующие переменные:

TRINO_USER="trino_user"       # Учетная запись Trino  
TRINO_PASSWORD="secure_pass"  # Пароль (рекомендуется хранить в Vault)  

2. Настройка маршрутизации задач

Через DQ API api/v4/sampler_worker укажите, какой Sample Worker должен обрабатывать запросы к Trino:

sampler_worker_to_hostname:
  - worker: "trino_worker"
    hostname: "trino.example.com"          # Хост Trino
    rabbitmq_queue: "dq_worker_queue"      # Очередь RabbitMQ
    routing_key: "trino_sample_requests"   # Ключ маршрутизации

3. Где хранятся семплы?

  • Список семплов: public.sample_list (настраивается через sample_list_table_name).
  • Данные семпла: сохраняются в таблицу вида SMPL_{run_uuid}_{compare_id}.

Важные замечания

  • Все примеры используют test_database_connection вместо реальных путей.
  • Для безопасности не храните пароли в коде — используйте Vault или аналоги.