Платформа планирования больших данных Airflow (5): использование Airflow
Платформа планирования больших данных Airflow (5): использование Airflow

Использование воздушного потока

Как упоминалось выше, общие шаги по использованию Airflow для планирования задач следующие:

  • Создавайте файлы Python и используйте разные операторы в соответствии с реальными потребностями.
  • Передайте определенные параметры различным операторам в файле Python и определите серию задач.
  • Определите связь между задачами в файле Python, чтобы сформировать группу обеспечения доступности баз данных.
  • Загрузите файл Python для выполнения и запланируйте DAG. Каждая задача будет формировать экземпляр.
  • Используйте командную строку или WEBUI для просмотра и управления.

Вышеупомянутый файл Python представляет собой сценарий Python Airflow, который использует код для указания структуры группы обеспечения доступности баз данных.

один、Команда оболочки диспетчеризации воздушного потока

Давайте возьмем планирование и выполнение команд оболочки в качестве примера, чтобы объяснить использование воздушного потока.

1. Сначала нам нужно создать файл Python и импортировать необходимые библиотеки классов.

Язык кода:javascript
копировать
# импортировать DAG Объект, вам нужно позже создать экземпляр объекта DAG.
from airflow import DAG

# импортироватьBashOperator Операторы, нам нужно использовать этот объект для выполнения процесса
from airflow.operators.bash import BashOperator

Примечание. Приведенный выше код можно создать в инструменте разработки, но пакет Airflow необходимо импортировать и установить в используемой среде Python3.7.

Язык кода:javascript
копировать
D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2. Создать экземпляр DAG

Язык кода:javascript
копировать
from datetime import datetime, timedelta

# Определите некоторые параметры в default_args, которые можно использовать при создании экземпляра DAG, используя Python. dic определение формата
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2022, 3, 25),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)

Уведомление:

  • Существует три способа создания экземпляра DAG.

Первый способ:

Язык кода:javascript
копировать
with DAG("my_dag_name") as dag:
    op=XXOperator(task_id="task")

Второй метод (использованный выше):

Язык кода:javascript
копировать
my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

Третий способ:

Язык кода:javascript
копировать
@dag(start_date=days_ago(2))
def generate_dag():
    op = XXOperator(task_id="task")
dag = generate_dag()
  • Описание основных параметров базового оператора:

Вы можете обратиться к:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperatorПроверятьbaseopartorБольше параметров в。

  • Описание параметра DAG

Вы можете обратиться к:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html

Просмотрите описание параметра DAG или можете напрямую щелкнуть DAG в инструменте разработки, чтобы ввести исходный код и узнать, каковы соответствующие параметры.

3. Определить задачу

При создании экземпляра оператора создается задача Task. Процесс создания экземпляра объекта из оператора называется конструктором. Каждый конструктор имеет «task_id» в качестве уникального идентификатора задачи.

Далее мы определяем три Оператора, то есть три Задачи. Каждый идентификатор задачи не может повторяться.

Язык кода:javascript
копировать
# operator Поддерживает несколько типов, используется здесь BashOperator
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

Уведомление:

  • Соответствующие параметры можно передать каждому оператору, чтобы переопределить параметры DAG по умолчанию. Например: «повторные попытки» = 3 в последней задаче заменяет значение по умолчанию 1. Правила приоритета параметров задачи следующие: ① Отображение переданных параметров ② Значение, существующее в словаре default_args ③ Значение оператора по умолчанию (если оно существует).
  • BashOperatorСправочник по использованию:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator

4. Установите зависимости задач

Язык кода:javascript
копировать
#использовать set_upstream、set_downstream Установленные зависимости не могут отображаться, иначе будет выдано сообщение об ошибке.
# middle.set_upstream(first) # middle будет выполнено после завершения первого выполнения
# last.set_upstream(middle) # last будет внутри Выполняется после завершения среднего выполнения

#Вы также можете использовать символы смещения для установки зависимостей
first >> middle >>last # first Выполнить первым, средним вторым, последним последним
# first >> [middle,last] # первый выполняется первый, средний ,последнее параллельное выполнение

Уведомление: При выполнении скрипта,если вDAGнайден водинкольцевая ссылка(Например:A->B->C-A)выдаст исключение。БолееDAG taskЗависимости можно найти на официальном сайте.:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies 

5、Загрузите скрипт конфигурации Python

На данный момент конфигурация Python выглядит следующим образом:

Язык кода:javascript
копировать
# импортировать DAG Объект, вам нужно позже создать экземпляр объекта DAG.
from airflow import DAG

