SpringBoot + Disruptor реализует чрезвычайно быструю и высокопараллельную обработку, поддерживая 6 миллионов заказов в секунду без какого-либо давления!
SpringBoot + Disruptor реализует чрезвычайно быструю и высокопараллельную обработку, поддерживая 6 миллионов заказов в секунду без какого-либо давления!

Привет всем, я брат Лэй.

1. Предыстория

На работе я столкнулся с проектом, который использовал Disruptor в качестве очереди сообщений. Вы правильно прочитали, это не Kafka и не RabbitMQ. Самое большое преимущество Disruptor в том, что он быстрый и имеет открытый исходный код.

2. Введение в Disruptor

1、 Disruptor — это высокопроизводительная очередь, разработанная LMAX, британской компанией, занимающейся торговлей иностранной валютой. Первоначальной целью исследований и разработок было решение проблемы задержки очереди памяти (в тесте производительности выяснилось, что она находится в очереди). того же порядка, что и операция ввода-вывода). Система, разработанная на основе Disruptor, может поддерживать однопоточную обработку. Имея 6 миллионов операций в секунду, она привлекла внимание отрасли после выступления на QCon в 2010 году; 2、 Disruptor — это платформа Java с открытым исходным кодом, предназначенная для достижения максимально возможной пропускной способности (TPS) и минимально возможной задержки при решении проблемы производитель-потребитель (PCP); 3、 С функциональной точки зрения Disruptor реализует функцию «очереди», и это ограниченная очередь, поэтому сценарий его применения, естественно, является сценарием применения модели «производитель-потребитель»; 4、 Disruptor является ключевым компонентом онлайн-торговой платформы LMAX. Платформа LMAX использует эту структуру для обработки заказов со скоростью 6 миллионов TPS. Помимо финансовой сферы, Disruptor может использоваться и в других общих приложениях. Улучшения производительности; 5、 Фактически, Disruptor — это не столько платформа, сколько идея дизайна. Для программ с такими элементами, как «параллелизм, буферы, модели производитель-потребитель и обработка транзакций», Disruptor предлагает существенные решения для повышения производительности (TPS); 6、 Домашняя страница Disruptor на GitHub: https://github.com/LMAX-Exchange/disruptor;

3. Основная концепция Disruptor

Давайте начнем с понимания основных концепций Disruptor, чтобы понять, как он работает. Представленная ниже концептуальная модель является одновременно объектом предметной области и основным объектом, сопоставленным с реализацией кода.

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

1. Ring Buffer

Как следует из названия, это кольцевой буфер. RingBuffer раньше был основным объектом Disruptor, но, начиная с версии 3.0, его обязанности были упрощены и теперь он отвечает только за хранение и обновление данных (событий), которыми обмениваются через Disruptor. В некоторых более сложных сценариях приложений кольцевой буфер можно полностью заменить пользовательской реализацией.

2. Sequence Disruptor

Обмениваемые через него данные (события) нумеруются и управляются посредством последовательного возрастания порядковых номеров, причем обработка данных (событий) всегда производится инкрементально по порядковому номеру. Последовательность используется для отслеживания хода обработки определенного обработчика событий (RingBuffer/Consumer). Хотя AtomicLong также можно использовать для определения прогресса, определение Sequence как ответственного за эту проблему имеет еще одну цель: предотвратить проблему ложного совместного использования кэша ЦП (Flase Sharing) между различными последовательностями. (Примечание: это один из ключевых моментов Disruptor для достижения высокой производительности. В Интернете уже есть много представлений о проблеме псевдосовместного использования, поэтому я не буду здесь вдаваться в подробности).

3. Sequencer

Sequencer — это истинное сердце Disruptor. Этот интерфейс имеет два класса реализации: SingleProducerSequencer и MultiProducerSequencer, которые определяют алгоритмы параллелизма для быстрой и правильной передачи данных между производителями и потребителями.

4. Sequence Barrier

Используется для хранения ссылок на основную опубликованную последовательность RingBuffer и последовательности других потребителей, от которых зависит потребитель. Барьер последовательности также определяет логику, которая определяет, есть ли у потребителя дополнительные события для обработки.

5. Wait Strategy

Определите стратегию того, как Потребитель ждет следующего события. (Примечание: Disruptor определяет множество различных стратегий, обеспечивающих разную производительность для разных сценариев)

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

6. Event

В семантике Disruptor данные, которыми обмениваются производители и потребители, называются событиями. Это не конкретный тип, определенный Disruptor, а определяемый и указанный пользователем Disruptor.

7. EventProcessor

EventProcessor хранит последовательность конкретного потребителя (Consumer) и предоставляет цикл событий (Event Loop) для вызова реализации обработки событий.

