«Go open source package» asynq: простой, надежный и эффективный пакет распределенной очереди задач на основе Redis.
«Go open source package» asynq: простой, надежный и эффективный пакет распределенной очереди задач на основе Redis.

Привет всем, я рыбак.

Сегодня я хотел бы порекомендовать вам продукт на основеredisвыполнитьизПростойнадежныйиЭффективныйизРаспределенная очередь задачasynq。Долженочередь От сотрудника GoogleKen Hibino

Адрес проекта с открытым исходным кодом: https://github.com/hibiken/asynq, звезда 6.1k, в настоящее время в нем участвуют 29 участников.

Архитектурный проект async

Кластер на базе Redis поддерживает дозорный режим. Таким образом, хранилище может быть горизонтально масштабируемым и иметь высокую доступность. Ниже приведен принцип проектирования верхнего уровня операции async:

  • Клиент ставит задачу в очередь.
  • Сервер поочередно извлекает задачи и обрабатывает каждую задачу, запускаемую с помощью сопрограммы (называемой воркером).
  • Задачи, в свою очередь, могут обрабатываться одновременно несколькими работниками.

Ниже приведена схема архитектуры пакета:

Основные особенности

  • Убедитесь, что каждая задача используется хотя бы один раз.
  • Поддержка запланированных задач
  • Повторная попытка неудачного использования задачи поддержки
  • Поддержка функции задержки
  • Поддержка режима кластера Redis (горизонтально масштабируемый) и режима Sentinals (высокой доступности)
  • Поддержка партиизадачи потребления
  • Поставляется с полной функцией монитора

Далее мы пишем клиентскую программу и серверную программу на основе асинхронного пакета для отправки и использования сообщений.

Инсталляционный пакет

пока getЗаказ Инсталляционный пакет

Язык кода:javascript
копировать
go get -u github.com/hibiken/asynq

основные типы данных

первый,Давайте посмотрим на два основных типа данных, которые вам необходимо использовать при использовании этого пакета.,

Тип конфигурации подключения Redis: RedisClientOpt

asynqиспользование пакетаRedisв качестве брокера сообщений,мы называем этоmessage broker。существовать Установить соединение с При использовании Redis используйте этот тип для настройки установки. соединение с Некоторые свойства Redis. Например, адрес Redis, пароль и т. д.

Язык кода:javascript
копировать
redisConnOpt := async.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}

Задачи: Задачи

существоватьasynqсередина,Единица работы упакована вTask,существоватьTaskВ типе есть два поля:TypeиPayload

Язык кода:javascript
копировать
// Тип — строковый тип, представляющий тип задачи. 
func (t *Task) Type() string

// Payload Это тело сообщения.
func (t *Task) Payload() []byte

производственные задачи

Отправка сообщения toturn включает в себя три этапа: установить соединение с Redis, создать задачуTask и отправить задачу в redis.

Установить соединение с Redis

проходитьNewClientфункцияиredisУстановить соединение

Язык кода:javascript
копировать
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})

Поддержка кластера Redis

существовать Установить соединение с Redisчас,проходитьasynq.RedisClusterClientOptКластер конфигурации структуры。следующее:

Язык кода:javascript
копировать
client := asynq.NewClient(asynq.RedisClusterClientOpt{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})

Аналогично, со стороны потребителя Установить соединение с Redis также использует режим кластера.

Поддержка режима Redis Sentry

Чтобы обеспечить высокую доступность и позволить службе Redis автоматически восстанавливаться в случае сбоя, пакет async также поддерживает дозорный режим Redis. следующее:

Язык кода:javascript
копировать
var redis = &asynq.RedisFailoverClientOpt{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}

client := asynq.NewClient(redis)

Создание задач

От NewTaskСоздание задач。носуществовать Создание перед задачами,нуждатьсяхотеть Сначала по себеиз Текст сообщения с определением бизнесаизструктура。Здесь мы определяемEmailTaskPayloadиз消息体структура,Данные, используемые для бизнес-обработки при использовании.

Язык кода:javascript
копировать
type EmailTaskPayload struct {
    // ID for the email recipient.
    UserID int
}

