Одна статья для понимания: анализ исходного кода Apache DolphinScheduler3.0.
Одна статья для понимания: анализ исходного кода Apache DolphinScheduler3.0.

Самое полное руководство по улучшению интервью с большими данными в Интернете!

Содержание этой статьи
  1. Дизайн и стратегия DolphinScheduler 1.1 Распределенный дизайн 1.1.1 Централизация 1.1.2 Децентрализация 1.2 Проектирование архитектуры DophinScheduler 1.3 Проблемы отказоустойчивости 1.3.1 Отказоустойчивость во время простоя 1.3.2 Повторная попытка после неудачи 1.4 Удаленный доступ к журналу
  2. Анализ исходного кода DolphinScheduler 2.1 Введение в модуль проекта и файл конфигурации 2.1.1 Знакомство с инженерным модулем 2.1.2 Файл конфигурации 2.2 Интерфейс основной задачи API 2.3 Архитектура Quaterz и процесс работы 2.3.1 Концепция и архитектура 2.3.2 Процесс инициализации и выполнения 2.3.3 Работа кластера 2.4 Мастер-запуск и процесс выполнения 2.4.1 Концепция и логика выполнения 2.4.2 Кластеры и слоты 2.4.3 Процесс выполнения кода 2.5 Процесс запуска и выполнения работ 2.5.1 Концепция и логика выполнения 2.5.2 Процесс выполнения кода 2.6 rpcinteraction 2.6.1 Взаимодействие Мастера и Работника 2.6.2 Другие сервисы взаимодействуют с Мастером 2.7 Алгоритм балансировки нагрузки 2.7.1 Взвешенный случайный выбор 2.7.2 Линейная нагрузка 2.7.3 Плавный опрос 2.8 Служба журналов 2.9 Сигнализация

1. Дизайн и стратегия DolphinScheduler

Если вы обратите внимание на DolphinScheduler, вы должны иметь определенное представление о системе планирования. Я не буду вдаваться в подробности о некоторых существительных, участвующих в планировании, я сосредоточусь на определении процесса, экземпляре процесса, определении задачи и экземпляре задачи. (Концепция отсутствия рабочих мест действительно очень нова, возможно, потому, что я не хочу пересекаться с JobDetail Quartz).

  • Определение задачи. Ключевыми компонентами определения процесса являются различные типы задач, такие как sql, Shell, Spark, Mr, Python и т. д.;
  • Экземпляр задачи: создание экземпляра задачи, определяющее конкретный статус выполнения задачи;
  • Определение процесса: направленный ациклический граф (DAG), созданный набором узлов задач посредством зависимостей;
  • Экземпляр процесса: экземпляр процесса, созданный посредством ручного или запланированного планирования;
  • Планирование времени: система использует распределенный планировщик Quartz и поддерживает визуальное создание выражений cron;

1.1 Распределенный дизайн

Архитектурное проектирование распределенных систем в основном делится на два типа: централизованные и децентрализованные. Каждый из них имеет свои преимущества и недостатки, в зависимости от соответствующего выбора бизнеса.

1.1.1 Централизация

Централизованная конструкция относительно проста. Роли установки узла в кластере можно разделить на два типа: главный и подчиненный, как показано ниже:

Главный: роль главного узла в основном отвечает за распределение задач и мониторинг состояния подчиненного устройства. Он может динамически распределять задачи по подчиненным, чтобы подчиненные узлы не были «заняты до смерти» или «простояли до смерти».

Есть некоторые проблемы с централизованным проектированием.

Первый момент: если с Мастером что-то пойдет не так, лидера не будет и весь кластер рухнет.

Чтобы решить эту проблему, в большинстве моделей архитектуры Master/Slave используется конструкция активного и резервного Master, которая может иметь горячий или холодный резерв, автоматическое переключение или ручное переключение, и все больше и больше новых систем начинают использовать его. Возможность автоматического выбора и переключения Мастера для повышения доступности системы.

Второй момент: если планировщик находится на ведущем устройстве, хотя он и может поддерживать разные задачи в группе обеспечения доступности баз данных, работающей на разных машинах, это приведет к перегрузке ведущего устройства. Если планировщик находится на ведомом устройстве, все задачи в DAG могут быть отправлены только на определенную машину. При наличии большого количества параллельных задач нагрузка на ведомое устройство может быть больше.

xxl-job использует этот метод проектирования, но возникают соответствующие проблемы. Если менеджер (админ) выйдет из строя, то кластер рухнет. Планировщик находится на менеджере. Менеджер отвечает за проверку и распределение всех задач. Есть риск перегрузки менеджера, что требует от разработчиков поиска. решение.

1.1.2 Децентрализация

В децентрализованном дизайне обычно не существует понятия «главный/подчиненный». Все роли одинаковы и статус одинаков. Основная идея децентрализованного дизайна заключается в том, что во всей распределенной системе нет «узла», который бы отличался от других узлов. Manager», поэтому единой точки отказа не существует.

Однако, поскольку узла «менеджера» нет, каждому узлу необходимо взаимодействовать с другими узлами для получения необходимой машинной информации. Ненадежность связи распределенной системы значительно увеличивает сложность реализации вышеперечисленных функций. На самом деле по-настоящему децентрализованные распределенные системы встречаются редко.

Напротив, постоянно возникают динамические централизованные распределенные системы. В рамках этой архитектуры менеджеры в кластере выбираются динамически, а не заранее, и в случае сбоя кластера узлы кластера спонтанно проводят собрание, чтобы выбрать нового менеджера, который будет руководить работой.

Как правило, это избирательные стратегии, реализованные на основе алгоритма Рафта. Алгоритм Raft в настоящее время имеет соответствующий PR в сообществе и еще не был объединен.

  • Ссылка на PR: https://github.com/apache/dolphinscheduler/issues/10874
  • Для динамического отображения см. ссылку: http://thesecretlivesofdata.com/.

Децентрализация DolphinScheduler заключается в том, что Master/Worker регистрируется в центре регистрации, при этом понимается, что Master-кластер и Worker-кластер являются бесцентровыми.

1.2 Проектирование архитектуры DophinScheduler

Украдя схему архитектуры системы с официального сайта, вы увидите, что система планирования имеет децентрализованный дизайн и состоит из пользовательского интерфейса, API, MasterServer, Zookeeper, WorkServer, Alert и других частей.

API: Уровень интерфейса API в основном отвечает за обработку запросов от внешнего уровня пользовательского интерфейса. Сервис предоставляет унифицированный RESTful API предоставляет услуги запросов внешнему миру. Интерфейс включает в себя создание, определение, запрос, модификацию рабочего процесса, выпуск, автономный режим, ручной запуск, остановку, паузу, возобновление, выполнение с этого узла и т. д.