# импортироватьBashOperator Операторы, нам нужно использовать этот объект для выполнения процесса
from airflow.example_dags.example_bash_operator import dag

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

# Определите некоторые параметры в default_args, которые можно использовать при создании экземпляра DAG, используя Python. dic определение формата
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 4),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)

# operator Поддерживает несколько типов, используется здесь BashOperator
first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)

middle = BashOperator(
    task_id='middle',
    bash_command='echo "run middle task"',
    dag=dag
)

last = BashOperator(
    task_id='last',
    bash_command='echo "run last task"',
    dag=dag,
    retries=3
)

#использовать set_upstream、set_downstream Установленные зависимости не могут отображаться, иначе будет выдано сообщение об ошибке.
# middle.set_upstream(first) # middle будет выполнено после завершения первого выполнения
# last.set_upstream(middle) # last будет внутри Выполняется после завершения среднего выполнения

#Вы также можете использовать символы смещения для установки зависимостей
first >> middle >>last # first Выполнить первым, средним вторым, последним последним
# first >> [middle,last] # первый выполняется первый, средний ,последнее параллельное выполнение

 Будет ли вышеизложенноеpythonФайл конфигурации загружен наКаталог AIRFLOW_HOME/dags, по умолчаниюAIRFLOW_HOMEЧтобы установить узел“/root/airflow”Оглавление,Каталог dags в текущем каталоге необходимо создать вручную.

6. Перезапустите воздушный поток.

«ps aux|grep webserver» и «ps aux|grep Scheduler» находят соответствующий процесс воздушного потока, завершают его и перезапускают Airflow. После перезапуска вы увидите соответствующий идентификатор DAG «myairflow_execute_bash» в веб-интерфейсе airflow.

7、Выполнить воздушный поток

Выполните следующие действия, чтобы выполнить DAG. Сначала откройте рабочий процесс, а затем «Запустите DAG». После этого вы увидите, что задача выполнена успешно.

Посмотреть журнал выполнения задач:

два、Время срабатывания планирования DAG

В Airflow планировщик запускает DAG на основе «start_date» и «schedule_interval», указанных в файле DAG. В частности, важно отметить, что планировщик Airflow запускает выполнение DAG в конце запланированного периода времени, а не запускает DAG в начале, например:

Язык кода:javascript
копировать
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2022, 3, 25),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'myairflow_execute_bash', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(days=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)

Настроенная выше группа обеспечения доступности баз данных запланирована на 24 марта 2022 г. по всемирному координированному времени (UTC) и выполняется каждый день. Конкретное время работы этой группы обеспечения доступности баз данных следующее:

Автоматически планировать дату выполнения DAG

Автоматически планировать фактическое время запуска DAG.

2022-03-24,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-29,00:00:00+00:00

... ...

... ...

Первые данные в приведенной выше таблице используются в качестве примера для объяснения. Обычное расписание Airflow — 00:00:00 каждый день. Предположим, что дата дня — 2022-03-24. достигает 2022-03-24 00:00:00. Он будет выполнен, и период планирования измененного времени планирования будет 2022-03-24 00:00:00 ~ 2022-03-25. 00:00:00 , в Airflow выполнение фактически запускается в конце цикла планирования, то есть время выполнения, автоматически запускаемое в 2022-03-24 00:00:00, равно 2022-03-25 00: 00:00. 

Как показано на рисунке ниже, в airflow «execution_date» — это не фактическое время выполнения, а временная отметка начала цикла планирования. Например: дата_выполнения — 04.09.2021, 00:00:00. Фактическое время автоматического планирования группы обеспечения доступности баз данных — 05.09.2021, 00:00:00. Конечно, в дополнение к автоматическому планированию мы также можем вручную запустить выполнение DAG. Чтобы определить, запланирован ли запуск DAG (автоматическое планирование) или запущен вручную, вы можете просмотреть «Тип запуска».

три、DAG catchup Настройки параметров

В плане работы Airflow важной концепцией является догонялка. После реализации конкретной логики DAG, если для догона установлено значение True (по умолчанию — True), Airflow будет «заполнять» все прошлые запуски DAG. Если для догона установлено значение False, Airflow выполнит запуск DAG, начиная с момента, предшествующего последнему времени запуска DAG, игнорируя все предыдущие записи.

