Flume собирает скрытые данные о поведении на стороне приложения в HDFS
Flume собирает скрытые данные о поведении на стороне приложения в HDFS

Собрать фон

Эта статья взята из хранилища данных электронной коммерции Shang Silicon Valley 6.0.

Когда мы собираем данные журнала сервера журналов, мы сначала передаем данные в Kafka через Flumel (чтобы облегчить последующую обработку в реальном времени), а затем собираем данные в HDFS через Flume. Затем соберите данные из Kafka в HDFS. В это время возникнет проблема дрейфа нулевой точки. (Когда данные около 24:00 в первый день поступают из Kafka и собираются Flume, временная метка времени в заголовке [текущее записанное время не является рабочим временем] станет временем следующего дня из-за задержки) и мы находятся в HDFSSink. Путь времени — это временная метка из заголовка, поэтому мы создаем перехватчик для обработки этой ситуации. Таким образом, данные точно собираются в каталог дат в HDFS.

Коллектор лотка 1

file_to_kafka.conf

Этот сборщик собирает скрытые данные о поведении сервера журналов в Kafka.

Поскольку KafkaChannel может собирать данные непосредственно в Kafka, мы больше не используем приемник для обработки.

Язык кода:shell
копировать
vim file_to_kafka.conf

#Определить компонент
a1.sources = r1
a1.channels = c1

#Настроить источник
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#Настроить канал
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#собрать 
a1.sources.r1.channels = c1

Скрипт запуска коллектора 1

Язык кода:shell
копировать
# Создать скрипт
vim f1.sh

#!/bin/bash

echo " --------запускать hadoop102 Собрать лоток -------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &

# Добавить разрешения
chmod 777 ./f1.sh

Коллектор лотка 2

kafka_to_hdfs_log.conf

Этот сборщик собирает данные Kafka в HDFS. Мы добавляем перехватчик, чтобы обеспечить точность данных.

Язык кода:shell
копировать
#Определить компонент
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#Настроить источник1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
# Перехватчик
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.TimestampInterceptor$Builder

#Настроить канал
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#Настроить раковину
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#Управление типом выходного файла
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#собрать 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Скрипт запуска Collector 2

Язык кода:shell
копировать
# Создать скрипт
vim f2.sh

#!/bin/bash

echo " --------запускать hadoop102 Поток данных журнала -------"
nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &

# Добавить разрешения
chmod 777 ./f2.sh

Перехватчик лотка

Формат данных журнала следующий:

Язык кода:json
копировать
{
  "common": {
    "ar": "12",
    "ba": "realme",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "realme Neo2",
    "mid": "mid_411",
    "os": "Android 13.0",
    "sid": "4f34596c-ca8f-434c-a8d5-356b944eb0d6",
    "vc": "v2.1.134"
  },
  "start": {
    "entry": "icon",
    "loading_time": 12974,
    "open_ad_id": 16,
    "open_ad_ms": 5415,
    "open_ad_skip_ms": 0
  },
  "ts": 1654620592548
}

pom-файл

Если maven не может быть загружен, вы можете принудительно обновить зависимости в кеше в корневом каталоге проекта: mvn clean install -U

Язык кода:xml
копировать
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

TimestampInterceptor

Принцип коллектора:

Из-за проблемы дрейфа нулевой точки мы настроили перехватчик для перехвата каждого события. На данный момент инкапсулированные данные поступают из Kafka, а данные Kafka поступают с сервера журналов. Нам нужны данные ts тела. , который используется для настройки пути коллектора Flume. (/%Y-%m-%d) Итак, нам нужно получить эти данные, обработать их, а затем загрузить в заголовок.

