Как упоминалось выше, общие шаги по использованию Airflow для планирования задач следующие:
Вышеупомянутый файл Python представляет собой сценарий Python Airflow, который использует код для указания структуры группы обеспечения доступности баз данных.
Давайте возьмем планирование и выполнение команд оболочки в качестве примера, чтобы объяснить использование воздушного потока.
# импортировать DAG Объект, вам нужно позже создать экземпляр объекта DAG.
from airflow import DAG
# импортироватьBashOperator Операторы, нам нужно использовать этот объект для выполнения процесса
from airflow.operators.bash import BashOperator
Примечание. Приведенный выше код можно создать в инструменте разработки, но пакет Airflow необходимо импортировать и установить в используемой среде Python3.7.
D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
from datetime import datetime, timedelta
# Определите некоторые параметры в default_args, которые можно использовать при создании экземпляра DAG, используя Python. dic определение формата
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2022, 3, 25), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
Уведомление:
Первый способ:
with DAG("my_dag_name") as dag:
op=XXOperator(task_id="task")
Второй метод (использованный выше):
my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)
Третий способ:
@dag(start_date=days_ago(2))
def generate_dag():
op = XXOperator(task_id="task")
dag = generate_dag()
Вы можете обратиться к:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperatorПроверятьbaseopartorБольше параметров в。
Вы можете обратиться к:
http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html
Просмотрите описание параметра DAG или можете напрямую щелкнуть DAG в инструменте разработки, чтобы ввести исходный код и узнать, каковы соответствующие параметры.
При создании экземпляра оператора создается задача Task. Процесс создания экземпляра объекта из оператора называется конструктором. Каждый конструктор имеет «task_id» в качестве уникального идентификатора задачи.
Далее мы определяем три Оператора, то есть три Задачи. Каждый идентификатор задачи не может повторяться.
# operator Поддерживает несколько типов, используется здесь BashOperator
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='middle',
bash_command='echo "run middle task"',
dag=dag
)
last = BashOperator(
task_id='last',
bash_command='echo "run last task"',
dag=dag,
retries=3
)
Уведомление:
#использовать set_upstream、set_downstream Установленные зависимости не могут отображаться, иначе будет выдано сообщение об ошибке.
# middle.set_upstream(first) # middle будет выполнено после завершения первого выполнения
# last.set_upstream(middle) # last будет внутри Выполняется после завершения среднего выполнения
#Вы также можете использовать символы смещения для установки зависимостей
first >> middle >>last # first Выполнить первым, средним вторым, последним последним
# first >> [middle,last] # первый выполняется первый, средний ,последнее параллельное выполнение
Уведомление: При выполнении скрипта,если вDAGнайден водинкольцевая ссылка(Например:A->B->C-A)выдаст исключение。БолееDAG taskЗависимости можно найти на официальном сайте.:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies
На данный момент конфигурация Python выглядит следующим образом:
# импортировать DAG Объект, вам нужно позже создать экземпляр объекта DAG.
from airflow import DAG
# импортироватьBashOperator Операторы, нам нужно использовать этот объект для выполнения процесса
from airflow.example_dags.example_bash_operator import dag
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# Определите некоторые параметры в default_args, которые можно использовать при создании экземпляра DAG, используя Python. dic определение формата
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 4), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
# operator Поддерживает несколько типов, используется здесь BashOperator
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='middle',
bash_command='echo "run middle task"',
dag=dag
)
last = BashOperator(
task_id='last',
bash_command='echo "run last task"',
dag=dag,
retries=3
)
#использовать set_upstream、set_downstream Установленные зависимости не могут отображаться, иначе будет выдано сообщение об ошибке.
# middle.set_upstream(first) # middle будет выполнено после завершения первого выполнения
# last.set_upstream(middle) # last будет внутри Выполняется после завершения среднего выполнения
#Вы также можете использовать символы смещения для установки зависимостей
first >> middle >>last # first Выполнить первым, средним вторым, последним последним
# first >> [middle,last] # первый выполняется первый, средний ,последнее параллельное выполнение
Будет ли вышеизложенноеpythonФайл конфигурации загружен наКаталог AIRFLOW_HOME/dags, по умолчаниюAIRFLOW_HOMEЧтобы установить узел“/root/airflow”Оглавление,Каталог dags в текущем каталоге необходимо создать вручную.
«ps aux|grep webserver» и «ps aux|grep Scheduler» находят соответствующий процесс воздушного потока, завершают его и перезапускают Airflow. После перезапуска вы увидите соответствующий идентификатор DAG «myairflow_execute_bash» в веб-интерфейсе airflow.
Выполните следующие действия, чтобы выполнить DAG. Сначала откройте рабочий процесс, а затем «Запустите DAG». После этого вы увидите, что задача выполнена успешно.
Посмотреть журнал выполнения задач:
В Airflow планировщик запускает DAG на основе «start_date» и «schedule_interval», указанных в файле DAG. В частности, важно отметить, что планировщик Airflow запускает выполнение DAG в конце запланированного периода времени, а не запускает DAG в начале, например:
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2022, 3, 25), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
Настроенная выше группа обеспечения доступности баз данных запланирована на 24 марта 2022 г. по всемирному координированному времени (UTC) и выполняется каждый день. Конкретное время работы этой группы обеспечения доступности баз данных следующее:
Автоматически планировать дату выполнения DAG | Автоматически планировать фактическое время запуска DAG. |
---|---|
2022-03-24,00:00:00+00:00 | 2022-03-25,00:00:00+00:00 |
2022-03-25,00:00:00+00:00 | 2022-03-26,00:00:00+00:00 |
2022-03-26,00:00:00+00:00 | 2022-03-27,00:00:00+00:00 |
2022-03-27,00:00:00+00:00 | 2022-03-28,00:00:00+00:00 |
2022-03-28,00:00:00+00:00 | 2022-03-29,00:00:00+00:00 |
... ... | ... ... |
Первые данные в приведенной выше таблице используются в качестве примера для объяснения. Обычное расписание Airflow — 00:00:00 каждый день. Предположим, что дата дня — 2022-03-24. достигает 2022-03-24 00:00:00. Он будет выполнен, и период планирования измененного времени планирования будет 2022-03-24 00:00:00 ~ 2022-03-25. 00:00:00 , в Airflow выполнение фактически запускается в конце цикла планирования, то есть время выполнения, автоматически запускаемое в 2022-03-24 00:00:00, равно 2022-03-25 00: 00:00.
Как показано на рисунке ниже, в airflow «execution_date» — это не фактическое время выполнения, а временная отметка начала цикла планирования. Например: дата_выполнения — 04.09.2021, 00:00:00. Фактическое время автоматического планирования группы обеспечения доступности баз данных — 05.09.2021, 00:00:00. Конечно, в дополнение к автоматическому планированию мы также можем вручную запустить выполнение DAG. Чтобы определить, запланирован ли запуск DAG (автоматическое планирование) или запущен вручную, вы можете просмотреть «Тип запуска».
В плане работы Airflow важной концепцией является догонялка. После реализации конкретной логики DAG, если для догона установлено значение True (по умолчанию — True), Airflow будет «заполнять» все прошлые запуски DAG. Если для догона установлено значение False, Airflow выполнит запуск DAG, начиная с момента, предшествующего последнему времени запуска DAG, игнорируя все предыдущие записи.
Например: теперь группа обеспечения доступности баз данных выполняется каждые 1 минуту, время начала планирования — 01.01.2001, а текущая дата — 01.10.2021, 15:23:21. начало с 2001-01. Текущая группа обеспечения доступности баз данных будет запускаться каждую минуту, начиная с -01 00:00:00. Если для параметра catchup установлено значение False, DAG будет выполнять запуск DAG с 15:22:20 2021-10-01 (момент перед текущим 15:23:21 2021-10-01).
Пример: есть три командные задачи оболочки: первая, вторая и третья. Они планируются по порядку и выполняются каждую минуту. Первое время выполнения — 01.01.2000.
Установите для параметра catchup значение True (по умолчанию), и конфигурация Python DAG будет следующей:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2001, 1, 1), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'catchup_test1 ', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1), # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
catchup=True # осуществлятьDAGчас,начнетсявремя До сих пор всеосуществлятьзадачиосуществлять,По умолчаниюTrue
)
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='second',
bash_command='echo "run second task"',
dag=dag
)
last = BashOperator(
task_id='third',
bash_command='echo "run third task"',
dag=dag,
retries=3
)
first >> middle >>last
Загрузите файл конфигурации Python в $AIRFLOW_HOME/dags, перезапустите airflow, и график выполнения DAG будет следующим:
Установите для параметра catchup значение False, и конфигурация Python DAG будет следующей:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2001, 1, 1), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'catchup_test2', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1), # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
catchup=False # осуществлятьDAGчас,начнетсявремя До сих пор всеосуществлятьзадачиосуществлять,По умолчаниюTrue
)
first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='second',
bash_command='echo "run second task"',
dag=dag
)
last = BashOperator(
task_id='third',
bash_command='echo "run third task"',
dag=dag,
retries=3
)
first >> middle >>last
Загрузите файл конфигурации Python в $AIRFLOW_HOME/dags, перезапустите airflow, и график выполнения DAG будет следующим:
Есть два способа настроить догонялки в Airflow:
В разделе планировщика файла конфигурации воздушного потока airflow.cfg установите catchup_by_default=True (по умолчанию) или False. Этот параметр является глобальным.
Установите параметры объекта DAG в конфигурации кода Python: dag.catchup=True или False.
dag = DAG(
dag_id = 'myairflow_execute_bash',
default_args = default_args,
catchup=False,
schedule_interval = timedelta(days=1))
Каждый DAG может иметь цикл выполнения планирования или нет. Если цикл планирования существует, мы можем установить параметр «schedule_interval» в конфигурации DAG кода Python, чтобы указать цикл планирования DAG. Его можно установить следующими тремя способами.
AirflowПредустановкаодиннекоторыйCronПланированиенеделя Ожидать,Вы можете обратиться к:
DAG Runs — Airflow Documentation,Как показано ниже:
Используйте следующее в файле конфигурации Python:
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 4), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = '@daily' # использовать Готовое расписание Крон, запланированный в 0:00 каждый день
Этот метод заключается в написании команды запланированного задания crontab системы Linux.,Можно найти вhttps://crontab.guru/Веб-сайт сначала генерирует соответствующую настройку.час Планирование Заказ,Его формат следующий:
minute hour day month week
минута: представляет минуту, которая может быть любым целым числом от 0 до 59.
час: представляет час, который может быть любым целым числом от 0 до 23.
день: представляет дату, которая может быть любым целым числом от 1 до 31.
месяц: представляет месяц, который может быть любым целым числом от 1 до 12.
week:Указывает звезду Ожидать Несколько,Может быть любым целым числом от 0 до 7,0 или 7 здесь представляют воскресенье.
В каждом из вышеперечисленных полей также можно использовать специальные символы для обозначения различных значений:
Звездочка (*): представляет все возможные значения.,Например, если поле месяца отмечено звездочкой,Это означает, что после соблюдения ограничений других полей каждый месяцосуществлять该Заказ操作。
Запятая (,): вы можете указать диапазон списка со значениями, разделенными запятыми, например «1,2,5,7,8,9».
Центральная полоса (-): вы можете использовать центральную полосу между целыми числами для обозначения диапазона целых чисел, например, «2-6» означает «2,3,4,5,6».
Косая черта (/): косую черту можно использовать для указания частоты интервала и размера шага времени.,Например”0-23/2”значит каждые два Часосуществлятьодин Второсортный。
Используйте следующее в файле конфигурации Python:
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 4), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = '* * * * *' # Использование Кронтаба Команда запланированного задания, запускается раз в минуту
)
timedelta использует Python timedelta Установите период планирования, вы можете настроить дни, недели, часы, минуты, секунды и миллисекунды. Используйте следующее в файле конфигурации Python:
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 4), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=5) # используя питон timedelta Установите период планирования, вы можете настроить дни, недели, часы, минуты, секунды и миллисекунды.
)
A >> B >>C
'''
airflow Настройка зависимости задач один
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 22), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'dag_relation_1', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
A >> B >>C
[A,B] >>C >>D
'''
airflow Параметр зависимости задач 2
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 22), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'dag_relation_2', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
[A,B] >>C >>D
[A,B,C] >>D >>[E,F]
'''
airflow Параметр зависимости задач три
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 22), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'dag_relation_3', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
F = BashOperator(
task_id='F',
bash_command='echo "run F task"',
dag=dag
)
[A,B,C] >>D >>[E,F]
A >>B>>C>>D
A >>E>>F
'''
airflow Настройки зависимости задач четыре
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 22), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'dag_relation_4', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
F = BashOperator(
task_id='F',
bash_command='echo "run F task"',
dag=dag
)
A >>[B,C,D]
A >>[E,F]
A >>B>>E
C >>D>>E
'''
airflow Параметр зависимости задач пять
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # имя владельца
'start_date': datetime(2021, 9, 22), # Время, когда выполнение начинается в первый раз, равно UTC время
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5), # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'dag_relation_5', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
default_args = default_args, #Внешне определено dic параметры формата
schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
A >>B>>E
C >>D>>E