Например: теперь группа обеспечения доступности баз данных выполняется каждые 1 минуту, время начала планирования — 01.01.2001, а текущая дата — 01.10.2021, 15:23:21. начало с 2001-01. Текущая группа обеспечения доступности баз данных будет запускаться каждую минуту, начиная с -01 00:00:00. Если для параметра catchup установлено значение False, DAG будет выполнять запуск DAG с 15:22:20 2021-10-01 (момент перед текущим 15:23:21 2021-10-01).

Пример: есть три командные задачи оболочки: первая, вторая и третья. Они планируются по порядку и выполняются каждую минуту. Первое время выполнения — 01.01.2000.

Установите для параметра catchup значение True (по умолчанию), и конфигурация Python DAG будет следующей:

Язык кода:javascript
копировать
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2001, 1, 1),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}
dag = DAG(
    dag_id = 'catchup_test1 ', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1), # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
    catchup=True # осуществлятьDAGчас,начнетсявремя До сих пор всеосуществлятьзадачиосуществлять,По умолчаниюTrue
)

first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)
middle = BashOperator(
    task_id='second',
    bash_command='echo "run second task"',
    dag=dag
)
last = BashOperator(
    task_id='third',
    bash_command='echo "run third task"',
    dag=dag,
    retries=3
)
first >> middle >>last

Загрузите файл конфигурации Python в $AIRFLOW_HOME/dags, перезапустите airflow, и график выполнения DAG будет следующим:

Установите для параметра catchup значение False, и конфигурация Python DAG будет следующей:

Язык кода:javascript
копировать
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2001, 1, 1),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}
dag = DAG(
    dag_id = 'catchup_test2', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1), # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
    catchup=False # осуществлятьDAGчас,начнетсявремя До сих пор всеосуществлятьзадачиосуществлять,По умолчаниюTrue
)

first = BashOperator(
    task_id='first',
    bash_command='echo "run first task"',
    dag=dag
)
middle = BashOperator(
    task_id='second',
    bash_command='echo "run second task"',
    dag=dag
)
last = BashOperator(
    task_id='third',
    bash_command='echo "run third task"',
    dag=dag,
    retries=3
)
first >> middle >>last

Загрузите файл конфигурации Python в $AIRFLOW_HOME/dags, перезапустите airflow, и график выполнения DAG будет следующим:

Есть два способа настроить догонялки в Airflow:

  • Глобальная конфигурация

В разделе планировщика файла конфигурации воздушного потока airflow.cfg установите catchup_by_default=True (по умолчанию) или False. Этот параметр является глобальным.

  • Конфигурация файла DAG

Установите параметры объекта DAG в конфигурации кода Python: dag.catchup=True или False.

Язык кода:javascript
копировать
dag = DAG(
    dag_id = 'myairflow_execute_bash',
default_args = default_args,    
catchup=False,
    schedule_interval = timedelta(days=1))

Четыре、Настройки цикла планирования DAG

Каждый DAG может иметь цикл выполнения планирования или нет. Если цикл планирования существует, мы можем установить параметр «schedule_interval» в конфигурации DAG кода Python, чтобы указать цикл планирования DAG. Его можно установить следующими тремя способами.

  • Готовое расписание Cron

AirflowПредустановкаодиннекоторыйCronПланированиенеделя Ожидать,Вы можете обратиться к:

DAG Runs — Airflow Documentation,Как показано ниже:

Используйте следующее в файле конфигурации Python:

Язык кода:javascript
копировать
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 4),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = '@daily' # использовать Готовое расписание Крон, запланированный в 0:00 каждый день
  • Cron

Этот метод заключается в написании команды запланированного задания crontab системы Linux.,Можно найти вhttps://crontab.guru/Веб-сайт сначала генерирует соответствующую настройку.час Планирование Заказ,Его формат следующий:

Язык кода:javascript
копировать
minute  hour  day  month  week
минута: представляет минуту, которая может быть любым целым числом от 0 до 59.
час: представляет час, который может быть любым целым числом от 0 до 23.
день: представляет дату, которая может быть любым целым числом от 1 до 31.
месяц: представляет месяц, который может быть любым целым числом от 1 до 12.
week:Указывает звезду Ожидать Несколько,Может быть любым целым числом от 0 до 7,0 или 7 здесь представляют воскресенье.

В каждом из вышеперечисленных полей также можно использовать специальные символы для обозначения различных значений:

Язык кода:javascript
копировать
Звездочка (*): представляет все возможные значения.,Например, если поле месяца отмечено звездочкой,Это означает, что после соблюдения ограничений других полей каждый месяцосуществлять该Заказ操作。
Запятая (,): вы можете указать диапазон списка со значениями, разделенными запятыми, например «1,2,5,7,8,9».
Центральная полоса (-): вы можете использовать центральную полосу между целыми числами для обозначения диапазона целых чисел, например, «2-6» означает «2,3,4,5,6».
Косая черта (/): косую черту можно использовать для указания частоты интервала и размера шага времени.,Например”0-23/2”значит каждые два Часосуществлятьодин Второсортный。

