Рубрики
Без рубрики

Как мигрировать из Elasticsearch 1.7 до 6,8 с нулевым простоя

Автор оригинала: dor sever.

Моя последняя задача в Bigpanda Был на модернизацию существующей службы, которая использовала ELASTICSearch версию 1.7 до новой Elasticsearch Version, 6.8.1.

В этом посте я поделюсь, как мы перенесли от Elasticsearch 1.6 до 6.8 с жесткими ограничениями, такими как нулевые простоя, нет потери данных и нулевые ошибки. Я также предоставим вам сценарий, который делает миграцию для вас.

Этот пост содержит 6 глав (и один необязательный):

  • Что это значит для меня? -> Каковы были новые функции, которые привели нас к обновлению нашей версии?
  • Ограничения -> Каковы были наши бизнес-требования?
  • Решение проблем -> Как мы обратились к ограничениям?
  • Двигаться вперед -> план.
  • [Дополнительная глава] -> Как мы обрабатываем печально известную проблему сопоставления взрыва?
  • Наконец -> Как сделать миграцию данных между кластерами.

Какие преимущества мы ожидали решить, обновив наш магазин данных?

Было пару причин:

  1. Проблемы с производительностью и стабильностью – мы испытывали огромное количество отключений с длинным МТТР, который заставил нас много головных болей. Это было отражено в частых высоких задержках, высоком использовании ЦП и более вопросах.
  2. Несуществующая поддержка в старых версиях Elasticsearch – нам не хватало некоторого оперативного знания в Elasticsearch, и когда мы искали внешний консалтинг, нас рекомендовали мигрировать вперед для получения поддержки.
  3. Динамические сопоставления в нашей схеме – наша текущая схема в Elasticsearch 1.7 использовала функцию, называемую динамическими отображениями, которые сделали наш кластер взрываться много раз. Поэтому мы хотели решить эту проблему.
  4. Плохая видимость в нашем существующем кластере – мы хотели лучше просмотреть под капотом и увидели, что более поздние версии имели отличные инструменты экспорта метрик.
  • Нулевая променная простоя – у нас есть активные пользователи в нашей системе, и мы не могли позволить себе систему, пока мы были мигрировать.
  • План восстановления – мы не могли позволить себе «потерять» или «коррумпированные» данные, независимо от стоимости. Поэтому нам нужно было подготовить план восстановления, если наша миграция не удалась.
  • Zero Bugs – мы не смогли изменить существующие функции поиска для конечных пользователей.

Давайте решаем ограничения от самой простой до самой сложной:

Нулевые ошибки

Чтобы удовлетворить это требование, я изучал все возможные запросы на обслуживание, и какие его выходы были. Затем я добавил блок-тесты, где это необходимо.

Кроме того, я добавил несколько метрик (к ILASTICSearch Islearch и elasticsearch Islearch ), чтобы отслеживать задержку, пропускную способность и производительность, которая позволила мне проверить, что мы только улучшаем их.

План восстановления

Это означает, что мне нужно было решить следующую ситуацию: я развернул новый код до производства, а вещи не работали, как ожидалось. Что я могу сделать с этим, тогда

Так как я работал в сервисе, который использовал Поиск событий, Я мог бы добавить другой слушатель (диаграмма прилагается ниже) и начать писать на новый кластер Elasticsearch, не влияя на статус производства

Нулевое промежуточное время миграции

Текущий сервис находится в режиме прямого режима и не может быть «деактивирован» для периодов дольше 5-10 минут. Трюк, чтобы получить это правильно, так это:

  • Храните журнал всех действий Ваш сервис обрабатывается (мы используем кафка в производстве)
  • Запустите процесс миграции в автономном режиме (и отслеживать смещение перед началом миграции)
  • Когда миграция заканчивается, начните новую услугу против журнала с записанным смещением и догнать отставание
  • Когда задержка заканчивается, измените свой интерфейс, чтобы запросить новый сервис, и вы закончите

Наша текущая служба использует следующую архитектуру (на основе сообщения, проходящего в Кафке):

  1. Тема события Содержит события, произведенные другими приложениями (например, UserID 3 создан )
  2. Команда тема Содержит перевод этих событий на определенные команды, используемые этим приложением (например: Добавить пользователя 3 )
  3. Elasticsearch 1.7 – Детар Команда тема читать по Elasticsearch Indexer Отказ