8. EventHandler

Интерфейс обработки событий, определенный Disruptor, реализуется пользователями и используется для обработки событий. Это реальная реализация Consumer.

9. Producer

То есть производитель обычно ссылается на пользовательский код, который вызывает Disruptor для публикации событий. Disruptor не определяет конкретный интерфейс или тип.

4. Кейс-демо

Выполнив следующие 8 шагов, вы можете получить Disruptor Get home:

1. Добавьте зависимость pom.xml

Язык кода:javascript
копировать
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

2. Модель тела сообщения

Язык кода:javascript
копировать
/**
 * тело сообщения
 */
@Data
public class MessageModel {
     
       
    private String message;
}

3. Создайте EventFactory

Язык кода:javascript
копировать
public class HelloEventFactory implements EventFactory<MessageModel> {
     
       
    @Override
    public MessageModel newInstance() {
     
       
        return new MessageModel();
    }
}

4. Создайте EventHandler-потребитель

Язык кода:javascript
копировать
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
     
       
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
     
       
        try {
     
       
            //Остановимся здесь на 1000 мсда, чтобы подтвердить, что сообщение потребления да асинхронно
            Thread.sleep(1000);
            log.info("Начинается обработка сообщения потребителя");
            if (event != null) {
     
       
                log.info("потребитель Информация о потреблении да: {}", event);
            }
        } catch (Exception e) {
     
       
            log.info("потребителю не удалось обработать сообщение");
        }
        log.info("Обработка сообщения потребителя завершена");
    }
}

5. Создайте BeanManager

Язык кода:javascript
копировать
/**
 * Получить экземпляр объекта
 */
@Component
public class BeanManager implements ApplicationContextAware {
     
       

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
     
       
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
     
        return applicationContext; }

    public static Object getBean(String name) {
     
       
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clazz) {
     
       
        return applicationContext.getBean(clazz);
    }
}

6. Создайте MQManager

Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.

Язык кода:javascript
копировать
@Configuration
public class MQManager {
     
       

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
     
       
        //Определяем пул потоков, используемый для обработки событий, Disruptor запускает обработку потребителя через поток, предоставляемый java.util.concurrent.ExecutorSerivce.
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //Указываем фабрику событий
        HelloEventFactory factory = new HelloEventFactory();

        //Укажите размер байта кольцевого буфера, который должен быть равен 2 в N-й степени (он может преобразовать операцию по модулю в битовую операцию для повышения эффективности), в противном случае это повлияет на эффективность
        int bufferSize = 1024 * 256;

        //Однопоточный режим для повышения производительности
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //Установка события бизнес-процессора ---потребитель
        disruptor.handleEventsWith(new HelloEventHandler());

        // Начать ветку дезинтегратора
        disruptor.start();

        //Получаем кольцо кольцевого буфера, используемое для получения события, созданного производителем
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

7. Создайте Mqservice и создайте класс-производитель реализации.

Язык кода:javascript
копировать
public interface DisruptorMqService {
     
       

