Цель:Понимание формата источников данных и создание данных моделирования
путь
осуществлять
Формат данных
время сообщения | Ник отправителя | Счет отправителя | Пол отправителя | IP-адрес отправителя | система отправителя | Модель мобильного телефона отправителя | Формат сети отправителя | Отправитель GPS | Ник получателя | IP-адрес получателя | Счет получателя | Система получателя | Модель мобильного телефона получателя | Формат сети получателя | GPS получателя | Пол получателя | Тип сообщения | Расстояние между двумя сторонами | информация |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
msg_time | sender_nickyname | sender_account | sender_sex | sender_ip | sender_os | sender_phone_type | sender_network | sender_gps | receiver_nickyname | receiver_ip | receiver_account | receiver_os | receiver_phone_type | receiver_network | receiver_gps | receiver_sex | msg_type | distance | message |
2020/05/08 15:11:33 | Гу Бойи | 14747877194 | мужской | 48.147.134.255 | Android 8.0 | Сяоми Редми К30 | 4G | 94.704577,36.247553 | Лейю | 97.61.25.52 | 17832829395 | IOS 10.0 | Apple iPhone 10 | 4G | 84.034145,41.423804 | женский | TEXT | 77.82KM | Блуждание по краям света,Пастух переплывает реку. Поклоняюсь перед троном Будды,Я просто хочу провести сто лет вместе. |
Генерация данных
Создать исходный каталог файлов
mkdir /export/data/momo_init
Загрузите программу данных моделирования
cd /export/data/momo_init
rz
Создать каталог данных моделирования
mkdir /export/data/momo_data
Запустите программу для генерации данных
грамматика
java -jar /export/data/momo_init/MoMo_DataGen.jar оригинальныйданныепуть моделированиеданныепуть Случайно генерировать интервал данных, мс время
Тест: генерировать фрагмент данных каждые 500 мс.
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
Результат: создается файл данных моделирования MOMO_DATA.dat, а разделитель полей в каждой части данных равен \001.
краткое содержание
Цель:Обзор базового использования и реализации установки Flume. лотка тест
путь
осуществлять
Обзор флюма
Установка лотка
Загрузить установочный пакет
cd /export/software/
rz
Разархивируйте и установите
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
Изменить конфигурацию
#Интегрируйте HDFS и скопируйте файлы конфигурации HDFS
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/
#Изменить переменные среды Flume
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh
#Изменить строку 22
export JAVA_HOME=/export/server/jdk1.8.0_65
#Изменить строку 34
export HADOOP_HOME=/export/server/hadoop-3.3.0
Удалите собственный пакет гуавы Flume и замените его пакетом Hadoop.
cd /export/server/flume-1.9.0-bin
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
Создать каталог
cd /export/server/flume-1.9.0-bin
#Каталог хранения файлов конфигурации программы
mkdir usercase
#Taildir Каталог хранения данных в юанях
mkdir position
тест Флюма
Требования: собирать данные чата и писать в HDFS.
анализировать
развивать
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#Указываем файл записи метаданных
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#Преобразуйте все источники данных, которые необходимо отслеживать, в группу
a1.sources.s1.filegroups = f1
#Укажите, кто такой f1: контролировать все файлы в каталоге
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#Укажите, что заголовок данных, собранных f1, содержит пару KV
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#Укажите создание файлов по времени, обычно закрытых
a1.sinks.k1.hdfs.rollInterval = 0
#Укажите размер файла для создания файла, обычно 120 ~ Количество байтов, соответствующее 125M
a1.sinks.k1.hdfs.rollSize = 102400
#Указываем количество событий для создания файла, обычно закрытого
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Запустить HDFS
start-dfs.sh
Запустить Флюм
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
Запуск смоделированных данных
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
Посмотреть результаты
краткое содержание
Цель:Пример реализации программы сбора Flume
путь
осуществлять
нуждатьсяанализировать
Требования: собирать данные чата и записывать их в Kafka в режиме реального времени.
Source:taildir
Channel:mem
Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
программаразвивать
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#Указываем файл записи метаданных
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#Преобразуйте все источники данных, которые необходимо отслеживать, в группу
a1.sources.s1.filegroups = f1
#Укажите, кто такой f1: контролировать все файлы в каталоге
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#Укажите, что заголовок данных, собранных f1, содержит пару KV
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bound
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
Тестовая реализация
Запустить Кафку
start-zk-all.sh
start-kafka.sh
Создать тему
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
перечислять
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
Начать потребитель
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
Запустите программу Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
Запустить данные моделирования
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
Наблюдения
краткое содержание