Самое полное руководство по улучшению интервью с большими данными в Интернете!
Если вы обратите внимание на DolphinScheduler, вы должны иметь определенное представление о системе планирования. Я не буду вдаваться в подробности о некоторых существительных, участвующих в планировании, я сосредоточусь на определении процесса, экземпляре процесса, определении задачи и экземпляре задачи. (Концепция отсутствия рабочих мест действительно очень нова, возможно, потому, что я не хочу пересекаться с JobDetail Quartz).
Архитектурное проектирование распределенных систем в основном делится на два типа: централизованные и децентрализованные. Каждый из них имеет свои преимущества и недостатки, в зависимости от соответствующего выбора бизнеса.
Централизованная конструкция относительно проста. Роли установки узла в кластере можно разделить на два типа: главный и подчиненный, как показано ниже:
Главный: роль главного узла в основном отвечает за распределение задач и мониторинг состояния подчиненного устройства. Он может динамически распределять задачи по подчиненным, чтобы подчиненные узлы не были «заняты до смерти» или «простояли до смерти».
Есть некоторые проблемы с централизованным проектированием.
Первый момент: если с Мастером что-то пойдет не так, лидера не будет и весь кластер рухнет.
Чтобы решить эту проблему, в большинстве моделей архитектуры Master/Slave используется конструкция активного и резервного Master, которая может иметь горячий или холодный резерв, автоматическое переключение или ручное переключение, и все больше и больше новых систем начинают использовать его. Возможность автоматического выбора и переключения Мастера для повышения доступности системы.
Второй момент: если планировщик находится на ведущем устройстве, хотя он и может поддерживать разные задачи в группе обеспечения доступности баз данных, работающей на разных машинах, это приведет к перегрузке ведущего устройства. Если планировщик находится на ведомом устройстве, все задачи в DAG могут быть отправлены только на определенную машину. При наличии большого количества параллельных задач нагрузка на ведомое устройство может быть больше.
xxl-job использует этот метод проектирования, но возникают соответствующие проблемы. Если менеджер (админ) выйдет из строя, то кластер рухнет. Планировщик находится на менеджере. Менеджер отвечает за проверку и распределение всех задач. Есть риск перегрузки менеджера, что требует от разработчиков поиска. решение.
В децентрализованном дизайне обычно не существует понятия «главный/подчиненный». Все роли одинаковы и статус одинаков. Основная идея децентрализованного дизайна заключается в том, что во всей распределенной системе нет «узла», который бы отличался от других узлов. Manager», поэтому единой точки отказа не существует.
Однако, поскольку узла «менеджера» нет, каждому узлу необходимо взаимодействовать с другими узлами для получения необходимой машинной информации. Ненадежность связи распределенной системы значительно увеличивает сложность реализации вышеперечисленных функций. На самом деле по-настоящему децентрализованные распределенные системы встречаются редко.
Напротив, постоянно возникают динамические централизованные распределенные системы. В рамках этой архитектуры менеджеры в кластере выбираются динамически, а не заранее, и в случае сбоя кластера узлы кластера спонтанно проводят собрание, чтобы выбрать нового менеджера, который будет руководить работой.
Как правило, это избирательные стратегии, реализованные на основе алгоритма Рафта. Алгоритм Raft в настоящее время имеет соответствующий PR в сообществе и еще не был объединен.
Децентрализация DolphinScheduler заключается в том, что Master/Worker регистрируется в центре регистрации, при этом понимается, что Master-кластер и Worker-кластер являются бесцентровыми.
Украдя схему архитектуры системы с официального сайта, вы увидите, что система планирования имеет децентрализованный дизайн и состоит из пользовательского интерфейса, API, MasterServer, Zookeeper, WorkServer, Alert и других частей.
API: Уровень интерфейса API в основном отвечает за обработку запросов от внешнего уровня пользовательского интерфейса. Сервис предоставляет унифицированный RESTful API предоставляет услуги запросов внешнему миру. Интерфейс включает в себя создание, определение, запрос, модификацию рабочего процесса, выпуск, автономный режим, ручной запуск, остановку, паузу, возобновление, выполнение с этого узла и т. д.
MasterServer: MasterServer использует концепцию распределенного бесцентрового дизайна. MasterServer интегрирует Quartz и в основном отвечает за него. DAG Сегментация задач, мониторинг отправки задач и мониторинг состояния работоспособности других мастер-серверов и рабочих серверов. Когда служба MasterServer запускается, она регистрирует временные узлы в Zookeeper и обеспечивает отказоустойчивость, отслеживая изменения во временных узлах Zookeeper. WorkServer: WorkerServer также использует концепцию распределенного бесцентрового дизайна. WorkerServer в основном отвечает за выполнение задач и предоставление услуг журналов. Когда служба WorkerServer запускается, она регистрирует временные узлы в Zookeeper и поддерживает контрольные сигналы.
ZooKeeper: Служба ZooKeeper, узлы MasterServer и WorkerServer в системе используют ZooKeeper для управления кластером и отказоустойчивости. Кроме того, система также осуществляет мониторинг событий и распределенные блокировки на базе ZooKeeper.
Alert:Предоставление интерфейсов, связанных с сигнализацией,Интерфейс в основном включает в себя два типа хранения данных о тревогах, функции запроса и уведомления.,Поддерживает расширенные плагины сигнализации для свободного расширения конфигурации.
Отказоустойчивость делится на отказоустойчивость во время простоя службы и повторную попытку задачи, а отказоустойчивость во время простоя службы делится на две ситуации: отказоустойчивость главного устройства и отказоустойчивость рабочего;
Отказоустойчивая конструкция службы основана на механизме Watcher ZooKeeper. Принцип реализации следующий:
Мастер контролирует каталоги других мастеров и рабочих. Если событие удаления отслеживается, он будет обеспечивать отказоустойчивость экземпляра процесса или экземпляра задачи на основе конкретной бизнес-логики. Блок-схема отказоустойчивости более гуманна, чем блок-схема в. Официальный документ. Вы можете Для справки подробности следующие.
Основная блок-схема отказоустойчивости
После завершения отказоустойчивости ZooKeeper Master он будет перепланирован потоком планировщика в DolphinScheduler, будет проходить через DAG, чтобы найти «выполняющиеся» и «успешно отправленные» задачи, отслеживать состояние его экземпляров задач на предмет «выполняющихся» задач и отслеживать статус «успешно отправленных» задач. Необходимо определить, существует ли уже очередь задач. Если она существует, состояние экземпляра задачи также будет отслеживаться. Если она не существует, экземпляр задачи будет отслеживаться. быть поданы повторно.
Блок-схема отказоустойчивости рабочих
Как только поток главного планировщика обнаруживает, что экземпляр задачи находится в состоянии «требуется отказоустойчивость», он берет на себя задачу и повторно отправляет ее. Обратите внимание, что из-за «дрожания сети» узел может потерять контрольное сообщение с ZooKeeper за короткий период времени, что приведет к событию удаления узла.
В этой ситуации мы используем самый простой метод, который заключается в прямой остановке службы Master или Worker, когда у узла истекает тайм-аут соединения с ZooKeeper.
Здесь мы должны сначала различать концепции повтора сбоя задачи, восстановления сбоя процесса и повторного запуска сбоя процесса:
Далее перейдем к делу. Мы делим узлы задач в рабочем процессе на два типа.
Каждый бизнес-узел может настроить количество неудачных попыток. При сбое узла задачи он будет автоматически повторять попытки до тех пор, пока не будет успешным или не превысит настроенное количество повторов. Логические узлы не поддерживают повторную попытку в случае сбоя. Однако задачи в логических узлах поддерживают повтор.
Если задача в рабочем процессе завершается сбоем и достигает максимального количества повторов, рабочий процесс завершится сбоем и остановится. Неудавшийся рабочий процесс можно запустить повторно или восстановить процесс.
Поскольку Web (UI) и Worker не обязательно находятся на одном компьютере, просмотр журналов не может быть тем же самым, что и запрос локальных файлов.
Есть два варианта:
Чтобы сделать DolphinScheduler максимально облегченным, был выбран RPC для реализации удаленного доступа к информации журнала. Конкретные методы работы с кодом см. в главе 2.8.
Основная цель этой главы — поочередно представить функции, описанные в первой главе, на уровне кода. Установка системы здесь не рассматривается. Пожалуйста, изучите установку и работу самостоятельно.
dolphinscheduler-common common.properties
#Локальный рабочий каталог, используемый для хранения временных файлов
data.basedir.path=/tmp/dolphinscheduler
#Тип хранения файлов ресурсов: HDFS,S3,NONE
resource.storage.type=NONE
#Путь хранения файлов ресурсов
resource.upload.path=/dolphinscheduler
#hadoopВключить ли разрешения Kerberos
hadoop.security.authentication.startup.state=false
#каталог конфигурации Kerberos
java.security.krb5.conf.path=/opt/krb5.conf
#kerberosloginuser
login.user.keytab.username=hdfs-mycluster@ESZ.COM
#kerberosloginuserkeytab
login.user.keytab.path=/opt/hdfs.headless.keytab
Срок действия #kerberos, целое число, единица измерения — час
kerberos.expire.time=2
# Если тип хранилища — HDFS, вам необходимо настроить пользователя с соответствующими разрешениями на операции.
hdfs.root.user=hdfs
#Адрес запроса Если ресурс.storage.type=S3, значение аналогично: s3a://dolphinscheduler. Если ресурс.хранилище.тип=HDFS, если hadoop Настроен HA,нуждатьсякопироватьcore-site.xml и hdfs-site.xml файл в каталог conf
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
#yarn resourcemanager адрес, еслиresourcemanager включает HA,Введите IP-адреса HA (через запятую).,если менеджер ресурсов представляет собой один узел, Это значение можно оставить пустым
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
#еслиresourcemanager включает HAили не используетсяresourcemanager,Просто сохраните значение по умолчанию. еслиresourcemanager — это один узел,你нуждаться将ds1 Настроено как имя хоста, соответствующее ресурсному менеджеру.
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# datasource encryption enable
datasource.encryption.enable=false
# datasource encryption salt
datasource.encryption.salt=!@#$%^&*
# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
support.hive.oneSession=false
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
#Находится ли он в режиме разработки
development.state=false
# rpc port
alert.rpc.port=50052
# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080
dolphinscheduler-api application.yaml
server:
port: 12345
servlet:
session:
timeout: 120m
context-path: /dolphinscheduler/
compression:
enabled: true
mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
jetty:
max-http-form-post-size: 5000000
spring:
application:
name: api-server
banner:
charset: UTF-8
jackson:
time-zone: UTC
date-format: "yyyy-MM-dd HH:mm:ss"
servlet:
multipart:
max-file-size: 1024MB
max-request-size: 1024MB
messages:
basename: i18n/messages
datasource:
# driver-class-name: org.postgresql.Driver
# url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
username: root
password: root
hikari:
connection-test-query: select 1
minimum-idle: 5
auto-commit: true
validation-timeout: 3000
pool-name: DolphinScheduler
maximum-pool-size: 50
connection-timeout: 30000
idle-timeout: 600000
leak-detection-threshold: 0
initialization-fail-timeout: 1
quartz:
auto-startup: false
job-store-type: jdbc
jdbc:
initialize-schema: never
properties:
org.quartz.threadPool:threadPriority: 5
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.scheduler.instanceId: AUTO
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.acquireTriggersWithinLock: true
org.quartz.scheduler.instanceName: DolphinScheduler
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.jobStore.useProperties: false
org.quartz.threadPool.makeThreadsDaemons: true
org.quartz.threadPool.threadCount: 25
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.scheduler.makeSchedulerThreadDaemon: true
# org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.clusterCheckinInterval: 5000
management:
endpoints:
web:
exposure:
include: '*'
metrics:
tags:
application: ${spring.application.name}
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
# connect-string: localhost:2181
connect-string: 10.255.158.70:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
audit:
enabled: false
metrics:
enabled: true
python-gateway:
# Weather enable python gateway server or not. The default value is true.
enabled: true
# The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
# between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
gateway-server-address: 0.0.0.0
# The port of Python gateway server start. Define which port you could connect to Python gateway server from
# Python API side.
gateway-server-port: 25333
# The address of Python callback client.
python-address: 127.0.0.1
# The port of Python callback client.
python-port: 25334
# Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
# and socket server would never close even though no requests accept
connect-timeout: 0
# Close each active connection of socket server if python program not active after x milliseconds. Define value is
# (0 = infinite), and socket server would never close even though no requests accept
read-timeout: 0
# Override by profile
---
spring:
config:
activate:
on-profile: mysql
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
quartz:
properties:
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
dolphinscheduler-master application.yaml
spring:
banner:
charset: UTF-8
application:
name: master-server
jackson:
time-zone: UTC
date-format: "yyyy-MM-dd HH:mm:ss"
cache:
# default enable cache, you can disable by `type: none`
type: none
cache-names:
- tenant
- user
- processDefinition
- processTaskRelation
- taskDefinition
caffeine:
spec: maximumSize=100,expireAfterWrite=300s,recordStats
datasource:
#driver-class-name: org.postgresql.Driver
#url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
username: root
password:
hikari:
connection-test-query: select 1
minimum-idle: 5
auto-commit: true
validation-timeout: 3000
pool-name: DolphinScheduler
maximum-pool-size: 50
connection-timeout: 30000
idle-timeout: 600000
leak-detection-threshold: 0
initialization-fail-timeout: 1
quartz:
job-store-type: jdbc
jdbc:
initialize-schema: never
properties:
org.quartz.threadPool:threadPriority: 5
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.scheduler.instanceId: AUTO
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.acquireTriggersWithinLock: true
org.quartz.scheduler.instanceName: DolphinScheduler
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.jobStore.useProperties: false
org.quartz.threadPool.makeThreadsDaemons: true
org.quartz.threadPool.threadCount: 25
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.scheduler.makeSchedulerThreadDaemon: true
# org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.clusterCheckinInterval: 5000
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
# connect-string: localhost:2181
connect-string: 10.255.158.70:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
master:
listen-port: 5678
# master fetch command num
fetch-command-num: 10
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight
# master heartbeat interval, the unit is second
heartbeat-interval: 10
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval, the unit is millisecond
task-commit-interval: 1000
state-wheel-interval: 5
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
# failover interval, the unit is minute
failover-interval: 10
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
server:
port: 5679
management:
endpoints:
web:
exposure:
include: '*'
metrics:
tags:
application: ${spring.application.name}
metrics:
enabled: true
# Override by profile
---
spring:
config:
activate:
on-profile: mysql
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
quartz:
properties:
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
dolphinscheduler-worker application.yaml
spring:
banner:
charset: UTF-8
application:
name: worker-server
jackson:
time-zone: UTC
date-format: "yyyy-MM-dd HH:mm:ss"
datasource:
#driver-class-name: org.postgresql.Driver
#url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
username: root
#password: root
password:
hikari:
connection-test-query: select 1
minimum-idle: 5
auto-commit: true
validation-timeout: 3000
pool-name: DolphinScheduler
maximum-pool-size: 50
connection-timeout: 30000
idle-timeout: 600000
leak-detection-threshold: 0
initialization-fail-timeout: 1
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
# connect-string: localhost:2181
connect-string: 10.255.158.70:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
worker:
# worker listener port
listen-port: 1234
# worker execute thread number to limit task instances in parallel
exec-threads: 100
# worker heartbeat interval, the unit is second
heartbeat-interval: 10
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker tenant auto create
tenant-auto-create: true
# worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
reserved-memory: 0.3
# default worker groups separated by comma, like 'worker.groups=default,test'
groups:
- default
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052
server:
port: 1235
management:
endpoints:
web:
exposure:
include: '*'
metrics:
tags:
application: ${spring.application.name}
metrics:
enabled: true
Вам не нужно обращать внимание на другие бизнес-интерфейсы. Вам нужно только обратить внимание на наиболее важный интерфейс онлайн-функций. Этот интерфейс может генерировать все коды, связанные с планированием задач.
интерфейс: /dolphinscheduler/projects/{projectCode}/schedules/{id}/online;
Этот интерфейс отправит определенный процесс в систему планирования Quartz;
public Map<String, Object> setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
// check sub process definition release state
List<Long> subProcessDefineCodes = new ArrayList<>();
processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
if (!subProcessDefineCodes.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
return result;
}
}
}
}
}
// check master server exists
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
}
// set status
scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try {
switch (scheduleStatus) {
case ONLINE:
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
case OFFLINE:
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
default:
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString(), e);
}
putMsg(result, Status.SUCCESS);
return result;
}
public void setSchedule(int projectId, Schedule schedule) {
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
}
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
String jobName = this.buildJobName(schedule.getId());
String jobGroupName = this.buildJobGroupName(projectId);
Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
String cronExpression = schedule.getCrontab();
String timezoneId = schedule.getTimezoneId();
/**
* transform from server default timezone to schedule timezone
* e.g. server default timezone is `UTC`
* user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
* api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
* so when add job to quartz, it should recover by transform timezone
*/
Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
lock.writeLock().lock();
try {
JobKey jobKey = new JobKey(jobName, jobGroupName);
JobDetail jobDetail;
//add a task (if this task already exists, return this task directly)
if (scheduler.checkExists(jobKey)) {
jobDetail = scheduler.getJobDetail(jobKey);
jobDetail.getJobDataMap().putAll(jobDataMap);
} else {
jobDetail = newJob(clazz).withIdentity(jobKey).build();
jobDetail.getJobDataMap().putAll(jobDataMap);
scheduler.addJob(jobDetail, false, true);
logger.info("Add job, job name: {}, group name: {}",
jobName, jobGroupName);
}
TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
/*
* Instructs the Scheduler that upon a mis-fire
* situation, the CronTrigger wants to have it's
* next-fire-time updated to the next time in the schedule after the
* current time (taking into account any associated Calendar),
* but it does not want to be fired now.
*/
CronTrigger cronTrigger = newTrigger()
.withIdentity(triggerKey)
.startAt(startDate)
.endAt(endDate)
.withSchedule(
cronSchedule(cronExpression)
.withMisfireHandlingInstructionDoNothing()
.inTimeZone(DateUtils.getTimezone(timezoneId))
)
.forJob(jobDetail).build();
if (scheduler.checkExists(triggerKey)) {
// updateProcessInstance scheduler trigger when scheduler cycle changes
CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
String oldCronExpression = oldCronTrigger.getCronExpression();
if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
// reschedule job trigger
scheduler.rescheduleJob(triggerKey, cronTrigger);
logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
jobName, jobGroupName, cronExpression, startDate, endDate);
}
} else {
scheduler.scheduleJob(cronTrigger);
logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
jobName, jobGroupName, cronExpression, startDate, endDate);
}
} catch (Exception e) {
throw new ServiceException("add job failed", e);
} finally {
lock.writeLock().unlock();
}
}
Структура Quartz в основном включает в себя следующие части:
После создания планировщика он может добавлять, удалять и перечислять задания и триггеры, а также выполнять другие операции, связанные с планированием (например, приостанавливать триггеры). Но планировщик фактически активирует триггер (то есть выполнит задание) только после вызова метода start().
Основной принцип Quartz заключается в использовании планировщика для планирования бизнес-объектов пользовательских задач, реализуемых спецификациями интерфейса задания установки, определенными JobDetail и Trigger, для завершения планирования задач. Основная логика следующая:
Временная диаграмма кода выглядит следующим образом:
Основное содержание заключается в инициализации планировщика контейнера планирования задач, а также пула потоков, необходимого контейнеру, объекта взаимодействия с данными JobStore и потока обработки задач QuartzSchedulerThread для обработки конкретного класса бизнес-реализации интерфейса задания.
Бизнес-класс DolphinScheduler — ProcessScheduleJob, и его основная функция — запись данных в общую таблицу на основе информации планирования.
Что следует отметить:
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
throws JobPersistenceException {
if (failedInstances.size() > 0) {
long recoverIds = System.currentTimeMillis();
logWarnIfNonZero(failedInstances.size(),
"ClusterManager: detected " + failedInstances.size()
+ " failed or restarted instances.");
try {
for (SchedulerStateRecord rec : failedInstances) {
getLog().info(
"ClusterManager: Scanning for instance \""
+ rec.getSchedulerInstanceId()
+ "\"'s failed in-progress jobs.");
List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
.selectInstancesFiredTriggerRecords(conn,
rec.getSchedulerInstanceId());
int acquiredCount = 0;
int recoveredCount = 0;
int otherCount = 0;
Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
for (FiredTriggerRecord ftRec : firedTriggerRecs) {
TriggerKey tKey = ftRec.getTriggerKey();
JobKey jKey = ftRec.getJobKey();
triggerKeys.add(tKey);
// release blocked triggers..
if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_WAITING, STATE_BLOCKED);
} else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
// release acquired triggers..
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
getDelegate().updateTriggerStateFromOtherState(
conn, tKey, STATE_WAITING,
STATE_ACQUIRED);
acquiredCount++;
} else if (ftRec.isJobRequestsRecovery()) {
// handle jobs marked for recovery that were not fully
// executed..
if (jobExists(conn, jKey)) {
@SuppressWarnings("deprecation")
SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
"recover_"
+ rec.getSchedulerInstanceId()
+ "_"
+ String.valueOf(recoverIds++),
Scheduler.DEFAULT_RECOVERY_GROUP,
new Date(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobName(jKey.getName());
rcvryTrig.setJobGroup(jKey.getGroup());
rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
rcvryTrig.setPriority(ftRec.getPriority());
JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobDataMap(jd);
rcvryTrig.computeFirstFireTime(null);
storeTrigger(conn, rcvryTrig, null, false,
STATE_WAITING, false, true);
recoveredCount++;
} else {
getLog()
.warn(
"ClusterManager: failed job '"
+ jKey
+ "' no longer exists, cannot schedule recovery.");
otherCount++;
}
} else {
otherCount++;
}
// free up stateful job's triggers
if (ftRec.isJobDisallowsConcurrentExecution()) {
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_WAITING, STATE_BLOCKED);
getDelegate()
.updateTriggerStatesForJobFromOtherState(
conn, jKey,
STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
}
getDelegate().deleteFiredTriggers(conn,
rec.getSchedulerInstanceId());
// Check if any of the fired triggers we just deleted were the last fired trigger
// records of a COMPLETE trigger.
int completeCount = 0;
for (TriggerKey triggerKey : triggerKeys) {
if (getDelegate().selectTriggerState(conn, triggerKey).
equals(STATE_COMPLETE)) {
List<FiredTriggerRecord> firedTriggers =
getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
if (firedTriggers.isEmpty()) {
if (removeTrigger(conn, triggerKey)) {
completeCount++;
}
}
}
}
logWarnIfNonZero(acquiredCount,
"ClusterManager: ......Freed " + acquiredCount
+ " acquired trigger(s).");
logWarnIfNonZero(completeCount,
"ClusterManager: ......Deleted " + completeCount
+ " complete triggers(s).");
logWarnIfNonZero(recoveredCount,
"ClusterManager: ......Scheduled " + recoveredCount
+ " recoverable job(s) for recovery.");
logWarnIfNonZero(otherCount,
"ClusterManager: ......Cleaned-up " + otherCount
+ " other failed job(s).");
if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
getDelegate().deleteSchedulerState(conn,
rec.getSchedulerInstanceId());
}
}
} catch (Throwable e) {
throw new JobPersistenceException("Failure recovering jobs: "
+ e.getMessage(), e);
}
}
}
Ключевые понятия:
Связанные с кварцем:
Связанные с DolphinScheduler:
Код отказоустойчивости главного узла следующий. Бизнес-пояснения см. в разделе 1.5.1 Объяснение отказоустойчивости главного узла:
private void failoverMasterWithLock(String masterHost) {
String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
try {
registryClient.getLock(failoverPath);
this.failoverMaster(masterHost);
} catch (Exception e) {
LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
} finally {
registryClient.releaseLock(failoverPath);
}
}
/**
* failover master
* <p>
* failover process instance and associated task instance
*Экземпляр процесса аварийного переключения и связанный экземпляр задачи.
* @param masterHost master host
*/
private void failoverMaster(String masterHost) {
if (StringUtils.isEmpty(masterHost)) {
return;
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, workerServers);
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
LOGGER.info("failover process instance id: {}", processInstance.getId());
//updateProcessInstance host is null and insert into command
processInstance.setHost(Constants.NULL);
processService.processNeedFailoverProcessInstances(processInstance);
}
LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}
На самом деле, использование распределенных блокировок Zookeer здесь не является ни точным, ни точным. Почему вы так говорите? Потому что Slot рассчитывается по CommondId по модулю длины основного списка, а обновление длины основного списка контролируется распределенными блокировками Zookeer. Данные планирования сканирования главного узла контролируются через слот.
Конкретный код выглядит следующим образом:
Обновление слота
private void updateMasterNodes() {
MASTER_SLOT = 0;
MASTER_SIZE = 0;
this.masterNodes.clear();
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
try {
registryClient.getLock(nodeLock);
Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
syncMasterNodes(currentNodes, masterNodes);
} catch (Exception e) {
logger.error("update master nodes error", e);
} finally {
registryClient.releaseLock(nodeLock);
}
}
/**
* sync master nodes
*
* @param nodes master nodes
*/
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
int index = masterPriorityQueue.getIndex(addr);
if (index >= 0) {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
logger.warn("current addr:{} is not in active master list", addr);
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
} finally {
masterLock.unlock();
}
}
Слот-приложение
/**
* 1. get command by slot
* 2. donot handle command if slot is empty
*/
/** * 1. Получить команду через слот * 2. если Слот пуст,тогда команда не обрабатывается */
private void scheduleProcess() throws Exception {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
return;
}
for (ProcessInstance processInstance : processInstances) {
if (processInstance == null) {
continue;
}
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
}
}
private List<Command> findCommands() {
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
if (Stopper.isRunning()) {
int thisMasterSlot = ServerNodeManager.getSlot();
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
}
}
return result;
}
@Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
}
return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
}
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
order by process_instance_priority, id asc
limit #{limit} offset #{offset}
</select>
##Проверка слота
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {
try {
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
logger.error("handle command error ", e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
}
});
}
try {
// make sure to finish handling command each time before next scan
latch.await();
} catch (InterruptedException e) {
logger.error("countDownLatch await error ", e);
}
return processInstances;
}
private SlotCheckState slotCheck(Command command) {
int slot = ServerNodeManager.getSlot();
int masterSize = ServerNodeManager.getMasterSize();
SlotCheckState state;
if (masterSize <= 0) {
state = SlotCheckState.CHANGE;
} else if (command.getId() % masterSize == slot) {
state = SlotCheckState.PASS;
} else {
state = SlotCheckState.INJECT;
}
return state;
}
Код слишком громоздкий, поэтому я не буду вставлять сюда код по одному, чтобы объяснить функции каждого класса. Будет понятнее прочитать код самостоятельно.
Workerузел Временная диаграмма кода выглядит следующим образом:
Код слишком громоздкий, поэтому я не буду вставлять сюда код по одному, чтобы объяснить функции каждого класса. Будет понятнее прочитать код самостоятельно.
Поскольку связь RPC между узлами и службами приложений реализована на основе Netty, здесь не будут подробно объясняться знания, связанные с Netty. В текущей главе рассматривается только проектирование и реализация режима взаимодействия между Master и Worker.
Общий дизайн выглядит следующим образом
Взаимодействие бизнес-логики между Master и Worker основано на сервере Netty и клиенте для реализации связи Rpc. Когда Master и Worker запускаются, они регистрируют информацию своего сервера Netty на соответствующем узле ZK, и задачи Master будут следующими. Распределено, когда выполняются бизнес-операции, такие как уничтожение потоков и задач, извлеките Worker на ZK. Информация об узле, выберите узел в соответствии со стратегией балансировки нагрузки и создайте клиент Netty для связи с сервером Netty работника. После того, как работник получит запрос RPC от мастера, он будет кэшировать информацию о канале и одновременно обрабатывать соответствующий бизнес. время поток обратного вызова обратного вызова получит кэшированный канал. Выполните операцию обратного вызова, образуя таким образом замкнутый цикл.
Выполнение и уничтожение задач, а также обработка статуса обратного вызова и другие операции выполняются через процессор процессора, связанный с клиентом и сервером Netty.
Конкретный код основной части выглядит следующим образом.:
При запуске Master он инициализирует Nettyserver, зарегистрирует соответствующий обработчик запросов в NettyHandler и запустит:
@PostConstruct
public void run() throws SchedulerException {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
// self tolerant
this.masterRegistryClient.init();
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
this.masterSchedulerService.init();
this.masterSchedulerService.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
this.scheduler.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
}
}
Когда NettyExecutorManager мастера инициализируется, NettyRemotingClient также будет инициализирован, и будет зарегистрирован процессор, который обрабатывает запросы обратного вызова Worker. Реальная привязка порта происходит после получения порта исполнителя:
/**
* constructor
*/
public NettyExecutorManager() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
##Зарегистрируйте процессор, который обрабатывает обратные вызовы рабочих процессов
@PostConstruct
public void init() {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
}
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
}
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
this.start();
}
/**
* start
*/
private void start() {
this.bootstrap
.group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyDecoder(), clientHandler, encoder);
}
});
this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
isStarted.compareAndSet(false, true);
}
Код распределения задач следующий:
/**
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
/**
* host select
*/
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
+ "current task needs worker group %s to execute",
context.getCommand(),context.getWorkerGroup()));
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
/**
* task execute
*/
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
/**
* all nodes
*/
Set<String> allNodes = getAllNodes(context);
/**
* fail nodes
*/
Set<String> failNodeSet = new HashSet<>();
/**
* build command accord executeContext
*/
Command command = context.getCommand();
/**
* execute task host
*/
Host host = context.getHost();
boolean success = false;
while (!success) {
try {
doExecute(host, command);
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
return success;
}
/**
* execute logic
*
* @param host host
* @param command command
* @throws ExecuteException if error throws ExecuteException
*/
public void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry count,default retry 3
*/
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
ThreadUtils.sleep(100);
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
/**
* send task
*
* @param host host
* @param command command
*/
public void send(final Host host, final Command command) throws RemotingException {
Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
try {
ChannelFuture future = channel.writeAndFlush(command).await();
if (future.isSuccess()) {
logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
} else {
String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
logger.error(msg, future.cause());
throw new RemotingException(msg);
}
} catch (Exception e) {
logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
}
}
Конкретный код рабочей части выглядит следующим образом::
Точно так же Woker при запуске инициализирует NettyServer, зарегистрирует соответствующий процессор и запустится:
/**
* worker server run
*/
@PostConstruct
public void run() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
// worker registry
try {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// task execute manager
this.workerManagerThread.start();
// retry report task status
this.retryReportTaskStatusThread.start();
/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
Когда объект потока обратного вызова инициализируется, включенный клиент Nettyremotingclient будет инициализирован вместе, и соответствующий бизнес-процессор будет зарегистрирован:
public TaskCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}
Поток обратного вызова будет связываться с клиентом Мастера через Chanel, кэшированный в других исполнителях:
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param command command
*/
public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// remove(taskInstanceId);
return;
}
}
});
}
}
к Служба Если взять журналы в качестве примера, внешний интерфейс запускает интерфейс для запроса журналов, взаимодействует с базой данных через параметры для получения информации NettyServer от Master, а затем создает клиент Netty для связи с Master для получения журналов и их возврата. Конкретный код выглядит следующим образом:
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
* view log
*
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
if (StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance,skipLineNum,limit);
result.setData(log);
return result;
}
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
Host host = Host.of(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
host.getPort());
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
String head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
host,
Constants.SYSTEM_LINE_SEPARATOR);
log.append(head);
}
log.append(logClient
.rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));
return log.toString();
}
* roll view log
*
* @param host host
* @param port port
* @param path path
* @param skipLineNum skip line number
* @param limit limit
* @return log content
*/
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
} catch (Exception e) {
logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
* sync send
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
*/
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
final long opaque = command.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(future -> {
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
logger.error("send command {} to host {} failed", command, host);
});
/*
* sync wait for result
*/
Command result = responseFuture.waitResponse();
if (result == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
return result;
}
Nettyclient инициализируется одновременно с инициализацией бизнес-объекта журнала:
* construct client
*/
public LogClientService() {
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
}
DolphinScheduler предоставляет три алгоритма балансировки нагрузки, когда Master выбирает исполнителей, и все алгоритмы используют веса узлов: взвешенный случайный (случайный), плавный опрос (циклический перебор) и линейная нагрузка (меньший вес). Используйте файл конфигурации, чтобы указать, какая стратегия балансировки нагрузки используется. Конфигурацией по умолчанию является стратегия веса: селектор хоста: low_weight.
public HostManager hostManager() {
HostSelector selector = masterConfig.getHostSelector();
HostManager hostManager;
switch (selector) {
case RANDOM:
hostManager = new RandomHostManager();
break;
case ROUND_ROBIN:
hostManager = new RoundRobinHostManager();
break;
case LOWER_WEIGHT:
hostManager = new LowerWeightHostManager();
break;
default:
throw new IllegalArgumentException("unSupport selector " + selector);
}
beanFactory.autowireBean(hostManager);
return hostManager;
}
Лучше понять это, взглянув на код: просуммируйте все значения весов, а затем возьмите случайное целое число итогового результата. Случайное целое число добавляет совокупную разницу в весе ко всем исходным хостам и возвращает хост, когда он есть. меньше нуля. Если нет, возвращается случайное целое число.
public HostWorker doSelect(final Collection<HostWorker> source) {
List<HostWorker> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (HostWorker host : hosts) {
totalWeight += host.getHostWeight();
weights[index] = host.getHostWeight();
index++;
}
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < size; i++) {
offset -= weights[i];
if (offset < 0) {
return hosts.get(i);
}
}
}
return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
Логика расчета веса: для расчета используйте зарегистрированное использование ЦП, использование памяти, коэффициент загрузки и время запуска.
double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
// If the warm-up is not over, add the weight
return calculatedWeight * Constants.WARM_UP_TIME / uptime;
}
return calculatedWeight;
}
Получите узел с наименьшим весом и сбросьте вес узла до наибольшего.
* select
*
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
Этот алгоритм не очень прост для понимания, поэтому я не знаю, правильно ли я понимаю. Раньше он всегда брал первый, когда накопленный вес превышал максимальный, он начинал опрос. по весу.
public HostWorker doSelect(Collection<HostWorker> source) {
List<HostWorker> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkerGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = workGroupWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
HostWorker selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (HostWorker host : hosts) {
String workGroupHost = host.getWorkerGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getHostWeight();
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// set weight
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(workGroupHost, weightedRoundRobin);
weightedRoundRobin = map.get(workGroupHost);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedHost = host;
selectWeightRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
workGroupWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedHost != null) {
selectWeightRoundRobin.sel(totalWeight);
return selectedHost;
}
return hosts.get(0);
}
Введена версия 2.6.2, поэтому особо объяснять не буду.
Это еще не изучено. Визуальная проверка заключается в основном в фильтрации данных в соответствии с правилами, а затем вызове интерфейса службы сигнализации указанного типа для выполнения операций сигнализации, таких как электронная почта, WeChat, SMS-уведомление и т. д.
Если эта статья вам полезна, не забудьте "заглянуть" "Нравиться" "собирать" Эй, три подряд!
Худшая эра Интернета действительно может наступить
Я учусь в университете Билибили по специальности «большие данные».
Когда мы изучаем Flink, что именно мы изучаем?
193 статьи избили Флинка, на этот сборник нужно обратить внимание
Производственная среда Flink ТОП проблем и оптимизации, Alibaba Zangjingge YYDS
Когда мы изучаем Spark, что именно мы изучаем?
Среди всех модулей Spark я бы назвал SparkSQL самым сильным!
Hard Hive | Краткое содержание интервью по базовой настройке на 40 000 слов
Небольшая энциклопедия методологий и практик управления данными.
Небольшое руководство по построению портретов пользователей по системе тегов
Статья на 40 000 слов. | ClickHouseБаза&упражняться&Настройка полноэкранного анализа
【интервью&личностный рост】2021В середине года,Опыт социального рекрутинга и школьного рекрутинга