payload, err := json.Marshal(EmailTaskPayload{UserID: 42})

//Создано 2 задачи, одна из них типа "email:welcome"
t1 := asynq.NewTask("email:welcome", payload)

//Создание задача, тип задачи "email:reminder"
t2 := asynq.NewTask("email:reminder", payload)

Отправить задачу команде

проходитьEnqueueфункция может отправить задачуочередь,и будет немедленно съеден.

Язык кода:javascript
копировать
info, err := client.Enqueue(t1)

Укажите время потребления задачи

Иногда мы не хотим, чтобы задача использовалась немедленно, а ждали определенного периода времени, прежде чем она будет использована, что часто называют отложенным потреблением или отложенной очередью. В пакете async есть два способа добиться этого.

Будет израсходовано через определенный период времени

проходитьProcessInфункцияуказатьчасинтервал,следующее,Это означает, что он будет израсходован через 24 часа.

Язык кода:javascript
копировать
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))

проходитьProcessAtфункцияуказать具体изчасинтервал,следующее,Это также означает, что он будет употреблен через 24 часа.

Язык кода:javascript
копировать
info, err = client.Enqueue(t2, asynq.ProcessAt(time.Now().Add(24*time.Hour))
Укажите время сохранения задачи

проходитьRetentionфункцияуказатьчасинтервал,Означает, что при обработке задачи,Вы все еще можете сохранить некоторыеизчасмежду。следующее Это верноtask3Сохранять после успешного употребления2маленькийчас。

Язык кода:javascript
копировать
client.Enqueue(task3, asynq.Retention(2*time.Hour))
Укажите очередь, которую необходимо поставить в очередь

проходитьQueueФункция options определяет, какиеочередь。следующее,Отправьте сообщение в очередь с именем «high» в Redis.

Язык кода:javascript
копировать
client.Enqueue(t2, asynq.Queue("high"))

задачи потребления

Потреблять задачи в редисочереди,первыйхотеть Установить соединение с Redis, мы называем это сервером. Используйте следующие методы для связи с экземпляром службы Redis и очередью для подписки.

Язык кода:javascript
копировать
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
    )

    // NOTE: We'll cover what this `handler` is in the section below.
    if err := srv.Run(handler); err != nil {
        log.Fatal(err)
    }
}

Укажите очередь для использования

Например, если мы хотим использовать две очереди с именами «высокая» и «медленная» в Redis, мы можем указать их через структуру async.Config. следующее:

Язык кода:javascript
копировать
srv := asynq.NewServer(
    asynq.RedisClientOpt{Addr: "localhost:6379"},
    asynq.Config{
     Queues: map[string]int{
         "high": 6,
         "slow": 4,
       }
    },
)

6 и 4 в очереди представляют уровни приоритета двух очередей.

Укажите количество рабочих

В Нью Сервере,проходитьasynq.Configструктура体серединаизConcurrencyПоле,Можно указатьзапускатьизworkerколичество。несколькоworkerМожет быть одновременноиз Потреблениеочередьсерединаиз Задача。следующее,Представляет запуск 10 рабочих. Если этот параметр конфигурации не указан,Число рабочих процессов по умолчанию — это количество процессоров на сервере.

Язык кода:javascript
копировать
srv := asynq.NewServer(
    asynq.RedisClientOpt{Addr: "localhost:6379"},
    asynq.Config{
     Concurrency: 10,
       Queues: map[string]int{
          "high": 6,
          "slow": 4,
      }
    },
)

Укажите потребительский процессор

отredisочередьсередина获取到Задача后应Должен Как это решается??то естьсуществоватьосуществлятьserverизRunметодчасуказатьHandler,ДолженHandlerэтоинтерфейстип,нуждатьсяхотетьвыполнитьследующееинтерфейс:

Язык кода:javascript
копировать
type Handler interface {
 ProcessTask(context.Context, *Task) error
}

Далее мы определяем обработчик типа, реализующий этот интерфейс, и выполняем в этом обработчике конкретную бизнес-обработку. В этом обработчике выполняется различная обработка в зависимости от типа полученной задачи.