Язык кода:java
копировать
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
    //1. Получаем данные заголовка и тела.
    Map<String, String> headers = event.getHeaders();
    String log = new String(event.getBody(), StandardCharsets.UTF_8);

    try {
        //2. Преобразуем тип данных body в тип jsonObject (удобно получать данные)
        JSONObject jsonObject = JSONObject.parseObject(log);

        //3. Заменить поле времени временной метки в заголовке на временную метку, сгенерированную журналом (решаем проблему дрейфа данных)
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

@Override
public List<Event> intercept(List<Event> list) {
    Iterator<Event> iterator = list.iterator();
    while (iterator.hasNext()) {
        Event event = iterator.next();
        if (intercept(event) == null) {
            iterator.remove();
        }
    }
    return list;
}

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Запустить канал сбора

Язык кода:shell
копировать
# Запуск коллектора лотка
f1.sh
f2.sh

# Запустить сервер журналов
java -jar /opt/module/applog/gmall-remake-mock-2023-05-15-3.jar

Проверить результаты

boy illustration
Углубленный анализ переполнения памяти CUDA: OutOfMemoryError: CUDA не хватает памяти. Попыталась выделить 3,21 Ги Б (GPU 0; всего 8,00 Ги Б).
boy illustration
[Решено] ошибка установки conda. Среда решения: не удалось выполнить первоначальное зависание. Повторная попытка с помощью файла (графическое руководство).
boy illustration
Прочитайте нейросетевую модель Трансформера в одной статье
boy illustration
.ART Теплые зимние предложения уже открыты
boy illustration
Сравнительная таблица описания кодов ошибок Amap
boy illustration
Уведомление о последних правилах Points Mall в декабре 2022 года.
boy illustration
Даже новички могут быстро приступить к работе с легким сервером приложений.
boy illustration
Взгляд на RSAC 2024|Защита конфиденциальности в эпоху больших моделей
boy illustration
Вы используете ИИ каждый день и до сих пор не знаете, как ИИ дает обратную связь? Одна статья для понимания реализации в коде Python общих функций потерь генеративных моделей + анализ принципов расчета.
boy illustration
Используйте (внутренний) почтовый ящик для образовательных учреждений, чтобы использовать Microsoft Family Bucket (1T дискового пространства на одном диске и версию Office 365 для образовательных учреждений)
boy illustration
Руководство по началу работы с оперативным проектом (7) Практическое сочетание оперативного письма — оперативного письма на основе интеллектуальной системы вопросов и ответов службы поддержки клиентов
boy illustration
[docker] Версия сервера «Чтение 3» — создайте свою собственную программу чтения веб-текста
boy illustration
Обзор Cloud-init и этапы создания в рамках PVE
boy illustration
Корпоративные пользователи используют пакет регистрационных ресурсов для регистрации ICP для веб-сайта и активации оплаты WeChat H5 (с кодом платежного узла версии API V3)
boy illustration
Подробное объяснение таких показателей производительности с высоким уровнем параллелизма, как QPS, TPS, RT и пропускная способность.
boy illustration
Удачи в конкурсе Python Essay Challenge, станьте первым, кто испытает новую функцию сообщества [Запускать блоки кода онлайн] и выиграйте множество изысканных подарков!
boy illustration
[Техническая посадка травы] Кровавая рвота и отделка позволяют вам необычным образом ощипывать гусиные перья! Не распространяйте информацию! ! !
boy illustration
[Официальное ограниченное по времени мероприятие] Сейчас ноябрь, напишите и получите приз
boy illustration
Прочтите это в одной статье: Учебник для няни по созданию сервера Huanshou Parlu на базе CVM-сервера.
boy illustration
Cloud Native | Что такое CRD (настраиваемые определения ресурсов) в K8s?
boy illustration
Как использовать Cloudflare CDN для настройки узла (CF самостоятельно выбирает IP) Гонконг, Китай/Азия узел/сводка и рекомендации внутреннего высокоскоростного IP-сегмента
boy illustration
Дополнительные правила вознаграждения амбассадоров акции в марте 2023 г.
boy illustration
Можно ли открыть частный сервер Phantom Beast Palu одним щелчком мыши? Супер простой урок для начинающих! (Прилагается метод обновления сервера)
boy illustration
[Играйте с Phantom Beast Palu] Обновите игровой сервер Phantom Beast Pallu одним щелчком мыши
boy illustration
Maotouhu делится: последний доступный внутри страны адрес склада исходного образа Docker 2024 года (обновлено 1 декабря)
boy illustration
Кодирование Base64 в MultipartFile
boy illustration
5 точек расширения SpringBoot, супер практично!
boy illustration
Глубокое понимание сопоставления индексов Elasticsearch.
boy illustration
15 рекомендуемых платформ разработки с нулевым кодом корпоративного уровня. Всегда найдется та, которая вам понравится.
boy illustration
Аннотация EasyExcel позволяет экспортировать с сохранением двух десятичных знаков.