Мы планировали добавить еще один потребитель ( новый elasticsearch Indexer ) к Команда тема , который прочитает те же точные сообщения и напиши их параллельно Elasticsearch 6.8.

Честно говоря, я посчитал себя пользователя Newbie Elasticsearch. Чувствовать себя уверенно, чтобы выполнить эту задачу, я должен был подумать о наилучшем способе приблизиться к этой теме и узнать его. Несколько вещей, которые помогли:

  1. Документация – это безумно полезный ресурс для всего Elasticsearch. Найдите время, чтобы прочитать его и делать заметки (не пропустите: картирование и Querydsl ).
  2. HTTP API – все под Кот API. Это было супер полезно для отладки вещей локально и посмотреть, как elasticsearch реагирует (не пропустите: здоровье кластеров, индексы кошек, поиск, удалить индекс).
  3. Метрики (❤️) – С первого дня мы настроили блестящую новую панель инструментов с большим количеством прохладных метрик (взятых из elasticsearch-exporter-for-primetheus ), которые помогли и подталкивали нас, чтобы понять больше о elasticsearch.

Наша кодовая база использовала библиотеку под названием Elastic4s И использовал самый старый релиз, доступный в библиотеке – действительно веская причина мигрировать! Таким образом, первое, что нужно сделать, это просто мигрировать версии и посмотреть, что сломалось.

Есть несколько тактиков о том, как выполнить эту миграцию кода. Тактика, которую мы выбрали, это попытаться восстановить существующую функциональность в первую очередь в новой версии Elasticsearch без переписывания всех кодов с самого начала. Другими словами, чтобы достичь существующей функциональности, но на более новой версии Elasticsearch.

К счастью, для нас код уже содержал почти полное охвату тестирования, поэтому наша задача была намного проще, и это заняло около 2 недель времени разработки.

Важно отметить, что, если это не так, мы должны были бы инвестировать некоторое время на заполнение этого охвата. Только тогда мы сможем мигрировать, так как одно из наших ограничений было не сломать существующие функциональные возможности.

Давайте опишем наш корпус использования более подробно. Это наша модель:

Класс insertmessagecommand (теги: карта [строка, строка])

А например, экземпляр этого сообщения будет:

Новый insertmessagecommand (карта (“Имя” -> “for”, “Фамилия” -> “Сервер”))

И учитывая эту модель, нам нужно было поддержать следующие требования к запросу:

  1. Запрос по значению
  2. Запрос по имени и значению тега

То, как это было моделировано в нашем elasticsearch 1.7 схема, использовали динамическую схему шаблона (поскольку клавиши тегов являются динамическими и не могут быть смоделированы в Advanced).

Динамический шаблон причинил нам несколько отключений из-за проблем сопоставления взрыва, и схема выглядела так:

curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d '
{
    "index_patterns": [
        "your-index-names*"
    ],
    "mappings": {
            "_doc": {
                "dynamic_templates": [
                    {
                        "tags": {
                            "mapping": {
                                "type": "text"
                            },
                            "path_match": "actions.tags.*"
                        }
                    }
                ]
            }
        },
    "aliases": {}
}'  

curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "actions": {
    "tags" : {
        "name": "John",
        "lname" : "Smith"
    }
  }
}
'

curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d'
{
  "actions": {
    "tags" : {
        "name": "Dor",
        "lname" : "Sever"
  }
}
}
'

curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d'
{
  "actions": {
    "tags" : {
        "name": "AnotherName",
        "lname" : "AnotherLastName"
  }
}
}
'
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
    "query": {
        "match" : {
            "actions.tags.name" : {
                "query" : "John"
            }
        }
    }
}
'
# returns 1 match(doc 1)


curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
    "query": {
        "match" : {
            "actions.tags.lname" : {
                "query" : "John"
            }
        }
    }
}
'
# returns zero matches

# search by value
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
    "query": {
        "query_string" : {
            "fields": ["actions.tags.*" ],
            "query" : "Dor"
        }
    }
}
'

Вложенный документ решение

Наш первый инстинкт в решении проблем сопоставления взрыва заключался в использовании вложенных документов.

Мы читаем учебное пособие типа вложенного типа в упругих документах и определили следующую схему и запросы:

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d'
{
        "mappings": {
            "_doc": {
            "properties": {
            "tags": {
                "type": "nested" 
                }                
            }
        }
        }
}
'

curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "tags" : [
    {
      "key" : "John",
      "value" :  "Smith"
    },
    {
      "key" : "Alice",
      "value" :  "White"
    }
  ]
}
'


# Query by tag key and value
curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "nested": {
      "path": "tags",
      "query": {
        "bool": {
          "must": [
            { "match": { "tags.key": "Alice" }},
            { "match": { "tags.value":  "White" }} 
          ]
        }
      }
    }
  }
}
'

# Returns 1 document


curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "nested": {
      "path": "tags",
      "query": {
        "bool": {
          "must": [
            { "match": { "tags.value":  "Smith" }} 
          ]
        }
      }
    }
  }
}
'

# Query by tag value
# Returns 1 result

И это решение работало. Однако, когда мы пытались вставить реальные данные клиента, мы увидели, что количество документов в нашем индексе увеличилось примерно в 500 раз.

Мы думали о следующих проблемах и продолжали найти лучшее решение:

  1. Сумма документов, которые мы имели в нашем кластере, составляли около 500 миллионов документов. Это означало, что с новой схемой мы собирались достичь двухста пятидесяти миллиардов документов (это 250 000 000 000 документов?).
  2. Мы читаем это действительно хорошее сообщение в блоге – https://blog.gojekengineering.com/elasticsearch-the-truble-with-nested-documents-e97b33b46194 которые подчеркивают, что вложенные документы могут вызвать высокую задержку в запросах и проблемах использования кучи.
  3. Тестирование – поскольку мы преобразовав 1 документ в старом кластере на неизвестное количество документов в новом кластере, было бы намного сложнее отслеживать, если процесс миграции работал без каких-либо потерь данных. Если наше преобразование было 1: 1, мы могли бы утверждать, что счет в старом кластере равнил счет в новом кластере.

Избегая вложенных документов

Реальный трюк в этом заключался в том, чтобы сосредоточиться на том, какие поддерживаемые запросы мы работали: поиск по значению тега и поиск по ключу тега и значением.

Первый запрос не требует вложенных документов, поскольку он работает на одном поле. Для последних мы сделали следующую хитрость. Мы создали поле, содержащее комбинацию ключа и значения. Всякий раз, когда пользовательские запросы на клавише, значение совпадают, мы переводим их запрос к соответствующему тексту и запросу против этого поля.

Пример:

curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d'
{
    "mappings": {
        "_doc": {
            "properties": {
                "tags": {
                    "type": "object",
                    "properties": {
                        "keyToValue": {
                            "type": "keyword"
                        },
                        "value": {
                            "type": "keyword"
                        }
                    }
                }
            }
        }
    }
}
'


curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "tags" : [
    {
      "keyToValue" : "John:Smith",
      "value" : "Smith"
    },
    {
      "keyToValue" : "Alice:White",
      "value" : "White"
    }
  ]
}
'

# Query by key,value
# User queries for key: Alice, and value : White , we then query elastic with this query:

curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
        "bool": {
          "must": [ { "match": { "tags.keyToValue": "Alice:White" }}]
  }}}
'

# Query by value only
curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
        "bool": {
          "must": [ { "match": { "tags.value": "White" }}]
  }}}
'

Мы планировали мигрировать около 500 миллионов документов с нулевым простоя. Для этого нам нужно:

  1. Стратегия о том, как передавать данные от старого эластичного на новый Elasticsearch
  2. Стратегия о том, как закрыть отставание между началом миграции и конец его

И наши два варианта в закрытии отставания:

  1. Наша система обмена сообщениями на основе кафка. Мы могли бы просто взять текущее смещение до начала миграции, и после того, как миграция закончилась, начните потребление от этого конкретного смещения. Это решение требует некоторой ручной настройки смещений и некоторых других вещей, но будет работать.
  2. Другим подходом к решению этого вопроса состоял в том, чтобы начать потребление сообщений с начала темы в Кафке и сделать наши действия по эластичному поиску IDEMPotent – значение, если изменение было «применено» уже, ничего не изменится в эластичном магазине.

Запросы, сделанные нашим сервисом против эластичных, были уже Idempotent, поэтому мы выбираем вариант 2, потому что она требовала нулевую ручную работу (не нужно принимать конкретные смещения, а затем устанавливать их в новую группу потребителей).

Как мы можем мигрировать данные?