Используйте следующее в файле конфигурации Python:

Язык кода:javascript
копировать
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 4),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = '* * * * *' # Использование Кронтаба Команда запланированного задания, запускается раз в минуту
)
  • datetime.timedelta

timedelta использует Python timedelta Установите период планирования, вы можете настроить дни, недели, часы, минуты, секунды и миллисекунды. Используйте следующее в файле конфигурации Python:

Язык кода:javascript
копировать
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 4),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'cron_test', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=5) # используя питон timedelta Установите период планирования, вы можете настроить дни, недели, часы, минуты, секунды и миллисекунды.
)

пять、Параметры зависимостей задач DAG

1、Параметры зависимостей задач DAGодин

  • Блок-схема планирования DAG
  • зависимости выполнения задачи
Язык кода:javascript
копировать
A >> B >>C
  • Полный код
Язык кода:javascript
копировать
'''
airflow Настройка зависимости задач один

'''
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 22),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'dag_relation_1', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)


A = BashOperator(
    task_id='A',
    bash_command='echo "run A task"',
    dag=dag
)

B = BashOperator(
    task_id='B',
    bash_command='echo "run B task"',
    dag=dag
)

C = BashOperator(
    task_id='C',
    bash_command='echo "run C task"',
    dag=dag,
    retries=3
)

A >> B >>C

2、Параметры зависимостей задач DAGдва

  • Блок-схема планирования DAG
  • зависимости выполнения задачи
Язык кода:javascript
копировать
[A,B] >>C >>D
  • Полный код
Язык кода:javascript
копировать
'''
airflow Параметр зависимости задач 2

'''
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 22),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'dag_relation_2', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)


A = BashOperator(
    task_id='A',
    bash_command='echo "run A task"',
    dag=dag
)

B = BashOperator(
    task_id='B',
    bash_command='echo "run B task"',
    dag=dag
)

C = BashOperator(
    task_id='C',
    bash_command='echo "run C task"',
    dag=dag,
    retries=3
)

D = BashOperator(
    task_id='D',
    bash_command='echo "run D task"',
    dag=dag
)

[A,B] >>C >>D

3、Параметры зависимостей задач DAGтри

  • Блок-схема планирования DAG
  • зависимости выполнения задачи
Язык кода:javascript
копировать
[A,B,C] >>D >>[E,F]
  • Полный код
Язык кода:javascript
копировать
'''
airflow Параметр зависимости задач три

'''
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 22),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'dag_relation_3', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)


A = BashOperator(
    task_id='A',
    bash_command='echo "run A task"',
    dag=dag
)

B = BashOperator(
    task_id='B',
    bash_command='echo "run B task"',
    dag=dag
)

C = BashOperator(
    task_id='C',
    bash_command='echo "run C task"',
    dag=dag,
    retries=3
)

D = BashOperator(
    task_id='D',
    bash_command='echo "run D task"',
    dag=dag
)

E = BashOperator(
    task_id='E',
    bash_command='echo "run E task"',
    dag=dag
)

F = BashOperator(
    task_id='F',
    bash_command='echo "run F task"',
    dag=dag
)

[A,B,C] >>D >>[E,F]

4、Параметры зависимостей задач DAGЧетыре

  • Блок-схема планирования DAG
  • зависимости выполнения задачи
Язык кода:javascript
копировать
A >>B>>C>>D
A >>E>>F
  • Полный код
Язык кода:javascript
копировать
'''
airflow Настройки зависимости задач четыре

'''
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 22),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'dag_relation_4', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)


A = BashOperator(
    task_id='A',
    bash_command='echo "run A task"',
    dag=dag
)

B = BashOperator(
    task_id='B',
    bash_command='echo "run B task"',
    dag=dag
)

C = BashOperator(
    task_id='C',
    bash_command='echo "run C task"',
    dag=dag,
    retries=3
)

D = BashOperator(
    task_id='D',
    bash_command='echo "run D task"',
    dag=dag
)

E = BashOperator(
    task_id='E',
    bash_command='echo "run E task"',
    dag=dag
)

F = BashOperator(
    task_id='F',
    bash_command='echo "run F task"',
    dag=dag
)

A >>[B,C,D]
A >>[E,F]