MasterServer: MasterServer использует концепцию распределенного бесцентрового дизайна. MasterServer интегрирует Quartz и в основном отвечает за него. DAG Сегментация задач, мониторинг отправки задач и мониторинг состояния работоспособности других мастер-серверов и рабочих серверов. Когда служба MasterServer запускается, она регистрирует временные узлы в Zookeeper и обеспечивает отказоустойчивость, отслеживая изменения во временных узлах Zookeeper. WorkServer: WorkerServer также использует концепцию распределенного бесцентрового дизайна. WorkerServer в основном отвечает за выполнение задач и предоставление услуг журналов. Когда служба WorkerServer запускается, она регистрирует временные узлы в Zookeeper и поддерживает контрольные сигналы.

ZooKeeper: Служба ZooKeeper, узлы MasterServer и WorkerServer в системе используют ZooKeeper для управления кластером и отказоустойчивости. Кроме того, система также осуществляет мониторинг событий и распределенные блокировки на базе ZooKeeper.

Alert:Предоставление интерфейсов, связанных с сигнализацией,Интерфейс в основном включает в себя два типа хранения данных о тревогах, функции запроса и уведомления.,Поддерживает расширенные плагины сигнализации для свободного расширения конфигурации.

1.3 Проблемы отказоустойчивости

Отказоустойчивость делится на отказоустойчивость во время простоя службы и повторную попытку задачи, а отказоустойчивость во время простоя службы делится на две ситуации: отказоустойчивость главного устройства и отказоустойчивость рабочего;

1.3.1 Отказоустойчивость во время простоя

Отказоустойчивая конструкция службы основана на механизме Watcher ZooKeeper. Принцип реализации следующий:

Мастер контролирует каталоги других мастеров и рабочих. Если событие удаления отслеживается, он будет обеспечивать отказоустойчивость экземпляра процесса или экземпляра задачи на основе конкретной бизнес-логики. Блок-схема отказоустойчивости более гуманна, чем блок-схема в. Официальный документ. Вы можете Для справки подробности следующие.

Основная блок-схема отказоустойчивости

После завершения отказоустойчивости ZooKeeper Master он будет перепланирован потоком планировщика в DolphinScheduler, будет проходить через DAG, чтобы найти «выполняющиеся» и «успешно отправленные» задачи, отслеживать состояние его экземпляров задач на предмет «выполняющихся» задач и отслеживать статус «успешно отправленных» задач. Необходимо определить, существует ли уже очередь задач. Если она существует, состояние экземпляра задачи также будет отслеживаться. Если она не существует, экземпляр задачи будет отслеживаться. быть поданы повторно.

Блок-схема отказоустойчивости рабочих

Как только поток главного планировщика обнаруживает, что экземпляр задачи находится в состоянии «требуется отказоустойчивость», он берет на себя задачу и повторно отправляет ее. Обратите внимание, что из-за «дрожания сети» узел может потерять контрольное сообщение с ZooKeeper за короткий период времени, что приведет к событию удаления узла.

В этой ситуации мы используем самый простой метод, который заключается в прямой остановке службы Master или Worker, когда у узла истекает тайм-аут соединения с ZooKeeper.

1.3.2 Повторная попытка после неудачи

Здесь мы должны сначала различать концепции повтора сбоя задачи, восстановления сбоя процесса и повторного запуска сбоя процесса:

  1. Миссия «Повторить в случае неудачи» — это уровень миссии.,Это делается автоматически диспетчерской системой.,Например, задача Shell устанавливает количество повторов, равное 3.,Затем, после сбоя задачи Shell, она попытается запустить ее до 3 раз.
  2. Восстановление после сбоя процесса происходит на уровне процесса и выполняется вручную. Восстановление может быть выполнено только с отказавшего узла или с текущего узла. Повторный запуск сбоя процесса также осуществляется на уровне процесса и выполняется вручную. Повторный запуск выполняется с начального узла.

Далее перейдем к делу. Мы делим узлы задач в рабочем процессе на два типа.

  1. Одним из них является бизнес-узел, который соответствует фактическому сценарию или оператору обработки, например узел Shell, узел MR, узел Spark, узел зависимости и т. д.
  2. Существует также логический узел, который не выполняет фактическую обработку сценариев или операторов, а только логическую обработку всего потока процесса, например разделов подпроцесса и т. д.

Каждый бизнес-узел может настроить количество неудачных попыток. При сбое узла задачи он будет автоматически повторять попытки до тех пор, пока не будет успешным или не превысит настроенное количество повторов. Логические узлы не поддерживают повторную попытку в случае сбоя. Однако задачи в логических узлах поддерживают повтор.

Если задача в рабочем процессе завершается сбоем и достигает максимального количества повторов, рабочий процесс завершится сбоем и остановится. Неудавшийся рабочий процесс можно запустить повторно или восстановить процесс.

1.4 Удаленный доступ к журналу

Поскольку Web (UI) и Worker не обязательно находятся на одном компьютере, просмотр журналов не может быть тем же самым, что и запрос локальных файлов.

Есть два варианта:

  1. Выложить логи в поисковик ES;
  2. Получать информацию удаленного журнала через связь Netty;

Чтобы сделать DolphinScheduler максимально облегченным, был выбран RPC для реализации удаленного доступа к информации журнала. Конкретные методы работы с кодом см. в главе 2.8.

2. Анализ исходного кода DolphinScheduler

Основная цель этой главы — поочередно представить функции, описанные в первой главе, на уровне кода. Установка системы здесь не рассматривается. Пожалуйста, изучите установку и работу самостоятельно.

2.1 Введение в модуль проекта и файл конфигурации

2.1.1 Знакомство с инженерным модулем
  • модуль сигнализации dolphinscheduler-alert предоставляет услуги сигнализации;
  • модуль веб-приложения dolphinscheduler-api предоставляет службу Rest Api для вызова пользовательского интерфейса;
  • dolphinscheduler-common — это общее константное перечисление, класс инструмента, структура данных или базовый класс, обеспечивающий доступ к базе данных и другие операции;
  • dolphinscheduler — удаленный клиент и сервер на базе Netty;
  • служба журнала dolphinscheduler-server и Heartbeat;
  • dolphinscheduler-log-server LoggerServer используется для Rest Api для просмотра журналов через RPC;
  • служба dolphinscheduler-master MasterServer, в основном отвечающая за сегментацию DAG и мониторинг состояния задач;
  • служба dolphinscheduler-worker WorkerServer, в основном отвечающая за отправку, выполнение и обновление статуса задач;
  • Сервисный модуль dolphinscheduler-service включает в себя Quartz, Zookeeper и службы доступа к клиенту журнала, что удобно для вызовов серверного модуля и модуля API;
  • интерфейсный модуль dolphinscheduler-ui;
2.1.2 Файл конфигурации

dolphinscheduler-common common.properties

Язык кода:javascript
копировать
#Локальный рабочий каталог, используемый для хранения временных файлов
data.basedir.path=/tmp/dolphinscheduler
#Тип хранения файлов ресурсов: HDFS,S3,NONE
resource.storage.type=NONE
#Путь хранения файлов ресурсов
resource.upload.path=/dolphinscheduler
#hadoopВключить ли разрешения Kerberos
hadoop.security.authentication.startup.state=false
#каталог конфигурации Kerberos
java.security.krb5.conf.path=/opt/krb5.conf
#kerberosloginuser
login.user.keytab.username=hdfs-mycluster@ESZ.COM

