Привет всем, я брат Лэй.
На работе я столкнулся с проектом, который использовал Disruptor в качестве очереди сообщений. Вы правильно прочитали, это не Kafka и не RabbitMQ. Самое большое преимущество Disruptor в том, что он быстрый и имеет открытый исходный код.
1、 Disruptor — это высокопроизводительная очередь, разработанная LMAX, британской компанией, занимающейся торговлей иностранной валютой. Первоначальной целью исследований и разработок было решение проблемы задержки очереди памяти (в тесте производительности выяснилось, что она находится в очереди). того же порядка, что и операция ввода-вывода). Система, разработанная на основе Disruptor, может поддерживать однопоточную обработку. Имея 6 миллионов операций в секунду, она привлекла внимание отрасли после выступления на QCon в 2010 году; 2、 Disruptor — это платформа Java с открытым исходным кодом, предназначенная для достижения максимально возможной пропускной способности (TPS) и минимально возможной задержки при решении проблемы производитель-потребитель (PCP); 3、 С функциональной точки зрения Disruptor реализует функцию «очереди», и это ограниченная очередь, поэтому сценарий его применения, естественно, является сценарием применения модели «производитель-потребитель»; 4、 Disruptor является ключевым компонентом онлайн-торговой платформы LMAX. Платформа LMAX использует эту структуру для обработки заказов со скоростью 6 миллионов TPS. Помимо финансовой сферы, Disruptor может использоваться и в других общих приложениях. Улучшения производительности; 5、 Фактически, Disruptor — это не столько платформа, сколько идея дизайна. Для программ с такими элементами, как «параллелизм, буферы, модели производитель-потребитель и обработка транзакций», Disruptor предлагает существенные решения для повышения производительности (TPS); 6、 Домашняя страница Disruptor на GitHub: https://github.com/LMAX-Exchange/disruptor;
Давайте начнем с понимания основных концепций Disruptor, чтобы понять, как он работает. Представленная ниже концептуальная модель является одновременно объектом предметной области и основным объектом, сопоставленным с реализацией кода.
Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.
Как следует из названия, это кольцевой буфер. RingBuffer раньше был основным объектом Disruptor, но, начиная с версии 3.0, его обязанности были упрощены и теперь он отвечает только за хранение и обновление данных (событий), которыми обмениваются через Disruptor. В некоторых более сложных сценариях приложений кольцевой буфер можно полностью заменить пользовательской реализацией.
Обмениваемые через него данные (события) нумеруются и управляются посредством последовательного возрастания порядковых номеров, причем обработка данных (событий) всегда производится инкрементально по порядковому номеру. Последовательность используется для отслеживания хода обработки определенного обработчика событий (RingBuffer/Consumer). Хотя AtomicLong также можно использовать для определения прогресса, определение Sequence как ответственного за эту проблему имеет еще одну цель: предотвратить проблему ложного совместного использования кэша ЦП (Flase Sharing) между различными последовательностями. (Примечание: это один из ключевых моментов Disruptor для достижения высокой производительности. В Интернете уже есть много представлений о проблеме псевдосовместного использования, поэтому я не буду здесь вдаваться в подробности).
Sequencer — это истинное сердце Disruptor. Этот интерфейс имеет два класса реализации: SingleProducerSequencer и MultiProducerSequencer, которые определяют алгоритмы параллелизма для быстрой и правильной передачи данных между производителями и потребителями.
Используется для хранения ссылок на основную опубликованную последовательность RingBuffer и последовательности других потребителей, от которых зависит потребитель. Барьер последовательности также определяет логику, которая определяет, есть ли у потребителя дополнительные события для обработки.
Определите стратегию того, как Потребитель ждет следующего события. (Примечание: Disruptor определяет множество различных стратегий, обеспечивающих разную производительность для разных сценариев)
Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.
В семантике Disruptor данные, которыми обмениваются производители и потребители, называются событиями. Это не конкретный тип, определенный Disruptor, а определяемый и указанный пользователем Disruptor.
EventProcessor хранит последовательность конкретного потребителя (Consumer) и предоставляет цикл событий (Event Loop) для вызова реализации обработки событий.
Интерфейс обработки событий, определенный Disruptor, реализуется пользователями и используется для обработки событий. Это реальная реализация Consumer.
То есть производитель обычно ссылается на пользовательский код, который вызывает Disruptor для публикации событий. Disruptor не определяет конкретный интерфейс или тип.
Выполнив следующие 8 шагов, вы можете получить Disruptor Get home:
1. Добавьте зависимость pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
2. Модель тела сообщения
/**
* тело сообщения
*/
@Data
public class MessageModel {
private String message;
}
3. Создайте EventFactory
public class HelloEventFactory implements EventFactory<MessageModel> {
@Override
public MessageModel newInstance() {
return new MessageModel();
}
}
4. Создайте EventHandler-потребитель
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
@Override
public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
try {
//Остановимся здесь на 1000 мсда, чтобы подтвердить, что сообщение потребления да асинхронно
Thread.sleep(1000);
log.info("Начинается обработка сообщения потребителя");
if (event != null) {
log.info("потребитель Информация о потреблении да: {}", event);
}
} catch (Exception e) {
log.info("потребителю не удалось обработать сообщение");
}
log.info("Обработка сообщения потребителя завершена");
}
}
5. Создайте BeanManager
/**
* Получить экземпляр объекта
*/
@Component
public class BeanManager implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext; }
public static Object getBean(String name) {
return applicationContext.getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}
6. Создайте MQManager
Кроме того, если вы планируете пройти собеседование и сменить работу в ближайшем будущем, рекомендуется ответить на вопросы онлайн на ddkk.com, охватывая более 10 000 вопросов для собеседования по Java, охватывая почти все основные вопросы технических собеседований и наиболее полные 500 наборов. существующих на рынке стеков технологий, представляющих собой высококачественные продукты. Серия учебных пособий, доступных бесплатно.
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<MessageModel> messageModelRingBuffer() {
//Определяем пул потоков, используемый для обработки событий, Disruptor запускает обработку потребителя через поток, предоставляемый java.util.concurrent.ExecutorSerivce.
ExecutorService executor = Executors.newFixedThreadPool(2);
//Указываем фабрику событий
HelloEventFactory factory = new HelloEventFactory();
//Укажите размер байта кольцевого буфера, который должен быть равен 2 в N-й степени (он может преобразовать операцию по модулю в битовую операцию для повышения эффективности), в противном случае это повлияет на эффективность
int bufferSize = 1024 * 256;
//Однопоточный режим для повышения производительности
Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
ProducerType.SINGLE, new BlockingWaitStrategy());
//Установка события бизнес-процессора ---потребитель
disruptor.handleEventsWith(new HelloEventHandler());
// Начать ветку дезинтегратора
disruptor.start();
//Получаем кольцо кольцевого буфера, используемое для получения события, созданного производителем
RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}
7. Создайте Mqservice и создайте класс-производитель реализации.
public interface DisruptorMqService {
/**
* информация
* @param message
*/
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<MessageModel> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//Получаем индекс следующего слота Event
long sequence = messageModelRingBuffer.next();
try {
//Заполняем событие данными
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("ПрошлоеInformationCheredДобавить Информация:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
// Публикуем событие, активируем наблюдателя для потребления и передаем последовательность пользователю изменения
//Обратите внимание, что последний метод публикации должен быть помещен наконец, чтобы гарантировать, что он должен быть вызван; если последовательность запроса не отправлена, он заблокирует последующие операции публикации или других производителей.
messageModelRingBuffer.publish(sequence);
}
}
}
8. Создание тестовых классов и методов
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {
@Autowired
private DisruptorMqService disruptorMqService;
/**
* Disruptor используется внутри проекта для выполнения информацииочереди.
* @throws Exception
*/
@Test
public void sayHelloMqTest() throws Exception{
disruptorMqService.sayHelloMq("информацияприезжать,Hello world!");
log.info("Информацияочередь отправлена");
//Остановимся здесь на 2000 мсда, чтобы убедиться, что да обрабатывает информацию асинхронно
Thread.sleep(2000);
}
}
Результаты тестового запуска
2020-04-05 14:31:18.543 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : record the message: информацияприезжать,Hello world!
2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : Прошлоеинформацияочередь Добавитьинформация:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.utils.demo.DemoApplicationTests : информация очередьбыла отправлена
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : потребительиметь дело синформацияначинать
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : потребительинформация о потреблениида:MessageModel(message=информацияприезжать,Hello world!)
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : потребительиметь дело синформация Заканчивать
На самом деле генератор -> потребитель Шаблон очень распространен, и описанных выше эффектов можно легко достичь с помощью некоторых очередей сообщений. Разница в том, что Disruptor Он реализован в виде очереди в памяти и не блокируется. Это также Disruptor Причина, почему это эффективно.