Это были варианты, которые мы думали:

  1. Если наша кафка содержит все сообщения с самого начала времени, мы могли бы просто играть с самого начала, а конечное состояние будет равно. Но так как мы применяем удерживающую темы, это не было вариантом.
  2. Дамп сообщений на диск, а затем глотать их в эластичном напрямую – это решение выглядело своего рода странно. Зачем хранить их на диске вместо того, чтобы просто писать их непосредственно упругими?
  3. Передача сообщений между старой эластичной для новой эластичности – это означало, написать какой-то «скрипт» (кто-нибудь сказал Python??), Который подключится к старому кластеру elasticsearch, запрос элементов, преобразуйте их в новую схему и индексируйте их в кластере.

Мы выбираем последний вариант. Это были выборы дизайна, который мы имели в виду:

  1. Давайте не будем пытаться подумать об обработке ошибок, если нам не нужно. Давайте попробуем написать что-то Super Simple, и если возникнут ошибки, давайте попробуем обратиться к ним. В конце концов, нам не нужно было устранить эту проблему, поскольку во время миграции не произошло никаких ошибок.
  2. Это разовая операция, так что все равно первым/поцелуй.
  3. Метрики – поскольку процессы миграции могут занять часы до нескольких дней, мы хотели, чтобы Способность от первого дня можно было отслеживать количество ошибок и отслеживать текущий прогресс и скорость копирования сценария.

Мы думали долго и трудно и выбираем Python как наше оружие выбора. Окончательная версия кода ниже:

dictor==0.1.2 - to copy and transform our Elasticsearch documentselasticsearch==1.9.0 - to connect to "old" Elasticsearchelasticsearch6==6.4.2 - to connect to the "new" Elasticsearchstatsd==3.3.0 - to report metrics
from elasticsearch import Elasticsearch
from elasticsearch6 import Elasticsearch as Elasticsearch6
import sys
from elasticsearch.helpers import scan
from elasticsearch6.helpers import parallel_bulk
import statsd

ES_SOURCE = Elasticsearch(sys.argv[1])
ES_TARGET = Elasticsearch6(sys.argv[2])
INDEX_SOURCE = sys.argv[3]
INDEX_TARGET = sys.argv[4]
QUERY_MATCH_ALL = {"query": {"match_all": {}}}
SCAN_SIZE = 1000
SCAN_REQUEST_TIMEOUT = '3m'
REQUEST_TIMEOUT = 180
MAX_CHUNK_BYTES = 15 * 1024 * 1024
RAISE_ON_ERROR = False


def transform_item(item, index_target):
    # implement your logic transformation here
    transformed_source_doc = item.get("_source")
    return {"_index": index_target,
            "_type": "_doc",
            "_id": item['_id'],
            "_source": transformed_source_doc}


def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func):
    for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE,
                     timeout=SCAN_REQUEST_TIMEOUT):
        yield transform_logic_func(item, index_target)


def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client,
                           logger, transform_logic_func):
    ok_count = 0
    fail_count = 0
    count_response = es_source.count(index=index_source, body=match_query)
    count_result = count_response['count']
    statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target),
                        value=count_result)
    with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)):
        actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func)
        for (ok, item) in parallel_bulk(es_target,
                                        chunk_size=bulk_size,
                                        max_chunk_bytes=MAX_CHUNK_BYTES,
                                        actions=actions_stream,
                                        request_timeout=REQUEST_TIMEOUT,
                                        raise_on_error=RAISE_ON_ERROR):
            if not ok:
                logger.error("got error on index {} which is : {}".format(index_target, item))
                fail_count += 1
                statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target),
                                   1)
            else:
                ok_count += 1
                statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target),
                                   1)

    return ok_count, fail_count


statsd_client = statsd.StatsClient(host='localhost', port=8125)

if __name__ == "__main__":
    index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE,
                           statsd_client, transform_item)

Миграция данных в живой производственной системе является сложной задачей, которая требует большого внимания и тщательного планирования. Я рекомендую поделиться на время работы через шаги, перечисленные выше, и выясните, что лучше всего подходит для ваших потребностей.

Как правило, всегда старайтесь максимально снизить ваши требования. Например, требуется нулевая простоя? Можете ли вы позволить себе потерю данных?

Модернизация хранилище данных обычно является марафоном, а не спринт, поэтому глубоко вздохните и постарайтесь наслаждаться поездкой.

  • Весь процесс, перечисленный выше, взял меня около 4 месяцев работы
  • Все примеры Elasticsearch, которые появляются в этом посте, были протестированы против версии 6.8.1