#kerberosloginuserkeytab
login.user.keytab.path=/opt/hdfs.headless.keytab

Срок действия #kerberos, целое число, единица измерения — час
kerberos.expire.time=2
# Если тип хранилища — HDFS, вам необходимо настроить пользователя с соответствующими разрешениями на операции.
hdfs.root.user=hdfs
#Адрес запроса Если ресурс.storage.type=S3, значение аналогично: s3a://dolphinscheduler. Если ресурс.хранилище.тип=HDFS, если hadoop Настроен HA,нуждатьсякопироватьcore-site.xml и hdfs-site.xml файл в каталог conf
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
#yarn resourcemanager адрес, еслиresourcemanager включает HA,Введите IP-адреса HA (через запятую).,если менеджер ресурсов представляет собой один узел, Это значение можно оставить пустым
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
#еслиresourcemanager включает HAили не используетсяresourcemanager,Просто сохраните значение по умолчанию. еслиresourcemanager — это один узел,你нуждаться将ds1 Настроено как имя хоста, соответствующее ресурсному менеджеру.
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

# datasource encryption enable
datasource.encryption.enable=false

# datasource encryption salt
datasource.encryption.salt=!@#$%^&*

# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar

#data-quality.error.output.path=/tmp/data-quality-error-data

# Network IP gets priority, default inner outer

# Whether hive SQL is executed in the same session
support.hive.oneSession=false

# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true

# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=

# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default

# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh

#Находится ли он в режиме разработки
development.state=false

# rpc port
alert.rpc.port=50052

# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

dolphinscheduler-api application.yaml

Язык кода:javascript
копировать
server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
#    driver-class-name: org.postgresql.Driver
#    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Weather enable python gateway server or not. The default value is true.
  enabled: true
  # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
  # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
  gateway-server-address: 0.0.0.0
  # The port of Python gateway server start. Define which port you could connect to Python gateway server from
  # Python API side.
  gateway-server-port: 25333
  # The address of Python callback client.
  python-address: 127.0.0.1
  # The port of Python callback client.
  python-port: 25334
  # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
  # and socket server would never close even though no requests accept
  connect-timeout: 0
  # Close each active connection of socket server if python program not active after x milliseconds. Define value is
  # (0 = infinite), and socket server would never close even though no requests accept
  read-timeout: 0

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-master application.yaml

Язык кода:javascript
копировать
spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill yarn jon when failover taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-worker application.yaml

Язык кода:javascript
копировать
spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

2.2 Интерфейс работы основной задачи API

Вам не нужно обращать внимание на другие бизнес-интерфейсы. Вам нужно только обратить внимание на наиболее важный интерфейс онлайн-функций. Этот интерфейс может генерировать все коды, связанные с планированием задач.

интерфейс: /dolphinscheduler/projects/{projectCode}/schedules/{id}/online;

Этот интерфейс отправит определенный процесс в систему планирования Quartz;

Язык кода:javascript
копировать
public Map<String, Object> setScheduleState(User loginUser,
                                                long projectCode,
                                                Integer id,
                                                ReleaseState scheduleStatus) {
        Map<String, Object> result = new HashMap<>();

        Project project = projectMapper.queryByCode(projectCode);
        // check project auth
        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
        if (!hasProjectAndPerm) {
            return result;
        }

        // check schedule exists
        Schedule scheduleObj = scheduleMapper.selectById(id);

        if (scheduleObj == null) {
            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
            return result;
        }
        // check schedule release state
        if (scheduleObj.getReleaseState() == scheduleStatus) {
            logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
            return result;
        }
        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
            return result;
        }
        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
        if (processTaskRelations.isEmpty()) {
            putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
            return result;
        }
        if (scheduleStatus == ReleaseState.ONLINE) {
            // check process definition release state
            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
                logger.info("not release process definition id: {} , name : {}",
                        processDefinition.getId(), processDefinition.getName());
                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
                return result;
            }
            // check sub process definition release state
            List<Long> subProcessDefineCodes = new ArrayList<>();
            processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
            if (!subProcessDefineCodes.isEmpty()) {
                List<ProcessDefinition> subProcessDefinitionList =
                        processDefinitionMapper.queryByCodes(subProcessDefineCodes);
                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                        /**
                         * if there is no online process, exit directly
                         */
                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                            logger.info("not release process definition id: {} , name : {}",
                                    subProcessDefinition.getId(), subProcessDefinition.getName());
                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                            return result;
                        }
                    }
                }
            }
        }

        // check master server exists
        List<Server> masterServers = monitorService.getServerListFromRegistry(true);

        if (masterServers.isEmpty()) {
            putMsg(result, Status.MASTER_NOT_EXISTS);
            return result;
        }

        // set status
        scheduleObj.setReleaseState(scheduleStatus);

        scheduleMapper.updateById(scheduleObj);

        try {
            switch (scheduleStatus) {
                case ONLINE:
                    logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    setSchedule(project.getId(), scheduleObj);
                    break;
                case OFFLINE:
                    logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    deleteSchedule(project.getId(), id);
                    break;
                default:
                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                    return result;
            }
        } catch (Exception e) {
            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
            throw new ServiceException(result.get(Constants.MSG).toString(), e);
        }

        putMsg(result, Status.SUCCESS);
        return result;
    }
Язык кода:javascript
копировать
public void setSchedule(int projectId, Schedule schedule) {
        logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

        quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
    }
Язык кода:javascript
копировать
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
        String jobName = this.buildJobName(schedule.getId());
        String jobGroupName = this.buildJobGroupName(projectId);

        Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
        String cronExpression = schedule.getCrontab();
        String timezoneId = schedule.getTimezoneId();

        /**
         * transform from server default timezone to schedule timezone
         * e.g. server default timezone is `UTC`
         * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
         * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
         * so when add job to quartz, it should recover by transform timezone
         */
        Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
        Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);

        lock.writeLock().lock();
        try {

            JobKey jobKey = new JobKey(jobName, jobGroupName);
            JobDetail jobDetail;
            //add a task (if this task already exists, return this task directly)
            if (scheduler.checkExists(jobKey)) {

                jobDetail = scheduler.getJobDetail(jobKey);
                jobDetail.getJobDataMap().putAll(jobDataMap);
            } else {
                jobDetail = newJob(clazz).withIdentity(jobKey).build();

                jobDetail.getJobDataMap().putAll(jobDataMap);

                scheduler.addJob(jobDetail, false, true);

                logger.info("Add job, job name: {}, group name: {}",
                        jobName, jobGroupName);
            }

            TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
            /*
             * Instructs the Scheduler that upon a mis-fire
             * situation, the CronTrigger wants to have it's
             * next-fire-time updated to the next time in the schedule after the
             * current time (taking into account any associated Calendar),
             * but it does not want to be fired now.
             */
            CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(
                            cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionDoNothing()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId))
                    )
                    .forJob(jobDetail).build();

            if (scheduler.checkExists(triggerKey)) {
                // updateProcessInstance scheduler trigger when scheduler cycle changes
                CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                String oldCronExpression = oldCronTrigger.getCronExpression();

                if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                    // reschedule job trigger
                    scheduler.rescheduleJob(triggerKey, cronTrigger);
                    logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                            jobName, jobGroupName, cronExpression, startDate, endDate);
                }
            } else {
                scheduler.scheduleJob(cronTrigger);
                logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }

        } catch (Exception e) {
            throw new ServiceException("add job failed", e);
        } finally {
            lock.writeLock().unlock();
        }
    }