Язык кода:javascript
копировать
func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type() {
    case "email:welcome":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Send Welcome Email to User %d", p.UserID)

    case "email:reminder":
        var p EmailTaskPayload
        if err := json.Unmarshal(t.Payload(), &p); err != nil {
            return err
        }
        log.Printf(" [*] Send Reminder Email to User %d", p.UserID)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type())
    }
    return nil
}

Затем запустите Сервер и укажите обработчик. следующее:

Язык кода:javascript
копировать
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    // Use asynq.HandlerFunc adapter for a handler function
    if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
        log.Fatal(err)
    }
}

Регистрация обработчика через Mux пакета async

вышеиз Задача Потреблениеизhandlerиз Читабельность кода низкая.。существоватьasynqСумкасередина Также предоставляетMuxтип,用于进行注册Задачатипипереписыватьсяиз Потребление逻辑из Функция。следующее:

Язык кода:javascript
копировать
func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{Concurrency: 10},
    )

    mux := asynq.NewServeMux()
    mux.HandleFunc("email:welcome", sendWelcomeEmail)
    mux.HandleFunc("email:reminder", sendReminderEmail)

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

В этом коде,проходитьmux.HandleFuncфункция分别注册了Задачатипemail:welcomeиз Задачаиз Логика обработки такаяsendWelcomeEmailфункция;Задачатипдаemail:reminderиз Задачаиз Логика обработки такаяsendReminderEmailфункция。

Далее мы рассмотрим определения двух функций обработки. следующее:

Язык кода:javascript
копировать
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
    return nil
}

func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
    var p EmailTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
    return nil
}

Задачи пакетной обработки

Пакет async также поддерживает Задачи. пакетной обработки。Прямо сейчассуществовать АдмиралНесколько задач отправлены в одну группу и в одну очередь。существовать Потреблениеизчас Просто подождипроходитьобозначениеизФункции агрегирования объединяют несколько задач в одну задачу.,Затем отправляется на обработку в процессор задач.

Когда выполняется любое из следующих двух условий, его можно агрегировать и отправить в процессор задач:

  • Максимальное время агрегации: при достижении определенного времени независимо от того, сколько задач находится в текущей группе, агрегация будет выполнена.
  • Максимальное количество задач агрегации: если указанное количество задач уже существует до достижения времени агрегации, агрегация также будет выполнена.

Укажите группу, к которой принадлежит задача

При отправке задачи используйте функцию «Группировать»:

Язык кода:javascript
копировать
// Enqueue three tasks to the same group.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))

Укажите конфигурации, связанные с агрегированием задач.

При запуске службы для потребления вы можете указать параметры функции, связанные с агрегацией. следующее:

Язык кода:javascript
копировать
// This function is used to aggregate multiple tasks into one.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
    // ... Your logic to aggregate the given tasks and return the aggregated task.
    // ... Use NewTask(typename, payload, opts...) to create a new task and set options if needed.
    // ... (Note) Queue option will be ignored and the aggregated task will always be enqueued to the same queue the group belonged.
}

srv := asynq.NewServer(
 redisConnOpt,
    asynq.Config{
     GroupAggregator:  asynq.GroupAggregatorFunc(aggregate), //Укажите функцию агрегирования как агрегат
        GroupMaxDelay:    10 * time.Minute, //Агрегация не позднее, чем каждые 10 минут
        GroupGracePeriod: 2 * time.Minute, //Агрегация каждые 2 минуты
        GroupMaxSize:     20, // Агрегировать каждые 20 задач
        Queues: map[string]int{"notifications": 1},
    },
)

// Функция агрегации одной группы, наконец, объединяет несколько задач в одну задачу и отправляет обработчику задач
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
        log.Printf("Aggregating %d tasks from group %q", len(tasks), group)
        var b strings.Builder
        for _, t := range tasks {
                b.Write(t.Payload())
                b.WriteString("\n")
        }
        return asynq.NewTask("aggregated-task", []byte(b.String()))
}

монитор

Полная поворотная система, монитор необходим. Пакет async также поставляется с двумя формами монитора: веб-интерфейсом и инструментами командной строки.

webUIмонитор

