Снимок — это функция, предоставляемая Elasticsearch для резервного копирования данных кластера в удаленный репозиторий. Например, выполните резервное копирование данных в S3, HDFS, общую файловую систему и т. д.
• Функция моментального снимка обеспечивает возможности резервного копирования и восстановления данных, гарантируя, что данные не будут потеряны из-за непредвиденных сбоев.
• Вы можете использовать функцию моментального снимка для перемещения данных из Elasticsearch Кластерная миграция на другой кластер.
• Сохраняйте исторические данные в хранилище, освобождая место для хранения в вашем онлайн-кластере.
PUT /_snapshot/my_backup
{
"type": "fs",
"settings": {
"location": "/mount/backups",
"compress": true
}
}
PUT /_snapshot/my_backup/snapshot_1
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false
}
Описание параметра:
• индексы: список индексов для резервного копирования.
• ignore_unavailable: игнорировать ли индекс, если он недоступен.
• include_global_state: включать ли глобальное состояние кластера.
POST /_snapshot/my_backup/snapshot_1/_restore
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false,
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}
Описание параметра:
• индексы: список индексов, подлежащих восстановлению.
• ignore_unavailable: игнорировать ли индекс, если он недоступен.
• include_global_state: включать ли глобальное состояние кластера.
• rename_pattern и rename_replacement: индексы, используемые для восстановления переименования.
#Просмотреть все снимки под складом
GET /_snapshot/my_backup/_all
#Просмотр конкретного снимка
GET /_snapshot/my_backup/snapshot1
#Посмотреть снимоксостояние
GET /_snapshot/my_backup/snapshot1/_status
DELETE /_snapshot/my_backup/snapshot_1
• Тип и конфигурация репозиторий следует выбирать на основе реальных потребностей и среды. Например, использование репозитория S3 требует настройки учетных данных для доступа и сегментов хранилища.
• Создать и Восстановить Ресурсы кластера будут потребляться при съемке, и эти операции следует выполнять в периоды низкой нагрузки, чтобы избежать влияния на онлайн-сервисы.
• Создать снимок Elasticsearch обеспечивает согласованность данных. Даже если в процессе создания снимка выполняются операции записи данных, это не повлияет на согласованность снимка.
• Убедитесь, что разрешения на доступ к репозиторию и разрешения на операции с моментальными снимками настроены правильно для предотвращения несанкционированных операций.
С момента инициирования запроса на создание моментального снимка до завершения резервного копирования моментального снимка он условно делится на следующие этапы:
Детали логики выполнения задачи резервного копирования моментальных снимков в Elasticsearch показаны на рисунке ниже:
CreateSnapshotRequest.java
В основном используется для создания снимокпросить,сдержанныйrepository
уникальный параметр,и соглашения об именах.
public CreateSnapshotRequest(String repository, String snapshot) {
this.snapshot = snapshot;
this.repository = repository;
}
public CreateSnapshotRequest(StreamInput in) throws IOException {
super(in);
snapshot = in.readString();
repository = in.readString();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) {
readSettingsFromStream(in);
}
featureStates = in.readStringArray();
includeGlobalState = in.readBoolean();
waitForCompletion = in.readBoolean();
partial = in.readBoolean();
userMetadata = in.readMap();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(snapshot);
out.writeString(repository);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) {
Settings.EMPTY.writeTo(out);
}
out.writeStringArray(featureStates);
out.writeBoolean(includeGlobalState);
out.writeBoolean(waitForCompletion);
out.writeBoolean(partial);
out.writeGenericMap(userMetadata);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (snapshot == null) {
validationException = addValidationError("snapshot is missing", validationException);
}
if (repository == null) {
validationException = addValidationError("repository is missing", validationException);
}
if (indices == null) {
validationException = addValidationError("indices is null", validationException);
} else {
for (String index : indices) {
if (index == null) {
validationException = addValidationError("index is null", validationException);
break;
}
}
}
if (indicesOptions == null) {
validationException = addValidationError("indicesOptions is null", validationException);
}
if (featureStates == null) {
validationException = addValidationError("featureStates is null", validationException);
}
final int metadataSize = metadataSize(userMetadata);
if (metadataSize > MAXIMUM_METADATA_BYTES) {
validationException = addValidationError(
"metadata must be smaller than 1024 bytes, but was [" + metadataSize + "]",
validationException
);
}
return validationException;
}
@Override
public String toString() {
return "CreateSnapshotRequest{"
+ "snapshot='"
+ snapshot
+ '\''
+ ", repository='"
+ repository
+ '\''
+ ", indices="
+ (indices == null ? null : Arrays.asList(indices))
+ ", indicesOptions="
+ indicesOptions
+ ", featureStates="
+ Arrays.asList(featureStates)
+ ", partial="
+ partial
+ ", includeGlobalState="
+ includeGlobalState
+ ", waitForCompletion="
+ waitForCompletion
+ ", masterNodeTimeout="
+ masterNodeTimeout
+ ", metadata="
+ userMetadata
+ '}';
}
CreateSnapshotRequest
Классы наследуются отMasterNodeRequest
добрый,В то же время при наследованииCreateSnapshotRequest
Передается как общий параметр。имя таблицы Создать Запросы снимков должны обрабатываться главным узлом. Вызов родительского класса MasterNodeRequest
Метод строительства,Чтение из входного потокаиинициализировать родительский элементдобрыйполя。проходитьпозвонить родителюдобрыйизwriteTo()
метод Снимокпроситьв Поля записываются в выходной поток。в то же времяпроходитьvalidate()
Правильный путьsnapshot
,repository
,indices
,indicesOptions
,featureStates
,metadataSize
Провести соответствующую проверку。проходитьtoString()
метод завершенrequestСтроительство。
Описание поля:
snapshot
: Считывает имя снимка из входного потока.repository
: Считывает имя репозитория из входного потока.indices
: Считайте индексированный массив из входного потока.indicesOptions
: Считайте параметры индекса из входного потока.if (in.getTransportVersion().before(SETTINGS_IN_REQUEST_VERSION)) { readSettingsFromStream(in); }:
Если транспортная версия предшествует указанной версии, прочтите настройки из входного потока.featureStates
: Считывает массив статусов объектов из входного потока.includeGlobalState
: Считывает логическое значение из входного потока, следует ли включать глобальное состояние.waitForCompletion
: Логическое значение, указывающее, следует ли ждать завершения при чтении из входного потока.partial
: Считывает логическое значение из входного потока, указывающее, является ли снимок частичным.userMetadata
: Считайте метаданные пользователя из входного потока.SnapshotsService.java
Этот класс в основном отвечает за создание. сопутствующие услуги. Эта услуга работает через Create снимок,Удалить makeВсе шаги, выполняемые на главном узле.
public static final Setting<Integer> MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING = Setting.intSetting(
"snapshot.max_concurrent_operations",
1000,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
private volatile int maxConcurrentOperations;
в этом классе,Сначала мы видим, что создали файл с именемMAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING
статическая константа,Диапазон от 1 до 1000. Значение по умолчанию — 1000. Этот параметр используется для управления максимальным количеством одновременных задач создания снимков, которые могут выполняться одновременно. Если мы не укажем значение этого параметра в операторе моментального снимка,Тогда снимок загрузит значение этого параметра по умолчанию при выполнении построения.
public SnapshotsService(
Settings settings,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
RepositoriesService repositoriesService,
TransportService transportService,
ActionFilters actionFilters,
SystemIndices systemIndices
) {
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.repositoriesService = repositoriesService;
this.threadPool = transportService.getThreadPool();
this.transportService = transportService;
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(
transportService,
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver
);
if (DiscoveryNode.isMasterNode(settings)) {
// addLowPriorityApplier to make sure that Repository will be created before snapshot
clusterService.addLowPriorityApplier(this);
maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i);
}
this.systemIndices = systemIndices;
this.masterServiceTaskQueue = clusterService.createTaskQueue("snapshots-service", Priority.NORMAL, new SnapshotTaskExecutor());
}
В этом конструкторе инициализируются некоторые ключевые параметры, участвующие в процессах создания, резервного копирования и удаления моментальных снимков.
• Settings settings: Элементы конфигурации, включая параметры, связанные с конфигурацией кластера.
• ClusterService clusterService: Службы кластеров обеспечивают состояние кластера и операции на уровне кластера.
• IndexNameExpressionResolver indexNameExpressionResolver: Анализатор выражений имен индексов, используемый для анализа выражений имен индексов.
• RepositoriesService repositoriesService: Служба складов управляет созданием хранилищ моментальных снимков и доступом к ним.
• TransportService transportService: Транспортная служба, отвечающая за связь между узлами.
• ActionFilters actionFilters: Фильтр действий управляет логикой фильтрации запросов операций.
• SystemIndices systemIndices: Системный индекс, управляет индексами системного уровня.
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
final String repositoryName = request.repository();
final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
// TODO: create snapshot UUID in CreateSnapshotRequest and make this operation idempotent to cleanly deal with transport layer
// retries
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(request.repository());
if (repository.isReadOnly()) {
listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
return;
}
submitCreateSnapshotRequest(request, listener, repository, new Snapshot(repositoryName, snapshotId), repository.getMetadata());
}
проходитьcreateSnapshot()
Метод, используемый для инициализации процесса моментального снимка。В этом методе мы видим, что дляrepositoryName
,snapshotName
Получить и проанализировать,Оценка разрешений для хранилища снимков,Если разрешения на складReadOnlyзатем вернитесь напрямую。Звонок после завершения всех проверокsubmitCreateSnapshotRequest()
методзафиксировать снимокпросить。
private void submitCreateSnapshotRequest(
CreateSnapshotRequest request,
ActionListener<Snapshot> listener,
Repository repository,
Snapshot snapshot,
RepositoryMetadata initialRepositoryMetadata
) {
repository.getRepositoryData(
listener.delegateFailure(
(l, repositoryData) -> masterServiceTaskQueue.submitTask(
"create_snapshot [" + snapshot.getSnapshotId().getName() + ']',
new CreateSnapshotTask(repository, repositoryData, l, snapshot, request, initialRepositoryMetadata),
request.masterNodeTimeout()
)
)
);
}
submitCreateSnapshotRequest()
Основное использование методов заключается в отправке Создать просьба Сэта. После правильного получения всех параметров будет создан новый резервный снимок SnapshotTask.
В основном он включает в себя следующие ключевые этапы:
1. Получить данные склада: Сначала все начинается с вызова repository.getRepositoryData для получения данных из хранилища снимков.
2. Реагировать на неудачные ситуации: он использует listener.delegateFailure метод обработки любых возможных условий отказа.
3. Отправить задачу создания снимка: Если данные склада успешно получены, будет создано Задание фотографа было отправлено masterServiceTaskQueue。
• CreateSnapshotRequest request: Создать Объект запроса съемки содержит все параметры, необходимые для операции создания моментального снимка.
• ActionListener listener: Прослушиватель обратного вызова после завершения операции для обработки успеха или неудачи.
• Repository repository: Целевой репозиторий для операций моментальных снимков.
• Snapshot snapshot: Объект моментального снимка, который необходимо создать.
• RepositoryMetadata initialRepositoryMetadata: Начальные метаданные хранилища.
После отправки задачи моментального снимка нам необходимо проанализировать и выполнить запрос моментального снимка. Здесь мы продолжаем смотреть вниз.
private class SnapshotTaskExecutor implements ClusterStateTaskExecutor<SnapshotTask> {
@Override
public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionContext) throws Exception {
final ClusterState state = batchExecutionContext.initialState();
final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext(batchExecutionContext);
final SnapshotsInProgress initialSnapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState();
for (final var taskContext : batchExecutionContext.taskContexts()) {
if (taskContext.getTask()instanceof CreateSnapshotTask task) {
try {
final var repoMeta = state.metadata()
.custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY)
.repository(task.snapshot.getRepository());
if (Objects.equals(task.initialRepositoryMetadata, repoMeta)) {
snapshotsInProgress = createSnapshot(task, taskContext, state, snapshotsInProgress);
} else {
// repository data changed in between starting the task and executing this cluster state update so try again
taskContext.success(
() -> submitCreateSnapshotRequest(
task.createSnapshotRequest,
task.listener,
task.repository,
task.snapshot,
repoMeta
)
);
}
} catch (Exception e) {
taskContext.onFailure(e);
}
}
}
shardsUpdateContext.completeWithUpdatedState(snapshotsInProgress);
if (snapshotsInProgress == initialSnapshots) {
return state;
}
return ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
}
private SnapshotsInProgress createSnapshot(
CreateSnapshotTask createSnapshotTask,
TaskContext<SnapshotTask> taskContext,
ClusterState currentState,
SnapshotsInProgress snapshotsInProgress
) {
final RepositoryData repositoryData = createSnapshotTask.repositoryData;
final Snapshot snapshot = createSnapshotTask.snapshot;
final String repositoryName = snapshot.getRepository();
final String snapshotName = snapshot.getSnapshotId().getName();
ensureRepositoryExists(repositoryName, currentState);
final Repository repository = createSnapshotTask.repository;
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
validate(repositoryName, snapshotName, currentState);
final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.EMPTY
);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);
final CreateSnapshotRequest request = createSnapshotTask.createSnapshotRequest;
// Store newSnapshot here to be processed in clusterStateProcessed
Map<Boolean, List<String>> requestedIndices = Arrays.stream(
indexNameExpressionResolver.concreteIndexNames(currentState, request)
).collect(Collectors.partitioningBy(systemIndices::isSystemIndex));
List<String> requestedSystemIndices = requestedIndices.get(true);
//Проверка системного индекса
if (requestedSystemIndices.isEmpty() == false) {
Set<String> explicitlyRequestedSystemIndices = new HashSet<>(requestedSystemIndices);
explicitlyRequestedSystemIndices.retainAll(Arrays.asList(request.indices()));
if (explicitlyRequestedSystemIndices.isEmpty() == false) {
throw new IllegalArgumentException(
format(
"the [indices] parameter includes system indices %s; to include or exclude system indices from a "
+ "snapshot, use the [include_global_state] or [feature_states] parameters",
explicitlyRequestedSystemIndices
)
);
}
}
List<String> indices = requestedIndices.get(false);
final List<String> requestedStates = Arrays.asList(request.featureStates());
final Set<String> featureStatesSet;
//Проверка статуса запроса
if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
if (request.includeGlobalState() && requestedStates.isEmpty()) {
// If we're including global state and feature states aren't specified, include all of them
featureStatesSet = systemIndices.getFeatureNames();
} else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
// If there's exactly one value and it's "none", include no states
featureStatesSet = Collections.emptySet();
} else {
// Otherwise, check for "none" then use the list of requested states
if (requestedStates.contains(NO_FEATURE_STATES_VALUE)) {
throw new IllegalArgumentException(
"the feature_states value ["
+ SnapshotsService.NO_FEATURE_STATES_VALUE
+ "] indicates that no feature states should be snapshotted, "
+ "but other feature states were requested: "
+ requestedStates
);
}
featureStatesSet = new HashSet<>(requestedStates);
featureStatesSet.retainAll(systemIndices.getFeatureNames());
}
} else {
featureStatesSet = Collections.emptySet();
}
final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
final Set<String> systemDataStreamNames = new HashSet<>();
final Set<String> indexNames = new HashSet<>(indices);
for (String featureName : featureStatesSet) {
SystemIndices.Feature feature = systemIndices.getFeature(featureName);
Set<String> featureSystemIndices = feature.getIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
.collect(Collectors.toSet());
Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
.collect(Collectors.toSet());
Set<String> featureSystemDataStreams = new HashSet<>();
Set<String> featureDataStreamBackingIndices = new HashSet<>();
for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
if (backingIndexNames.size() > 0) {
featureDataStreamBackingIndices.addAll(backingIndexNames);
featureSystemDataStreams.add(sdd.getDataStreamName());
}
}
if (featureSystemIndices.size() > 0 || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) {
featureStates.add(new SnapshotFeatureInfo(featureName, List.copyOf(featureSystemIndices)));
indexNames.addAll(featureSystemIndices);
indexNames.addAll(featureAssociatedIndices);
indexNames.addAll(featureDataStreamBackingIndices);
systemDataStreamNames.addAll(featureSystemDataStreams);
}
indices = List.copyOf(indexNames);
}
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
final Map<String, IndexId> allIndices = new HashMap<>();
for (SnapshotsInProgress.Entry runningSnapshot : snapshotsInProgress.forRepo(repositoryName)) {
allIndices.putAll(runningSnapshot.indices());
}
final Map<String, IndexId> indexIds = repositoryData.resolveNewIndices(indices, allIndices);
final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(
snapshotsInProgress,
deletionsInProgress,
currentState,
indexIds.values(),
useShardGenerations(version),
repositoryData,
repositoryName
);
if (request.partial() == false) {
Set<String> missing = new HashSet<>();
//Проверка информации, связанной с шардингом
for (Map.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == ShardState.MISSING) {
missing.add(entry.getKey().getIndex().getName());
}
}
if (missing.isEmpty() == false) {
throw new SnapshotException(snapshot, "Indices don't have primary shards " + missing);
}
}
// Создать новую запись снимка
final var newEntry = SnapshotsInProgress.startedEntry(
snapshot,
request.includeGlobalState(),
request.partial(),
indexIds,
CollectionUtils.concatLists(
indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()),
systemDataStreamNames
),
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
shards,
request.userMetadata(),
version,
List.copyOf(featureStates)
);
// Обновить контекст задачии Вернуть новый прогресс снимка
final var res = snapshotsInProgress.withAddedEntry(newEntry);
taskContext.success(() -> {
logger.info("snapshot [{}] started", snapshot);
createSnapshotTask.listener.onResponse(snapshot);
if (newEntry.state().completed()) {
endSnapshot(newEntry, currentState.metadata(), createSnapshotTask.repositoryData);
}
});
return res;
}
}
в это времяElasticsearchЗдесь файл под названиемSnapshotTaskExecutor
из内部добрый,ОсуществленныйClusterStateTaskExecutor<SnapshotTask>
Интерфейс для выполнения задач создания снимков.
существовать Это внутреннеедобрыйсерединаpublic ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionContext)
Метод отвечает за обработку отправленных задач снимка.,И выполнить соответствующие операции в зависимости от типа задачи.
В основном он включает в себя следующие этапы:
1. Инициализировать состояние кластера:获取初始из集群состояние。
2. Создать контекст обновления:используется для更新Снимокизсостояние。
3. Получить текущий снимок в процессе:из кластерасостояниесередина Получить текущий снимок в процессеинформация。
4. задачи обработки:Обход контекста задачи,Проверьте, является ли тип задачи CreateSnapshotTask и выполнена ли операция.
• Если метаданные хранилища не изменились, позвоните createSnapshot метод Создать снимок。
• Если метаданные хранилища изменяются, повторите команду Создать. снимокпросить。
5. Обновить статус снимка:Обновить статус снимка,если нет изменений,Вернуться в исходное состояние кластера,В противном случае верните обновленное состояние кластера.
private SnapshotsInProgress createSnapshot(CreateSnapshotTask createSnapshotTask,TaskContext<SnapshotTask> taskContext,ClusterState currentState,SnapshotsInProgress snapshotsInProgress)
Метод, отвечающий за конкретное создание сделать шаги.
В этом методе детально проверяются хранилище моментальных снимков, моментальный снимок, индекс и статус подключаемого модуля. В основном это включает в себя следующие шаги
1. Различные проверки данных, связанных со снимками:Убедитесь, что склад существует、Доступно имя снимка、На средней очистке не происходит.
2. Обработка запрошенного индекса и статуса функции:в соответствии спросить处理需要Снимокиз索引ихарактеристикасостояние。
3. Создать новую запись снимка:Создать новую запись снимка。
4. Обновить контекст задачи:Обновить контекст задачу и вызвать метод ответа прослушивателя при запуске моментального снимка.
5. Вернуть новый прогресс снимка:Возвращает обновленный прогресс снимка。
SnapshotShardsService.java
SnapshotShardsService.java
В основном работает на узлах данных,и контролировать снимки осколков, которые выполняются на этих узлах,Отвечает за управление этими снимками на уровне сегментов.,включая запуск,Хватит ждать действий.
public SnapshotShardsService(
Settings settings,
ClusterService clusterService,
RepositoriesService repositoriesService,
TransportService transportService,
IndicesService indicesService
) {
this.indicesService = indicesService;
this.repositoriesService = repositoriesService;
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = transportService.getThreadPool();
this.remoteFailedRequestDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
if (DiscoveryNode.canContainData(settings)) {
// this is only useful on the nodes that can hold data
clusterService.addListener(this);
}
}
Сначала инициализируется в конструктореindices
,repository
,cluster
,threadPool
Связанные свойства。в то же времянужно вниманиеclusterService.addListener(this);
Действует только на том узле, где хранятся данные.。
private void snapshot(
final ShardId shardId,
final Snapshot snapshot,
final IndexId indexId,
final IndexShardSnapshotStatus snapshotStatus,
Version version,
final long entryStartTime,
ActionListener<ShardSnapshotResult> resultListener
) {
ActionListener.run(resultListener, listener -> {
snapshotStatus.ensureNotAborted();
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
if (indexShard.routingEntry().relocating()) {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
}
final IndexShardState indexShardState = indexShard.state();
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
// shard has just been created, or still recovering
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}
final Repository repository = repositoriesService.repository(snapshot.getRepository());
Engine.IndexCommitRef snapshotRef = null;
try {
snapshotRef = indexShard.acquireIndexCommitForSnapshot();
snapshotStatus.ensureNotAborted();
repository.snapshotShard(
new SnapshotShardContext(
indexShard.store(),
indexShard.mapperService(),
snapshot.getSnapshotId(),
indexId,
snapshotRef,
getShardStateId(indexShard, snapshotRef.getIndexCommit()),
snapshotStatus,
version,
entryStartTime,
listener
)
);
} catch (Exception e) {
IOUtils.close(snapshotRef);
throw e;
}
});
}
snapshot()
В основном отвечает за создание снимков осколков.。существовать Долженметодсерединанас可以看到существовать Получить индексинформацияс шардингомidназад,Этот метод определяет, является ли текущий сегмент основным.,Он находится в статусе переезда?,Он находится в состоянии инициализации?,Все было строго проверено. после позднего завершения,создаст снимок осколка。Создание снимка завершеноназадвызовIndexShard.java
вacquireIndexCommitForSnapshot()
метод Отправьте индексный файл。нассуществоватьназад续将对Долженметодпровести анализ。
Здесь мы видим только проверку шардов,Если что-то пошло не так с индексом перед проверкой шарда,Тогда здесь будут лазейки в логике,Итак, мы нашли,Предоставление объектаindexShardПолучить индексс шардингомиз相关информация时вызов了indexServiceSafe()
метод,Возможно, он сможет ответить на наши вопросы.
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
public IndexService indexServiceSafe(Index index) {
IndexService indexService = indices.get(index.getUUID());
if (indexService == null) {
throw new IndexNotFoundException(index);
}
assert indexService.indexUUID().equals(index.getUUID())
: "uuid mismatch local: " + indexService.indexUUID() + " incoming: " + index.getUUID();
return indexService;
}
существоватьindexServiceSafe(Index index)
методсерединанас可以看到,Существование входящего индекса проверено. Это позволяет избежать исключений создания, вызванных проблемами индекса при создании снимков сегментов.
При создании моментального снимка сегмента,Чтобы гарантировать, что созданный нами снимок сегмента соответствует информации о сегменте, хранящейся в текущем кластере Elasticsearch.,здесь мыпроходитьgetShardStateId(IndexShard indexShard,IndexCommit snapshotIndexCommit)
Выполнить проверку согласованности。
@Nullable
public static String getShardStateId(IndexShard indexShard, IndexCommit snapshotIndexCommit) throws IOException {
final Map<String, String> userCommitData = snapshotIndexCommit.getUserData();
final SequenceNumbers.CommitInfo seqNumInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userCommitData.entrySet());
final long maxSeqNo = seqNumInfo.maxSeqNo;
if (maxSeqNo != seqNumInfo.localCheckpoint || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint()) {
return null;
}
return userCommitData.get(Engine.HISTORY_UUID_KEY)
+ "-"
+ userCommitData.getOrDefault(Engine.FORCE_MERGE_UUID_KEY, "na")
+ "-"
+ maxSeqNo;
}
В методе getShardStateId() для сегмента генерируется идентификатор на основе его текущего состояния. Этот идентификатор можно использовать для определения того, изменилось ли содержимое сегмента между двумя снимками. Если глобальная и локальная контрольные точки шарда равны, предполагается, что содержимое шарда не изменилось, его максимальный порядковый номер не изменился и его history-
и force-merge-uuid
Без изменений. Если глобальные и локальные контрольные точки шарда различны, этот метод возвращает {@code null}, потому что безопасное уникальное состояние шарда в этом случае использовать нельзя. ID, так как основной аварийный переход может привести к разному содержимому сегмента для одного и того же порядкового номера в последующих снимках.
/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, ShardSnapshotResult shardSnapshotResult) {
assert shardSnapshotResult != null;
assert shardSnapshotResult.getGeneration() != null;
sendSnapshotShardUpdate(snapshot, shardId, ShardSnapshotStatus.success(clusterService.localNode().getId(), shardSnapshotResult));
}
/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(
final Snapshot snapshot,
final ShardId shardId,
final String failure,
final ShardGeneration generation
) {
sendSnapshotShardUpdate(
snapshot,
shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, generation)
);
}
После завершения резервного копирования моментального снимка на уровне сегмента нам также необходимо синхронизировать сегмент с главным узлом. Информация, связанная со снимком, и статус запроса. На данный момент нам необходимо определить статус снимка. вызов不同из回调метод К главному узлуи Шардинг синхронизации узлов данныхинформация,Снимоксостояние。Ранназад才会进行相关资源из释放;Вызывается, если резервное копирование прошло успешноnotifySuccessfulSnapshotShard()
,Если не получится, нужно позвонитьnotifyFailedSnapshotShard()
。Независимо от того, какой этометод,Все необходимоесуществовать Передано при обратном вызовеsnapshot
,shardId
,ShardSnapshotStatus
,ShardGeneration
Эти четыре необходимых параметра。
private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
remoteFailedRequestDeduplicator.executeOnce(
new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
}
@Override
public void onFailure(Exception e) {
logger.warn(() -> format("[%s][%s] failed to update snapshot state to [%s]", shardId, snapshot, status), e);
}
},
(req, reqListener) -> transportService.sendRequest(
transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
req,
new ActionListenerResponseHandler<>(reqListener.map(res -> null), in -> ActionResponse.Empty.INSTANCE)
)
);
}
Этот метод отвечает за обновление статуса снимка на главном узле.
IndexShard.java
public Engine.IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
if (state == IndexShardState.STARTED) {
// unlike acquireLastIndexCommit(), there's no need to acquire a snapshot on a shard that is shutting down
return getEngine().acquireIndexCommitForSnapshot();
} else {
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
}
}
этотметодиспользуется длясуществовать Снимок过程середина获取当前索引分片из索引提交(Index Commit)
Цитировать,以确保Снимок操作能够существовать一致из视图上执行。Долженметодсуществовать Получить ссылку на фиксацию индекса проверит состояние шардов индекса и разрешит Tweet только в том случае, если шарды находятся в запущенном состоянии. ссылку на фиксацию индекса. Пройти проверяет состояние сегмента и гарантирует, что Tweet разрешен только в том случае, если сегмент включен, а не выключен. ссылку на фиксацию индекса。Если шардинг не началсясостояние,выдает исключение,Предотвратите незаконные операции со снимками。существовать Долженметодиз返回серединаgetEngine().acquireIndexCommitForSnapshot();
вызывающий двигатель acquireIndexCommitForSnapshot Метод для получения ссылки на фиксацию индекса текущего фрагмента индекса. Это гарантирует, что полученная фиксация представляет собой согласованное представление, которое можно использовать для операций моментального снимка.
Анализируя логику этого метода, мы можем найти:
1. Чтение статуса шарда:
• Final IndexShardState state = this.state;: прочитать текущее состояние сегмента за один раз. Здесь this.state — изменчивая переменная, поэтому чтение является потокобезопасным.
2. Проверить статус шарда:
• if (state == IndexShardState.STARTED): Проверяет, является ли шард статусом STARTED (запущен). Получение ссылок на фиксацию индекса разрешено только при включенном сегментировании.
3. Получить ссылку на фиксацию индекса:
• return getEngine().acquireIndexCommitForSnapshot();: вызывающий механизм acquireIndexCommitForSnapshot Метод для получения ссылки на фиксацию индекса текущего фрагмента индекса. Это гарантирует, что полученная фиксация представляет собой согласованное представление, которое можно использовать для операций моментального снимка.
4. Обработка исключений:
• Ветка else: если статус сегмента не STARTED, выдается исключение IllegalIndexShardStateException, указывающее, что операции с моментальными снимками не разрешены в текущем состоянии.
getEngine().acquireIndexCommitForSnapshot()
Представление индекса, полученное здесь, зависит отLuceneвIndexCommit.java
IndexCommit.java
IndexCommit.java
Это абстрактный класс в базовом пакете Lucene, представляющий точку фиксации индекса (commit точка). Он играет ключевую роль в процессе управления индексами, особенно при обработке таких операций, как фиксация индекса, удаление и создание снимков.
public abstract class IndexCommit implements Comparable<IndexCommit> {
/** Получите файл сегмента, связанный с точкой фиксации. */
public abstract String getSegmentsFileName();
/** Возвращает все связанные файлы, на которые ссылается эта точка фиксации. */
public abstract Collection<String> getFileNames() throws IOException;
/** Возвращает индексированный каталог. */
public abstract Directory getDirectory();
/**
* Удаляет точку фиксации только для точек фиксации, указанных в контексте.
* После вызова этого метода вызывающая сторона будет уведомлена о необходимости удалить точку отправки. Конкретная политика удаления определяется IndexDeleationPolicy.
* Его можно вызвать только методом onInit() или onCommit() в соответствии с его политикой IndexDeleationPolicy;
*/
public abstract void delete();
/**
* Возвращает true, если вызывается удаление. По умолчанию вызывается IndexWriter.
*/
public abstract boolean isDeleted();
/** Возвращает количество ссылочных сегментов. */
public abstract int getSegmentCount();
/**Единственный метод отправки, обычно вызываемый неявно*/
protected IndexCommit() {}
/** Используется для определения равенства содержимого и каталогов, отправленных двумя IndexCommit. */
@Override
public boolean equals(Object other) {
if (other instanceof IndexCommit) {
IndexCommit otherCommit = (IndexCommit) other;
return otherCommit.getDirectory() == getDirectory()
&& otherCommit.getGeneration() == getGeneration();
} else {
return false;
}
}
@Override
public int hashCode() {
return getDirectory().hashCode() + Long.valueOf(getGeneration()).hashCode();
}
/** Возвращает сегмент, созданный текущей отправкой */
public abstract long getGeneration();
/**
* Возвращает пользовательские данные, переданные в IndexWriter.
*/
public abstract Map<String, String> getUserData() throws IOException;
@Override
public int compareTo(IndexCommit commit) {
if (getDirectory() != commit.getDirectory()) {
throw new UnsupportedOperationException(
"cannot compare IndexCommits from different Directory instances");
}
long gen = getGeneration();
long comgen = commit.getGeneration();
return Long.compare(gen, comgen);
}
/**
* Получить инициализацию точки фиксации от NRT или не от NRT
*/
StandardDirectoryReader getReader() {
return null;
}
}
Этот абстрактный класс в основном предоставляет следующие функции:
1. Управление и точка подачи справочного индекса:
• Предоставляет интерфейс для получения файлов сегментов, индексных файлов и каталогов для определенной точки фиксации.
2. Удаление и проверка точек фиксации:
• Позволяет отмечать точки фиксации для удаления с помощью метода delete и проверять статус удаления с помощью метода isDeleted.
3. Отправить сравнение баллов:
• Сравните поколения двух точек фиксации с помощью метода CompareTo, чтобы убедиться в правильном порядке операций.
4. Управление пользовательскими данными:
• Поддерживает хранение и извлечение пользовательских данных, связанных с точками отправки.
5. Обеспечьте последовательность:
• предоставил equals и hashCode Метод, обеспечивающий согласованность и уникальность точек подачи.