2.3 Архитектура Quaterz и процесс работы

2.3.1 Концепция и архитектура

Структура Quartz в основном включает в себя следующие части:

  • SchedulerFactory:Планирование Фабрика задач, в основном отвечает за управление Планированием задачустройство;
  • Scheduler :Планирование задачи, в основном отвечает за Планирование задачи и связанные с ними интерфейсы для рабочих задач;
  • Задача: интерфейс задачи, класс реализации содержит конкретный бизнес-код задачи;
  • JobDetail: используется для определения экземпляра задания;
  • Триггер: триггер задачи, который в основном хранит временную стратегию выполнения задания. Например, как часто его выполнять, когда выполнять, как часто выполнять и т. д.;
  • JobBuilder: используется для определения/создания экземпляров JobDetail, используется для определения экземпляров заданий.
  • TriggerBuilder: используется для определения/создания экземпляров триггера;
  • Календарь: объект расширения триггера, который может исключать или включать указанный момент времени (например, исключение официальных праздников);
  • JobStore: Магазин вакансий и Планирование статус задачи в течение жизни Планировщика, начиная с SchedulerFactory начиная с момента его создания и заканчивая Scheduler Вызов отключения() Метод завершается;

После создания планировщика он может добавлять, удалять и перечислять задания и триггеры, а также выполнять другие операции, связанные с планированием (например, приостанавливать триггеры). Но планировщик фактически активирует триггер (то есть выполнит задание) только после вызова метода start().

2.3.2 Процесс инициализации и выполнения

Основной принцип Quartz заключается в использовании планировщика для планирования бизнес-объектов пользовательских задач, реализуемых спецификациями интерфейса задания установки, определенными JobDetail и Trigger, для завершения планирования задач. Основная логика следующая:

Временная диаграмма кода выглядит следующим образом:

Основное содержание заключается в инициализации планировщика контейнера планирования задач, а также пула потоков, необходимого контейнеру, объекта взаимодействия с данными JobStore и потока обработки задач QuartzSchedulerThread для обработки конкретного класса бизнес-реализации интерфейса задания.

Бизнес-класс DolphinScheduler — ProcessScheduleJob, и его основная функция — запись данных в общую таблицу на основе информации планирования.

2.3.3 Работа кластера

Что следует отметить:

  1. Когда Quartz развертывается в форме кластера, носитель данных не может быть в виде памяти, то есть нельзя использовать JobStoreRAM.
  2. Кластер Quartz использует блокировку базы данных TRIGGER_ACCESS для сканирования экземпляров триггеров, которые необходимо запланировать, гарантируя, что этот процесс сканирования может быть получен только одним экземпляром Quartz. Код выглядит следующим образом:
Язык кода:javascript
копировать
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }
  1. При восстановлении экземпляров кластера, вышедших из строя, необходимо обратить внимание на то, что каждый экземпляр восстанавливает аномальный экземпляр, соответствующий соответствующему экземпляру, поскольку в базе данных имеется информация об идентификаторе экземпляра контейнера планирования. Код выглядит следующим образом:
Язык кода:javascript
копировать
 protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {

        if (failedInstances.size() > 0) {

            long recoverIds = System.currentTimeMillis();

            logWarnIfNonZero(failedInstances.size(),
                    "ClusterManager: detected " + failedInstances.size()
                            + " failed or restarted instances.");
            try {
                for (SchedulerStateRecord rec : failedInstances) {
                    getLog().info(
                            "ClusterManager: Scanning for instance \""
                                    + rec.getSchedulerInstanceId()
                                    + "\"'s failed in-progress jobs.");

                    List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                            .selectInstancesFiredTriggerRecords(conn,
                                    rec.getSchedulerInstanceId());

                    int acquiredCount = 0;
                    int recoveredCount = 0;
                    int otherCount = 0;

                    Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                    for (FiredTriggerRecord ftRec : firedTriggerRecs) {

                        TriggerKey tKey = ftRec.getTriggerKey();
                        JobKey jKey = ftRec.getJobKey();

                        triggerKeys.add(tKey);

                        // release blocked triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }

                        // release acquired triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                            getDelegate().updateTriggerStateFromOtherState(
                                    conn, tKey, STATE_WAITING,
                                    STATE_ACQUIRED);
                            acquiredCount++;
                        } else if (ftRec.isJobRequestsRecovery()) {
                            // handle jobs marked for recovery that were not fully
                            // executed..
                            if (jobExists(conn, jKey)) {
                                @SuppressWarnings("deprecation")
                                SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                        "recover_"
                                                + rec.getSchedulerInstanceId()
                                                + "_"
                                                + String.valueOf(recoverIds++),
                                        Scheduler.DEFAULT_RECOVERY_GROUP,
                                        new Date(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobName(jKey.getName());
                                rcvryTrig.setJobGroup(jKey.getGroup());
                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                rcvryTrig.setPriority(ftRec.getPriority());
                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobDataMap(jd);

                                rcvryTrig.computeFirstFireTime(null);
                                storeTrigger(conn, rcvryTrig, null, false,
                                        STATE_WAITING, false, true);
                                recoveredCount++;
                            } else {
                                getLog()
                                        .warn(
                                                "ClusterManager: failed job '"
                                                        + jKey
                                                        + "' no longer exists, cannot schedule recovery.");
                                otherCount++;
                            }
                        } else {
                            otherCount++;
                        }

                        // free up stateful job's triggers
                        if (ftRec.isJobDisallowsConcurrentExecution()) {
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                    }

                    getDelegate().deleteFiredTriggers(conn,
                            rec.getSchedulerInstanceId());

                    // Check if any of the fired triggers we just deleted were the last fired trigger
                    // records of a COMPLETE trigger.
                    int completeCount = 0;
                    for (TriggerKey triggerKey : triggerKeys) {

                        if (getDelegate().selectTriggerState(conn, triggerKey).
                                equals(STATE_COMPLETE)) {
                            List<FiredTriggerRecord> firedTriggers =
                                    getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                            if (firedTriggers.isEmpty()) {

                                if (removeTrigger(conn, triggerKey)) {
                                    completeCount++;
                                }
                            }
                        }
                    }

                    logWarnIfNonZero(acquiredCount,
                            "ClusterManager: ......Freed " + acquiredCount
                                    + " acquired trigger(s).");
                    logWarnIfNonZero(completeCount,
                            "ClusterManager: ......Deleted " + completeCount
                                    + " complete triggers(s).");
                    logWarnIfNonZero(recoveredCount,
                            "ClusterManager: ......Scheduled " + recoveredCount
                                    + " recoverable job(s) for recovery.");
                    logWarnIfNonZero(otherCount,
                            "ClusterManager: ......Cleaned-up " + otherCount
                                    + " other failed job(s).");

                    if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                        getDelegate().deleteSchedulerState(conn,
                                rec.getSchedulerInstanceId());
                    }
                }
            } catch (Throwable e) {
                throw new JobPersistenceException("Failure recovering jobs: "
                        + e.getMessage(), e);
            }
        }
    }