webUImonitor реализован через пакет asyncmon Открытого исходного кода.,Адресhttps://github.com/hibiken/asynqmon。следующее Как показано на картинке:

перспектива очереди

image.png

перспектива миссии

image.png

Монитор командной строки

Другой Монитор командной строки。пока install устанавливает инструмент async следующим образом:

Язык кода:javascript
копировать
go install github.com/hibiken/asynq/tools/asynq

Затем введите команду в терминале asynq dash запускать Панель управления командной строкой. Эффект следующий:

Хорошо, выше представлен пакет распределенной очереди на основе Redis, которым мы поделились с вами. Дополнительные функции см. в документации: https://github.com/hibiken/asynq/wiki.

Особые инструкции:тыизсосредоточиться «на» — моя самая большая мотивация продолжать писать. Нажмите на карточку общедоступного аккаунта ниже, чтобы сразу сосредоточиться на。сосредоточиться Отправьте PDF-документ «100 распространенных ошибок в Го» и классические учебные материалы по Го.

boy illustration
[1269] Использование Gunicorn для развертывания проектов flask.
boy illustration
Краткое изложение 10 способов регистрации bean-компонентов в SpringBoot
boy illustration
Flask Learning-9. 2 способа включения режима отладки (debug mode).
boy illustration
Руководство по настройке самостоятельного сервера для Eudemons Parlu
boy illustration
40 вопросов для собеседований по SpringBoot, которые необходимо задавать на собеседованиях! При необходимости ответьте на вопросы для собеседования SpringBoot [предлагаемый сборник] [легко понять]
boy illustration
Через два года JVM может быть заменен GraalVM.
boy illustration
Разрешение циклических зависимостей Spring Bean: существует ли неразрешимая циклическая ссылка?
boy illustration
Разница между промежуточным программным обеспечением ASP.NET Core и фильтрами
boy illustration
[Серия Foolish Old Man] Ноябрь 2023 г. Специальная тема Winform Control Элемент управления DataGridView Подробное объяснение
boy illustration
.NET Как загрузить файлы через HttpWebRequest
boy illustration
[Веселый проект Docker] Обновленная версия 2023 года! Создайте эксклюзивный инструмент управления паролями за 10 минут — Vaultwarden
boy illustration
Высокопроизводительная библиотека бревен Golang zap + компонент для резки бревен лесоруба подробное объяснение
boy illustration
Концепция и использование Springboot ConstraintValidator
boy illustration
Новые функции Go 1.23: точная настройка основных библиотек, таких как срезы и синхронизация, значительно улучшающая процесс разработки.
boy illustration
[Весна] Введение и базовое использование AOP в Spring, SpringBoot использует AOP.
boy illustration
Чтобы начать работу с рабочим процессом Flowable, этой статьи достаточно.
boy illustration
Байтовое интервью: как решить проблему с задержкой сообщений MQ?
boy illustration
ASP.NET Core использует функциональные переключатели для управления реализацией доступа по маршрутизации.
boy illustration
[Проблема] Решение Невозможно подключиться к Redis; вложенное исключение — io.lettuce.core.RedisConnectionException.
boy illustration
От теории к практике: проектирование чистой архитектуры в проектах Go
boy illustration
Решение проблемы искажения китайских символов при чтении файлов Net Core.
boy illustration
Реализация легких независимых конвейеров с использованием Brighter
boy illustration
Как удалить и вернуть указанную пару ключ-значение из ассоциативного массива в PHP
boy illustration
Feiniu fnos использует Docker для развертывания учебного пособия по AList
boy illustration
Принципы и практика использования многопоточности в различных версиях .NET.
boy illustration
Как использовать PaddleOCRSharp в рамках .NET
boy illustration
CRUD используется уже два или три года. Как читать исходный код Spring?
boy illustration
Устраните проблему совместимости между версией Spring Boot и Gradle Java: возникла проблема при настройке корневого проекта «demo1» > Не удалось.
boy illustration
Научите вас шаг за шагом, как настроить Nginx.
boy illustration
Это руководство — все, что вам нужно для руководства по автономному развертыванию сервера для проектов Python уровня няни (рекомендуемый сборник).