Привет всем, я рыбак.
Сегодня я хотел бы порекомендовать вам продукт на основеredis
выполнитьизПростой、надежныйиЭффективныйизРаспределенная очередь задач:asynq。Долженочередь От сотрудника GoogleKen Hibino。
Адрес проекта с открытым исходным кодом: https://github.com/hibiken/asynq, звезда 6.1k, в настоящее время в нем участвуют 29 участников.
Кластер на базе Redis поддерживает дозорный режим. Таким образом, хранилище может быть горизонтально масштабируемым и иметь высокую доступность. Ниже приведен принцип проектирования верхнего уровня операции async:
Ниже приведена схема архитектуры пакета:
Далее мы пишем клиентскую программу и серверную программу на основе асинхронного пакета для отправки и использования сообщений.
пока getЗаказ Инсталляционный пакет
go get -u github.com/hibiken/asynq
первый,Давайте посмотрим на два основных типа данных, которые вам необходимо использовать при использовании этого пакета.,
asynq
использование пакетаRedisв качестве брокера сообщений,мы называем этоmessage broker
。существовать Установить соединение с При использовании Redis используйте этот тип для настройки установки. соединение с Некоторые свойства Redis. Например, адрес Redis, пароль и т. д.
redisConnOpt := async.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
существоватьasynq
середина,Единица работы упакована вTask
,существоватьTask
В типе есть два поля:Type
иPayload
。
// Тип — строковый тип, представляющий тип задачи.
func (t *Task) Type() string
// Payload Это тело сообщения.
func (t *Task) Payload() []byte
Отправка сообщения toturn включает в себя три этапа: установить соединение с Redis, создать задачуTask и отправить задачу в redis.
проходитьNewClient
функцияиredis
Установить соединение
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
существовать Установить соединение с Redisчас,проходитьasynq.RedisClusterClientOpt
Кластер конфигурации структуры。следующее:
client := asynq.NewClient(asynq.RedisClusterClientOpt{
Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
})
Аналогично, со стороны потребителя Установить соединение с Redis также использует режим кластера.
Чтобы обеспечить высокую доступность и позволить службе Redis автоматически восстанавливаться в случае сбоя, пакет async также поддерживает дозорный режим Redis. следующее:
var redis = &asynq.RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
}
client := asynq.NewClient(redis)
От NewTaskСоздание задач。носуществовать Создание перед задачами,нуждатьсяхотеть Сначала по себеиз Текст сообщения с определением бизнесаизструктура。Здесь мы определяемEmailTaskPayload
из消息体структура,Данные, используемые для бизнес-обработки при использовании.
type EmailTaskPayload struct {
// ID for the email recipient.
UserID int
}
payload, err := json.Marshal(EmailTaskPayload{UserID: 42})
//Создано 2 задачи, одна из них типа "email:welcome"
t1 := asynq.NewTask("email:welcome", payload)
//Создание задача, тип задачи "email:reminder"
t2 := asynq.NewTask("email:reminder", payload)
проходитьEnqueue
функция может отправить задачуочередь,и будет немедленно съеден.
info, err := client.Enqueue(t1)
Иногда мы не хотим, чтобы задача использовалась немедленно, а ждали определенного периода времени, прежде чем она будет использована, что часто называют отложенным потреблением или отложенной очередью. В пакете async есть два способа добиться этого.
проходитьProcessIn
функцияуказатьчасинтервал,следующее,Это означает, что он будет израсходован через 24 часа.
info, err = client.Enqueue(t2, asynq.ProcessIn(24*time.Hour))
проходитьProcessAt
функцияуказать具体изчасинтервал,следующее,Это также означает, что он будет употреблен через 24 часа.
info, err = client.Enqueue(t2, asynq.ProcessAt(time.Now().Add(24*time.Hour))
проходитьRetention
функцияуказатьчасинтервал,Означает, что при обработке задачи,Вы все еще можете сохранить некоторыеизчасмежду。следующее Это верноtask3Сохранять после успешного употребления2маленькийчас。
client.Enqueue(task3, asynq.Retention(2*time.Hour))
проходитьQueue
Функция options определяет, какиеочередь。следующее,Отправьте сообщение в очередь с именем «high» в Redis.
client.Enqueue(t2, asynq.Queue("high"))
Потреблять задачи в редисочереди,первыйхотеть Установить соединение с Redis, мы называем это сервером. Используйте следующие методы для связи с экземпляром службы Redis и очередью для подписки.
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
)
// NOTE: We'll cover what this `handler` is in the section below.
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
}
Например, если мы хотим использовать две очереди с именами «высокая» и «медленная» в Redis, мы можем указать их через структуру async.Config. следующее:
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Queues: map[string]int{
"high": 6,
"slow": 4,
}
},
)
6 и 4 в очереди представляют уровни приоритета двух очередей.
В Нью Сервере,проходитьasynq.Config
структура体серединаизConcurrency
Поле,Можно указатьзапускатьизworkerколичество。несколькоworkerМожет быть одновременноиз Потреблениеочередьсерединаиз Задача。следующее,Представляет запуск 10 рабочих. Если этот параметр конфигурации не указан,Число рабочих процессов по умолчанию — это количество процессоров на сервере.
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"high": 6,
"slow": 4,
}
},
)
отredisочередьсередина获取到Задача后应Должен Как это решается??то естьсуществоватьосуществлятьserverизRunметодчасуказатьHandler
,ДолженHandler
этоинтерфейстип,нуждатьсяхотетьвыполнитьследующееинтерфейс:
type Handler interface {
ProcessTask(context.Context, *Task) error
}
Далее мы определяем обработчик типа, реализующий этот интерфейс, и выполняем в этом обработчике конкретную бизнес-обработку. В этом обработчике выполняется различная обработка в зависимости от типа полученной задачи.
func handler(ctx context.Context, t *asynq.Task) error {
switch t.Type() {
case "email:welcome":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
case "email:reminder":
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
default:
return fmt.Errorf("unexpected task type: %s", t.Type())
}
return nil
}
Затем запустите Сервер и укажите обработчик. следующее:
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
// Use asynq.HandlerFunc adapter for a handler function
if err := srv.Run(asynq.HandlerFunc(handler)); err != nil {
log.Fatal(err)
}
}
вышеиз Задача Потреблениеизhandlerиз Читабельность кода низкая.。существоватьasynqСумкасередина Также предоставляетMux
тип,用于进行注册Задачатипипереписыватьсяиз Потребление逻辑из Функция。следующее:
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "localhost:6379"},
asynq.Config{Concurrency: 10},
)
mux := asynq.NewServeMux()
mux.HandleFunc("email:welcome", sendWelcomeEmail)
mux.HandleFunc("email:reminder", sendReminderEmail)
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
}
В этом коде,проходитьmux.HandleFunc
функция分别注册了Задачатипemail:welcome
из Задачаиз Логика обработки такаяsendWelcomeEmail
функция;Задачатипдаemail:reminder
из Задачаиз Логика обработки такаяsendReminderEmail
функция。
Далее мы рассмотрим определения двух функций обработки. следующее:
func sendWelcomeEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Welcome Email to User %d", p.UserID)
return nil
}
func sendReminderEmail(ctx context.Context, t *asynq.Task) error {
var p EmailTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Printf(" [*] Send Reminder Email to User %d", p.UserID)
return nil
}
Пакет async также поддерживает Задачи. пакетной обработки。Прямо сейчассуществовать АдмиралНесколько задач отправлены в одну группу и в одну очередь。существовать Потреблениеизчас Просто подождипроходитьобозначениеизФункции агрегирования объединяют несколько задач в одну задачу.,Затем отправляется на обработку в процессор задач.
Когда выполняется любое из следующих двух условий, его можно агрегировать и отправить в процессор задач:
При отправке задачи используйте функцию «Группировать»:
// Enqueue three tasks to the same group.
client.Enqueue(task1, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task2, asynq.Queue("notifications"), asynq.Group("user1:email"))
client.Enqueue(task3, asynq.Queue("notifications"), asynq.Group("user1:email"))
При запуске службы для потребления вы можете указать параметры функции, связанные с агрегацией. следующее:
// This function is used to aggregate multiple tasks into one.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
// ... Your logic to aggregate the given tasks and return the aggregated task.
// ... Use NewTask(typename, payload, opts...) to create a new task and set options if needed.
// ... (Note) Queue option will be ignored and the aggregated task will always be enqueued to the same queue the group belonged.
}
srv := asynq.NewServer(
redisConnOpt,
asynq.Config{
GroupAggregator: asynq.GroupAggregatorFunc(aggregate), //Укажите функцию агрегирования как агрегат
GroupMaxDelay: 10 * time.Minute, //Агрегация не позднее, чем каждые 10 минут
GroupGracePeriod: 2 * time.Minute, //Агрегация каждые 2 минуты
GroupMaxSize: 20, // Агрегировать каждые 20 задач
Queues: map[string]int{"notifications": 1},
},
)
// Функция агрегации одной группы, наконец, объединяет несколько задач в одну задачу и отправляет обработчику задач
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
log.Printf("Aggregating %d tasks from group %q", len(tasks), group)
var b strings.Builder
for _, t := range tasks {
b.Write(t.Payload())
b.WriteString("\n")
}
return asynq.NewTask("aggregated-task", []byte(b.String()))
}
Полная поворотная система, монитор необходим. Пакет async также поставляется с двумя формами монитора: веб-интерфейсом и инструментами командной строки.
webUImonitor реализован через пакет asyncmon Открытого исходного кода.,Адресhttps://github.com/hibiken/asynqmon。следующее Как показано на картинке:
image.png
image.png
Другой Монитор командной строки。пока install устанавливает инструмент async следующим образом:
go install github.com/hibiken/asynq/tools/asynq
Затем введите команду в терминале asynq dash
запускать Панель управления командной строкой. Эффект следующий:
Хорошо, выше представлен пакет распределенной очереди на основе Redis, которым мы поделились с вами. Дополнительные функции см. в документации: https://github.com/hibiken/asynq/wiki.
Особые инструкции:тыизсосредоточиться «на» — моя самая большая мотивация продолжать писать. Нажмите на карточку общедоступного аккаунта ниже, чтобы сразу сосредоточиться на。сосредоточиться Отправьте PDF-документ «100 распространенных ошибок в Го» и классические учебные материалы по Го.