2.4 Мастер-запуск и процесс выполнения

2.4.1 Концепция и логика выполнения

Ключевые понятия:

Связанные с кварцем

  • Планировщик (контейнер «Планирование задач», обычно экземпляр StdScheduler).
  • ProcessScheduleJob: (Бизнес-класс, который реализует интерфейс Job структуры планирования Quarts и специально генерирует данные бизнес-таблицы базы данных DolphinScheduler t_ds_commond);

Связанные с DolphinScheduler

  • NettyRemotingServer (сервер Netty, включая объект сервера netty serverBootstrap и объект бизнес-обработки сервера Netty serverHandler), NettyServerHandler: (класс бизнес-обработки сервера Netty: включая различные типы процессоров и пул потоков выполнения, соответствующий процессору);
  • TaskPluginManager (менеджер подключаемых модулей задач, различные типы задач управляются в виде подключаемых модулей. При запуске службы приложения в базу данных через @AutoService загружается заводская информация, реализующая интерфейс TaskChannelFactory, и различные реализации TaskChannel. классы загружаются в кеш через фабричный объект);
  • MasterRegistryClient (клиент мастера для работы с zk, который инкапсулирует все операции мастера над zk, такие как регистрация, запрос, удаление и т. д.);
  • MasterSchedulerService (сервис сканирования,Содержит поток выполнения бизнеса и защиту Nettyhe, включенную в работу.,Ответственный за Планирование задач бизнеса,слот для контроля многократного планирования задач в режиме кластера,Базовая реализация — распределенная блокировка Zookeeper);
  • WorkflowExecuteThread (настоящий поток бизнес-обработки, получает команду commond через слот и проверяет изменение слота перед выполнением. Если изменение не выполняется, ключевой функцией является создание связанных с задачей параметров, определения, приоритета и т. д. ., а затем отправить его в очередь для потребления потока обработки очереди);
  • CommonTaskProcessor (общий процессор задач, реализует интерфейс ITaskProcessor, разделен на общие, зависимые, подзадачи, блокирующие, условные типы задач в соответствии с бизнесом, включая отправку задач, выполнение, распространение, уничтожение и другие бизнес-классы, классы, загружаемые через @AutoService, It в основном инкапсулирован);
  • TaskPriorityQueueImpl (очередь задач, отвечающая за управление хранением очереди задач);
  • TaskPriorityQueueConsumer (поток-потребитель очереди задач, отвечающий за распределение и выполнение задач среди воркеров согласно политике балансировки нагрузки);
  • ServerNodeManager (контроллер информации узла, отвечающий за обновление информации о регистрации узла и смену слотов, базовая реализация — применение распределенной блокировки Zookeeper);
  • EventExecuteService (поток обработки событий через поток обработки кэшированных задач обрабатывает события, зарегистрированные в очереди событий потока во время обработки каждой задачи);
  • FailoverExecuteThread (поток аварийного переключения, включая главный и рабочий);
  • MasterRegistryDataListener (прослушиватель ошибок, размещенный в системе управления zk, ответственный за добавление и удаление узлов, зарегистрированных на zk рабочими и главными узлами).

Код отказоустойчивости главного узла следующий. Бизнес-пояснения см. в разделе 1.5.1 Объяснение отказоустойчивости главного узла:

Язык кода:javascript
копировать
 private void failoverMasterWithLock(String masterHost) {
        String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
        try {
            registryClient.getLock(failoverPath);
            this.failoverMaster(masterHost);
        } catch (Exception e) {
            LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
        } finally {
            registryClient.releaseLock(failoverPath);
        }
    }
 /**
     * failover master
     * <p>
     * failover process instance and associated task instance
     *Экземпляр процесса аварийного переключения и связанный экземпляр задачи.
     * @param masterHost master host
     */
    private void failoverMaster(String masterHost) {
        if (StringUtils.isEmpty(masterHost)) {
            return;
        }
        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
        long startTime = System.currentTimeMillis();
        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
        LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
            if (Constants.NULL.equals(processInstance.getHost())) {
                continue;
            }

            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
            for (TaskInstance taskInstance : validTaskInstanceList) {
                LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance, workerServers);
            }

            if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {
                continue;
            }

            LOGGER.info("failover process instance id: {}", processInstance.getId());
            //updateProcessInstance host is null and insert into command
            processInstance.setHost(Constants.NULL);
            processService.processNeedFailoverProcessInstances(processInstance);
        }

        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
    }
2.4.2 Кластеры и слоты

На самом деле, использование распределенных блокировок Zookeer здесь не является ни точным, ни точным. Почему вы так говорите? Потому что Slot рассчитывается по CommondId по модулю длины основного списка, а обновление длины основного списка контролируется распределенными блокировками Zookeer. Данные планирования сканирования главного узла контролируются через слот.

Конкретный код выглядит следующим образом:

Обновление слота

Язык кода:javascript
копировать
private void updateMasterNodes() {
        MASTER_SLOT = 0;
        MASTER_SIZE = 0;
        this.masterNodes.clear();
        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
        try {
            registryClient.getLock(nodeLock);
            Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
            List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
            syncMasterNodes(currentNodes, masterNodes);
        } catch (Exception e) {
            logger.error("update master nodes error", e);
        } finally {
            registryClient.releaseLock(nodeLock);
        }

    }
/**
     * sync master nodes
     *
     * @param nodes master nodes
     */
    private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
        masterLock.lock();
        try {
            String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
            this.masterNodes.addAll(nodes);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(masterNodes);
            int index = masterPriorityQueue.getIndex(addr);
            if (index >= 0) {
                MASTER_SIZE = nodes.size();
                MASTER_SLOT = index;
            } else {
                logger.warn("current addr:{} is not in active master list", addr);
            }
            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
        } finally {
            masterLock.unlock();
        }
    }

Слот-приложение

Язык кода:javascript
копировать
/**
     * 1. get command by slot
     * 2. donot handle command if slot is empty
     */
    /** * 1. Получить команду через слот * 2. если Слот пуст,тогда команда не обрабатывается */
    private void scheduleProcess() throws Exception {
        List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
            //indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }

        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {
            return;
        }

        for (ProcessInstance processInstance : processInstances) {
            if (processInstance == null) {
                continue;
            }

            WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                    processInstance
                    , processService
                    , nettyExecutorManager
                    , processAlertManager
                    , masterConfig
                    , stateWheelExecuteThread);

            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
            if (processInstance.getTimeout() > 0) {
                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
        }
    }