5、Параметры зависимостей задач DAGпять

  • Блок-схема планирования DAG
  • зависимости выполнения задачи
Язык кода:javascript
копировать
A >>B>>E
C >>D>>E
  • Полный код
Язык кода:javascript
копировать
'''
airflow Параметр зависимости задач пять

'''
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow', # имя владельца
    'start_date': datetime(2021, 9, 22),  # Время, когда выполнение начинается в первый раз, равно UTC время
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5),  # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'dag_relation_5', #DAG id , должен полностью состоять из букв, цифр и знаков подчеркивания
    default_args = default_args, #Внешне определено dic параметры формата
    schedule_interval = timedelta(minutes=1) # Определите частоту запуска DAG, вы можете настроить дни, недели, часы, минуты, секунды, миллисекунды.
)


A = BashOperator(
    task_id='A',
    bash_command='echo "run A task"',
    dag=dag
)

B = BashOperator(
    task_id='B',
    bash_command='echo "run B task"',
    dag=dag
)

C = BashOperator(
    task_id='C',
    bash_command='echo "run C task"',
    dag=dag,
    retries=3
)

D = BashOperator(
    task_id='D',
    bash_command='echo "run D task"',
    dag=dag
)

E = BashOperator(
    task_id='E',
    bash_command='echo "run E task"',
    dag=dag
)

A >>B>>E
C >>D>>E 
boy illustration
Учебное пособие по Jetpack Compose для начинающих, базовые элементы управления и макет
boy illustration
Код js веб-страницы, фон частицы, код спецэффектов
boy illustration
【новый! Суперподробное】Полное руководство по свойствам компонентов Figma.
boy illustration
🎉Обязательно к прочтению новичкам: полное руководство по написанию мини-программ WeChat с использованием программного обеспечения Cursor.
boy illustration
[Забавный проект Docker] VoceChat — еще одно приложение для мгновенного чата (IM)! Может быть встроен в любую веб-страницу!
boy illustration
Как реализовать переход по странице в HTML (html переходит на указанную страницу)
boy illustration
Как решить проблему зависания и низкой скорости при установке зависимостей с помощью npm. Существуют ли доступные источники npm, которые могут решить эту проблему?
boy illustration
Серия From Zero to Fun: Uni-App WeChat Payment Practice WeChat авторизует вход в систему и украшает страницу заказа, создает интерфейс заказа и инициирует запрос заказа
boy illustration
Серия uni-app: uni.navigateЧтобы передать скачок значения
boy illustration
Апплет WeChat настраивает верхнюю панель навигации и адаптируется к различным моделям.
boy illustration
JS-время конвертации
boy illustration
Обеспечьте бесперебойную работу ChromeDriver 125: советы по решению проблемы chromedriver.exe не найдены
boy illustration
Поле комментария, щелчок мышью, специальные эффекты, js-код
boy illustration
Объект массива перемещения объекта JS
boy illustration
Как открыть разрешение на позиционирование апплета WeChat_Как использовать WeChat для определения местонахождения друзей
boy illustration
Я даю вам два набора из 18 простых в использовании фонов холста Power BI, так что вам больше не придется возиться с цветами!
boy illustration
Получить текущее время в js_Как динамически отображать дату и время в js
boy illustration
Вам необходимо изучить сочетания клавиш vsCode для форматирования и организации кода, чтобы вам больше не приходилось настраивать формат вручную.
boy illustration
У ChatGPT большое обновление. Всего за 45 минут пресс-конференция показывает, что OpenAI сделал еще один шаг вперед.
boy illustration
Copilot облачной разработки — упрощение разработки
boy illustration
Микросборка xChatGPT с низким кодом, создание апплета чат-бота с искусственным интеллектом за пять шагов
boy illustration
CUDA Out of Memory: идеальное решение проблемы нехватки памяти CUDA
boy illustration
Анализ кластеризации отдельных ячеек, который должен освоить каждый&MarkerгенетическийВизуализация
boy illustration
vLLM: мощный инструмент для ускорения вывода ИИ
boy illustration
CodeGeeX: мощный инструмент генерации кода искусственного интеллекта, который можно использовать бесплатно в дополнение к второму пилоту.
boy illustration
Машинное обучение Реальный бой LightGBM + настройка параметров случайного поиска: точность 96,67%
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция без кодирования и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
LM Studio для создания локальных больших моделей
boy illustration
Как определить количество слоев и нейронов скрытых слоев нейронной сети?
boy illustration
[Отслеживание целей] Подробное объяснение ByteTrack и детали кода