    /**
     * информация
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
     
       

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;
    @Override
    public void sayHelloMq(String message) {
     
       
        log.info("record the message: {}",message);
        //Получаем индекс следующего слота Event
        long sequence = messageModelRingBuffer.next();
        try {
     
       
            //Заполняем событие данными
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("ПрошлоеInformationCheredДобавить Информация:{}", event);
        } catch (Exception e) {
     
       
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
     
       
            // Публикуем событие, активируем наблюдателя для потребления и передаем последовательность пользователю изменения
            //Обратите внимание, что последний метод публикации должен быть помещен наконец, чтобы гарантировать, что он должен быть вызван; если последовательность запроса не отправлена, он заблокирует последующие операции публикации или других производителей.
            messageModelRingBuffer.publish(sequence);
        }
    }
}

8. Создание тестовых классов и методов

Язык кода:javascript
копировать
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {
     
       

    @Autowired
    private DisruptorMqService disruptorMqService;
    /**
     * Disruptor используется внутри проекта для выполнения информацииочереди.
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
     
       
        disruptorMqService.sayHelloMq("информацияприезжать,Hello world!");
        log.info("Информацияочередь отправлена");
        //Остановимся здесь на 2000 мсда, чтобы убедиться, что да обрабатывает информацию асинхронно
        Thread.sleep(2000);
    }
}

Результаты тестового запуска

Язык кода:javascript
копировать
2020-04-05 14:31:18.543  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : record the message: информацияприезжать,Hello world!
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl  : Прошлоеинформацияочередь Добавитьинформация:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:18.545  INFO 7274 --- [           main] c.e.utils.demo.DemoApplicationTests      : информация очередьбыла отправлена
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительиметь дело синформацияначинать
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительинформация о потреблениида:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:19.547  INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler    : потребительиметь дело синформация Заканчивать

5. Резюме

На самом деле генератор -> потребитель Шаблон очень распространен, и описанных выше эффектов можно легко достичь с помощью некоторых очередей сообщений. Разница в том, что Disruptor Он реализован в виде очереди в памяти и не блокируется. Это также Disruptor Причина, почему это эффективно.

boy illustration
RasaGpt — платформа чат-ботов на основе Rasa и LLM.
boy illustration
Nomic Embed: воспроизводимая модель внедрения SOTA с открытым исходным кодом.
boy illustration
Улучшение YOLOv8: EMA основана на эффективном многомасштабном внимании, основанном на межпространственном обучении, и эффект лучше, чем у ECA, CBAM и CA. Малые цели имеют очевидные преимущества | ICASSP2023
boy illustration
Урок 1 серии Libtorch: Тензорная библиотека Silky C++
boy illustration
Руководство по локальному развертыванию Stable Diffusion: подробные шаги и анализ распространенных проблем
boy illustration
Полностью автоматический инструмент для работы с видео в один клик: VideoLingo
boy illustration
Улучшения оптимизации RT-DETR: облегченные улучшения магистрали | Support Paddle облегченный rtdetr-r18, rtdetr-r34, rtdetr-r50, rtdet
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | Деформируемое внимание с большим ядром (D-LKA Attention), большое ядро ​​​​свертки улучшает механизм внимания восприимчивых полей с различными функциями
boy illustration
Создано Datawhale: выпущено «Руководство по тонкой настройке развертывания большой модели GLM-4»!
boy illustration
7B превышает десятки миллиардов, aiXcoder-7B с открытым исходным кодом Пекинского университета — это самая мощная модель большого кода, лучший выбор для корпоративного развертывания.
boy illustration
Используйте модель Huggingface, чтобы заменить интерфейс внедрения OpenAI в китайской среде.
boy illustration
Оригинальные улучшения YOLOv8: несколько новых улучшений | Сохранение исходной информации — алгоритм отделяемой по глубине свертки (MDSConv) |
boy illustration
Второй пилот облачной разработки | Быстро поиграйте со средствами разработки на базе искусственного интеллекта
boy illustration
Бесшовная интеграция, мгновенный интеллект [1]: платформа больших моделей Dify-LLM, интеграция с нулевым кодированием и встраивание в сторонние системы, более 42 тысяч звезд, чтобы стать свидетелями эксклюзивных интеллектуальных решений.
boy illustration
Решенная Ошибка | Загрузка PyTorch медленная: TimeoutError: [Errno 110] При загрузке факела истекло время ожидания — Cat Head Tiger
boy illustration
Brother OCR, библиотека с открытым исходным кодом для Python, которая распознает коды проверки.
boy illustration
Новейшее подробное руководство по загрузке и использованию последней демонстрационной версии набора данных COCO.
boy illustration
Выпущен отчет о крупной модели финансовой отрасли за 2023 год | Полный текст включен в загрузку |
boy illustration
Обычные компьютеры также могут работать с большими моделями, и вы можете получить личного помощника с искусственным интеллектом за три шага | Руководство для начинающих по локальному развертыванию LLaMA-3
boy illustration
Одной статьи достаточно для анализа фактора транскрипции SCENIC на Python (4)
boy illustration
Бросая вызов ограничениям производительности небольших видеокарт, он научит вас запускать большие модели глубокого обучения с ограниченными ресурсами, а также предоставит полное руководство по оценке и эффективному использованию памяти графического процессора!
boy illustration
Команда Fudan NLP опубликовала 80-страничный обзор крупномасштабных модельных агентов, в котором в одной статье представлен обзор текущего состояния и будущего агентов ИИ.
boy illustration
[Эксклюзив] Вы должны знать о новой функции JetBrains 2024.1 «Полнострочное завершение кода», чтобы решить вашу путаницу!
boy illustration
Краткое изложение базовых знаний о регистрации изображений 1.0
boy illustration
Новейшее подробное руководство по установке и использованию библиотеки cv2 (OpenCV, opencv-python) в Python.
boy illustration
Легко создайте локальную базу знаний для крупных моделей на основе Ollama+AnythingLLM.
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание решения. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Одна статья поможет вам понять RAG (Retrival Enhanced Generation) | Введение в концепцию и теорию + практику работы с кодом (включая исходный код).
boy illustration
Эволюция архитектуры шлюза облачной разработки
boy illustration
Docker и Kubernetes [Разработка контейнерных приложений с помощью Python]