private List<Command> findCommands() {
        int pageNumber = 0;
        int pageSize = masterConfig.getFetchCommandNum();
        List<Command> result = new ArrayList<>();
        if (Stopper.isRunning()) {
            int thisMasterSlot = ServerNodeManager.getSlot();
            int masterCount = ServerNodeManager.getMasterSize();
            if (masterCount > 0) {
                result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
            }
        }
        return result;
    }
@Override
    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
        if (masterCount <= 0) {
            return Lists.newArrayList();
        }
        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
    }
    
 <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit} offset #{offset}
    </select>

##Проверка слота
 private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
        List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
        CountDownLatch latch = new CountDownLatch(commands.size());
        for (final Command command : commands) {
            masterPrepareExecService.execute(() -> {
                try {
                    // slot check again
                    SlotCheckState slotCheckState = slotCheck(command);
                    if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
                        logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                        return;
                    }
                    ProcessInstance processInstance = processService.handleCommand(logger,
                            getLocalAddress(),
                            command);
                    if (processInstance != null) {
                        processInstances.add(processInstance);
                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                    }
                } catch (Exception e) {
                    logger.error("handle command error ", e);
                    processService.moveToErrorCommand(command, e.toString());
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            // make sure to finish handling command each time before next scan
            latch.await();
        } catch (InterruptedException e) {
            logger.error("countDownLatch await error ", e);
        }

        return processInstances;
    }

private SlotCheckState slotCheck(Command command) {
        int slot = ServerNodeManager.getSlot();
        int masterSize = ServerNodeManager.getMasterSize();
        SlotCheckState state;
        if (masterSize <= 0) {
            state = SlotCheckState.CHANGE;
        } else if (command.getId() % masterSize == slot) {
            state = SlotCheckState.PASS;
        } else {
            state = SlotCheckState.INJECT;
        }
        return state;
    }
2.4.3 Процесс выполнения кода

Код слишком громоздкий, поэтому я не буду вставлять сюда код по одному, чтобы объяснить функции каждого класса. Будет понятнее прочитать код самостоятельно.

2.5 Процесс запуска и выполнения воркера

2.5.1 Концепция и логика выполнения
  • NettyRemotingServer (сервер Netty, включенный в рабочий процесс) WorkerRegistryClient (клиент zk, инкапсулирует рабочий процесс и операции, связанные с zk, регистрацию, запрос, удаление и т. д.);
  • TaskPluginManager (менеджер подключаемых модулей задач, который инкапсулирует абстракцию логики загрузки подключаемых модулей и фактического выполнения задач);
  • WorkerManagerThread (генератор рабочих потоков задач, потребляет информацию о задачах из очереди продвижения процессора Netty и генерирует потоки выполнения задач для отправки в управление пулом потоков);
  • TaskExecuteProcessor (процессор выполнения задач Netty, генерирует информацию о задачах, распространяемую мастером для работы, и помещает ее в очередь);
  • TaskExecuteThread (поток выполнения задачи);
  • TaskCallbackService (поток обратного вызова задачи, взаимодействует с клиентом Netty, включенным в мастер);
  • AbstractTask (абстрактный класс реальной бизнес-задачи, подклассы включают в себя бизнес-задачу реального выполнения, SqlTask, DataXTask и т. д.);
  • RetryReportTaskStatusThread(Нетсосредоточиться на)
2.5.2 Процесс выполнения кода

Workerузел Временная диаграмма кода выглядит следующим образом:

Код слишком громоздкий, поэтому я не буду вставлять сюда код по одному, чтобы объяснить функции каждого класса. Будет понятнее прочитать код самостоятельно.

2.6 Взаимодействие RPC

Поскольку связь RPC между узлами и службами приложений реализована на основе Netty, здесь не будут подробно объясняться знания, связанные с Netty. В текущей главе рассматривается только проектирование и реализация режима взаимодействия между Master и Worker.

Общий дизайн выглядит следующим образом

2.6.1 Взаимодействие Мастера и Работника

Взаимодействие бизнес-логики между Master и Worker основано на сервере Netty и клиенте для реализации связи Rpc. Когда Master и Worker запускаются, они регистрируют информацию своего сервера Netty на соответствующем узле ZK, и задачи Master будут следующими. Распределено, когда выполняются бизнес-операции, такие как уничтожение потоков и задач, извлеките Worker на ZK. Информация об узле, выберите узел в соответствии со стратегией балансировки нагрузки и создайте клиент Netty для связи с сервером Netty работника. После того, как работник получит запрос RPC от мастера, он будет кэшировать информацию о канале и одновременно обрабатывать соответствующий бизнес. время поток обратного вызова обратного вызова получит кэшированный канал. Выполните операцию обратного вызова, образуя таким образом замкнутый цикл.

Выполнение и уничтожение задач, а также обработка статуса обратного вызова и другие операции выполняются через процессор процессора, связанный с клиентом и сервером Netty.

Конкретный код основной части выглядит следующим образом.

При запуске Master он инициализирует Nettyserver, зарегистрирует соответствующий обработчик запросов в NettyHandler и запустит:

Язык кода:javascript
копировать
 @PostConstruct
    public void run() throws SchedulerException {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // self tolerant
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        this.scheduler.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }
 /**
     * server start
     */
    public void start() {
        if (isStarted.compareAndSet(false, true)) {
            this.serverBootstrap
                    .group(this.bossGroup, this.workGroup)
                    .channel(NettyUtils.getServerSocketChannelClass())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                    .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                    .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                    .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                    .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {
                            initNettyChannel(ch);
                        }
                    });

            ChannelFuture future;
            try {
                future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
            } catch (Exception e) {
                logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
            if (future.isSuccess()) {
                logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            } else if (future.cause() != null) {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
            } else {
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
        }
    }

Когда NettyExecutorManager мастера инициализируется, NettyRemotingClient также будет инициализирован, и будет зарегистрирован процессор, который обрабатывает запросы обратного вызова Worker. Реальная привязка порта происходит после получения порта исполнителя:

Язык кода:javascript
копировать
  /**
     * constructor
     */
    public NettyExecutorManager() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }
##Зарегистрируйте процессор, который обрабатывает обратные вызовы рабочих процессов
    @PostConstruct
    public void init() {
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    }
    
 public NettyRemotingClient(final NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);

        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

        this.start();
    }
 /**
     * start
     */
 private void start() {

        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
                                .addLast(new NettyDecoder(), clientHandler, encoder);
                    }
                });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
        isStarted.compareAndSet(false, true);
    }

Код распределения задач следующий:

Язык кода:javascript
копировать
/**
     * task dispatch
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if (executorManager == null) {
            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
        }

        /**
         * host select
         */

        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
                            + "current task needs worker group %s to execute",
                    context.getCommand(),context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {
            executorManager.afterExecute(context);
        }
    }


/**
     * execute logic
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        /**
         *  build command accord executeContext
         */
        Command command = context.getCommand();

        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                doExecute(host, command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }

        return success;
    }


/**
     * execute logic
     *
     * @param host host
     * @param command command
     * @throws ExecuteException if error throws ExecuteException
     */
    public void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count,default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {
                logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);

        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

  /**
     * send task
     *
     * @param host host
     * @param command command
     */
    public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {
                logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }

Конкретный код рабочей части выглядит следующим образом:

Точно так же Woker при запуске инициализирует NettyServer, зарегистрирует соответствующий процессор и запустится:

Язык кода:javascript
копировать
/**
     * worker server run
     */
    @PostConstruct
    public void run() {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(workerConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // worker registry
        try {
            this.workerRegistryClient.registry();
            this.workerRegistryClient.setRegistryStoppable(this);
            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();

            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // task execute manager
        this.workerManagerThread.start();

        // retry report task status
        this.retryReportTaskStatusThread.start();

        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));
    }

Когда объект потока обратного вызова инициализируется, включенный клиент Nettyremotingclient будет инициализирован вместе, и соответствующий бизнес-процессор будет зарегистрирован:

Язык кода:javascript
копировать
public TaskCallbackService() {
        final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
    }

Поток обратного вызова будет связываться с клиентом Мастера через Chanel, кэшированный в других исполнителях:

Язык кода:javascript
копировать
/**
     * send result
     *
     * @param taskInstanceId taskInstanceId
     * @param command command
     */
    public void send(int taskInstanceId, Command command) {
        NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel != null) {
            nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // remove(taskInstanceId);
                        return;
                    }
                }
            });
        }
    }
2.6.2 Другие сервисы взаимодействуют с Мастером

к Служба Если взять журналы в качестве примера, внешний интерфейс запускает интерфейс для запроса журналов, взаимодействует с базой данных через параметры для получения информации NettyServer от Master, а затем создает клиент Netty для связи с Master для получения журналов и их возврата. Конкретный код выглядит следующим образом:

Язык кода:javascript
копировать
                                   @RequestParam(value = "skipLineNum") int skipNum,
                                   @RequestParam(value = "limit") int limit) {
        return loggerService.queryLog(taskInstanceId, skipNum, limit);
    }
Язык кода:javascript
копировать
     * view log
     *
     * @param taskInstId task instance id
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log string data
     */
    @Override
    @SuppressWarnings("unchecked")
    public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {

        TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);

        if (taskInstance == null) {
            return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
        }
        if (StringUtils.isBlank(taskInstance.getHost())) {
            return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
        }
        Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
        String log = queryLog(taskInstance,skipLineNum,limit);
        result.setData(log);
        return result;
    }
Язык кода:javascript
копировать
     * query log
     *
     * @param taskInstance  task instance
     * @param skipLineNum skip line number
     * @param limit       limit
     * @return log string data
     */
    private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
        Host host = Host.of(taskInstance.getHost());

        logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
                host.getPort());

        StringBuilder log = new StringBuilder();
        if (skipLineNum == 0) {
            String head = String.format(LOG_HEAD_FORMAT,
                    taskInstance.getLogPath(),
                    host,
                    Constants.SYSTEM_LINE_SEPARATOR);
            log.append(head);
        }

        log.append(logClient
                .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));

        return log.toString();
    }
Язык кода:javascript
копировать
     * roll view log
     *
     * @param host host
     * @param port port
     * @param path path
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log content
     */
    public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
        logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
        RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
        String result = "";
        final Host address = new Host(host, port);
        try {
            Command command = request.convert2Command();
            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
            if (response != null) {
                RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
                        response.getBody(), RollViewLogResponseCommand.class);
                return rollReviewLog.getMsg();
            }
        } catch (Exception e) {
            logger.error("roll view log error", e);
        } finally {
            this.client.closeChannel(address);
        }
        return result;
    }
Язык кода:javascript
копировать
     * sync send
     *
     * @param host host
     * @param command command
     * @param timeoutMillis timeoutMillis
     * @return command
     */
    public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
        final Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        final long opaque = command.getOpaque();
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush(command).addListener(future -> {
            if (future.isSuccess()) {
                responseFuture.setSendOk(true);
                return;
            } else {
                responseFuture.setSendOk(false);
            }
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            logger.error("send command {} to host {} failed", command, host);
        });
        /*
         * sync wait for result
         */
        Command result = responseFuture.waitResponse();
        if (result == null) {
            if (responseFuture.isSendOK()) {
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            } else {
                throw new RemotingException(host.toString(), responseFuture.getCause());
            }
        }
        return result;
    }

Nettyclient инициализируется одновременно с инициализацией бизнес-объекта журнала:

Язык кода:javascript
копировать
     * construct client
     */
    public LogClientService() {
        this.clientConfig = new NettyClientConfig();
        this.clientConfig.setWorkerThreads(4);
        this.client = new NettyRemotingClient(clientConfig);
        this.isRunning = true;
    }

2.7 Алгоритм балансировки нагрузки

DolphinScheduler предоставляет три алгоритма балансировки нагрузки, когда Master выбирает исполнителей, и все алгоритмы используют веса узлов: взвешенный случайный (случайный), плавный опрос (циклический перебор) и линейная нагрузка (меньший вес). Используйте файл конфигурации, чтобы указать, какая стратегия балансировки нагрузки используется. Конфигурацией по умолчанию является стратегия веса: селектор хоста: low_weight.

Язык кода:javascript
копировать
    public HostManager hostManager() {
        HostSelector selector = masterConfig.getHostSelector();
        HostManager hostManager;
        switch (selector) {
            case RANDOM:
                hostManager = new RandomHostManager();
                break;
            case ROUND_ROBIN:
                hostManager = new RoundRobinHostManager();
                break;
            case LOWER_WEIGHT:
                hostManager = new LowerWeightHostManager();
                break;
            default:
                throw new IllegalArgumentException("unSupport selector " + selector);
        }
        beanFactory.autowireBean(hostManager);
        return hostManager;
    }
2.7.1 Взвешенный случайный выбор

Лучше понять это, взглянув на код: просуммируйте все значения весов, а затем возьмите случайное целое число итогового результата. Случайное целое число добавляет совокупную разницу в весе ко всем исходным хостам и возвращает хост, когда он есть. меньше нуля. Если нет, возвращается случайное целое число.

Язык кода:javascript
копировать
    public HostWorker doSelect(final Collection<HostWorker> source) {

        List<HostWorker> hosts = new ArrayList<>(source);
        int size = hosts.size();
        int[] weights = new int[size];
        int totalWeight = 0;
        int index = 0;

        for (HostWorker host : hosts) {
            totalWeight += host.getHostWeight();
            weights[index] = host.getHostWeight();
            index++;
        }

        if (totalWeight > 0) {
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);

            for (int i = 0; i < size; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return hosts.get(i);
                }
            }
        }
        return hosts.get(ThreadLocalRandom.current().nextInt(size));
    }
2.7.2 Линейная нагрузка

Логика расчета веса: для расчета используйте зарегистрированное использование ЦП, использование памяти, коэффициент загрузки и время запуска.

Язык кода:javascript
копировать
        double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        long uptime = System.currentTimeMillis() - startTime;
        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
            // If the warm-up is not over, add the weight
            return calculatedWeight * Constants.WARM_UP_TIME / uptime;
        }
        return calculatedWeight;
    }

Получите узел с наименьшим весом и сбросьте вес узла до наибольшего.

Язык кода:javascript
копировать
     * select
     *
     * @param sources sources
     * @return HostWeight
     */
    @Override
    public HostWeight doSelect(Collection<HostWeight> sources) {
        double totalWeight = 0;
        double lowWeight = 0;
        HostWeight lowerNode = null;
        for (HostWeight hostWeight : sources) {
            totalWeight += hostWeight.getWeight();
            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
                lowerNode = hostWeight;
                lowWeight = hostWeight.getCurrentWeight();
            }
        }
        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
        return lowerNode;

    }
2.7.3 Плавный опрос

Этот алгоритм не очень прост для понимания, поэтому я не знаю, правильно ли я понимаю. Раньше он всегда брал первый, когда накопленный вес превышал максимальный, он начинал опрос. по весу.

Язык кода:javascript
копировать
    public HostWorker doSelect(Collection<HostWorker> source) {

        List<HostWorker> hosts = new ArrayList<>(source);
        String key = hosts.get(0).getWorkerGroup();
        ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
        if (map == null) {
            workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
            map = workGroupWeightMap.get(key);
        }

        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        HostWorker selectedHost = null;
        WeightedRoundRobin selectWeightRoundRobin = null;

        for (HostWorker host : hosts) {
            String workGroupHost = host.getWorkerGroup() + host.getAddress();
            WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
            int weight = host.getHostWeight();
            if (weight < 0) {
                weight = 0;
            }

            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                // set weight
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(workGroupHost, weightedRoundRobin);
                weightedRoundRobin = map.get(workGroupHost);
            }
            if (weight != weightedRoundRobin.getWeight()) {
                weightedRoundRobin.setWeight(weight);
            }

            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedHost = host;
                selectWeightRoundRobin = weightedRoundRobin;
            }

            totalWeight += weight;
        }

        if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                workGroupWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }

        if (selectedHost != null) {
            selectWeightRoundRobin.sel(totalWeight);
            return selectedHost;
        }

        return hosts.get(0);
    }

2.8 Служба журналов

Введена версия 2.6.2, поэтому особо объяснять не буду.

2.9 Сигнализация

Это еще не изучено. Визуальная проверка заключается в основном в фильтрации данных в соответствии с правилами, а затем вызове интерфейса службы сигнализации указанного типа для выполнения операций сигнализации, таких как электронная почта, WeChat, SMS-уведомление и т. д.

Если эта статья вам полезна, не забудьте "заглянуть" "Нравиться" "собирать" Эй, три подряд!

Впервые выпущено во всей сети в 2022 году. Модель и учебное пособие для специалистов по работе с большими данными (Шэнтянь Банзи).

Худшая эра Интернета действительно может наступить

Я учусь в университете Билибили по специальности «большие данные».

Когда мы изучаем Flink, что именно мы изучаем?

193 статьи избили Флинка, на этот сборник нужно обратить внимание

Производственная среда Flink ТОП проблем и оптимизации, Alibaba Zangjingge YYDS

Flink CDC Я уверен, что даже Иисус не сможет его удержать! Краткий перечень онлайн-проблем Flink CDC |

Когда мы изучаем Spark, что именно мы изучаем?

Среди всех модулей Spark я бы назвал SparkSQL самым сильным!

Hard Hive | Краткое содержание интервью по базовой настройке на 40 000 слов

Небольшая энциклопедия методологий и практик управления данными.

Небольшое руководство по построению портретов пользователей по системе тегов

Статья на 40 000 слов. | ClickHouseБаза&упражняться&Настройка полноэкранного анализа

【интервью&личностный рост】2021В середине года,Опыт социального рекрутинга и школьного рекрутинга

Начинается еще одно десятилетие в направлении больших данных | Завершено первое издание «Hard Series»

Написанные мной статьи о росте/интервью/карьерном росте

Что мы узнаем, изучая Hive? «Продолжение Hard Hive»

boy illustration
Учебное пособие по Jetpack Compose для начинающих, базовые элементы управления и макет
boy illustration
Код js веб-страницы, фон частицы, код спецэффектов
boy illustration
【новый! Суперподробное】Полное руководство по свойствам компонентов Figma.
boy illustration
🎉Обязательно к прочтению новичкам: полное руководство по написанию мини-программ WeChat с использованием программного обеспечения Cursor.
boy illustration
[Забавный проект Docker] VoceChat — еще одно приложение для мгновенного чата (IM)! Может быть встроен в любую веб-страницу!
boy illustration
Как реализовать переход по странице в HTML (html переходит на указанную страницу)
boy illustration
Как решить проблему зависания и низкой скорости при установке зависимостей с помощью npm. Существуют ли доступные источники npm, которые могут решить эту проблему?
boy illustration
Серия From Zero to Fun: Uni-App WeChat Payment Practice WeChat авторизует вход в систему и украшает страницу заказа, создает интерфейс заказа и инициирует запрос заказа
boy illustration
Серия uni-app: uni.navigateЧтобы передать скачок значения
boy illustration
Апплет WeChat настраивает верхнюю панель навигации и адаптируется к различным моделям.
boy illustration
JS-время конвертации
boy illustration
Обеспечьте бесперебойную работу ChromeDriver 125: советы по решению проблемы chromedriver.exe не найдены
boy illustration
Поле комментария, щелчок мышью, специальные эффекты, js-код
boy illustration
Объект массива перемещения объекта JS
boy illustration
Как открыть разрешение на позиционирование апплета WeChat_Как использовать WeChat для определения местонахождения друзей
boy illustration
Я даю вам два набора из 18 простых в использовании фонов холста Power BI, так что вам больше не придется возиться с цветами!
boy illustration
Получить текущее время в js_Как динамически отображать дату и время в js
boy illustration
Вам необходимо изучить сочетания клавиш vsCode для форматирования и организации кода, чтобы вам больше не приходилось настраивать формат вручную.
boy illustration
У ChatGPT большое обновление. Всего за 45 минут пресс-конференция показывает, что OpenAI сделал еще один шаг вперед.
boy illustration
Copilot облачной разработки — упрощение разработки
boy illustration
Микросборка xChatGPT с низким кодом, создание апплета чат-бота с искусственным интеллектом за пять шагов
boy illustration
CUDA Out of Memory: идеальное решение проблемы нехватки памяти CUDA
boy illustration
Анализ кластеризации отдельных ячеек, который должен освоить каждый&MarkerгенетическийВизуализация
boy illustration
vLLM: мощный инструмент для ускорения вывода ИИ
boy illustration
CodeGeeX: мощный инструмент генерации кода искусственного интеллекта, который можно использовать бесплатно в дополнение к второму пилоту.
boy illustration
Машинное обучение Реальный бой LightGBM + настройка параметров случайного поиска: точность 96,67%
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция без кодирования и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
LM Studio для создания локальных больших моделей
boy illustration
Как определить количество слоев и нейронов скрытых слоев нейронной сети?
boy illustration
[Отслеживание целей] Подробное объяснение ByteTrack и детали кода