Рубрики
Без рубрики

5-минутное руководство по использованию ведения в pyspark

Руководство по ведению ведения – метод оптимизации, которая использует ведра для определения разделения данных и избежать перетасовки данных. Tagged Spark, Python, Bigdata.

В мире есть много разных инструментов, каждый из которых решает ряд проблем. Многие из них судят по тому, насколько хорошо и правильно они решают эту или иную проблему, но есть инструменты, которые вам просто нравятся, вы хотите их использовать. Они правильно спроектированы и хорошо вписываются в вашу руку, вам не нужно копаться в документации и понимать, как выполнить то или иное простое действие. О одном из этих инструментов для меня я буду писать эту серию постов.

Я опишу методы оптимизации и советы, которые помогут мне решить определенные технические проблемы и достичь высокой эффективности. Это моя обновленная коллекция.

Многие из оптимизаций, которые я опишу, не повлияет на языки JVM, но без этих методов многие приложения Python могут просто не работать.

Целая серия:

Давайте начнем с проблемы.

У нас есть два стола, и мы делаем одно простое внутреннее соединение по одной столбце:

t1 = spark.table("unbucketed1")
t2 = spark.table("unbucketed2")

t1.join(t2, "key").explain()

В физическом плане вы получите что -то вроде следующего:

== Physical Plan ==                                                             
*(5) Project [key#10L, value#11, value#15]
+- *(5) SortMergeJoin [key#10L], [key#14L], Inner
   :- *(2) Sort [key#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#10L, 200)
   :     +- *(1) Project [key#10L, value#11]
   :        +- *(1) Filter isnotnull(key#10L)
   :           +- *(1) FileScan parquet default.unbucketed1[key#10L,value#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
   +- *(4) Sort [key#14L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#14L, 200)
         +- *(3) Project [key#14L, value#15]
            +- *(3) Filter isnotnull(key#14L)
               +- *(3) FileScan parquet default.unbucketed2[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct, SelectedBucketsCount: 16 out of 16

Sortmergejoin это соединение Spark по умолчанию, но теперь мы обеспокоены двумя другими вещами в плане выполнения. Они два Обмен операции. Мы всегда обеспокоены обменами, потому что они перетасовывают наши данные – мы хотим избежать этого, ну, если не нет выбора. Но…

Мы знаем, что присоединились к ключ Столбец, поэтому мы будем использовать эту информацию, чтобы избавиться от этих двух обменов.

Как?

Используйте ведро

Ведение – это метод оптимизации, которая разлагает данные на более управляемые детали (ведра) для определения разделения данных. Мотивация состоит в том, чтобы оптимизировать производительность запроса соединения, избегая перетасовывает (AKA Bearchs) таблиц, участвующих в объединении. Ведение приводит к меньшему количеству обменов (и, следовательно, этапов), поскольку перетасование может не потребоваться – оба фрейма уже могут быть расположены в одних и тех же разделах.

Ведение включено по умолчанию. Spark SQL использует spark.sql.sources.bucketing.enabled Свойство конфигурации для управления, следует ли его включить и использовать для оптимизации запросов или нет.

Ведение указывает физическое размещение данных, поэтому мы предварительно перетасовываем наши данные, потому что мы хотим избежать перетасовки данных во время выполнения.

Окей, мне действительно нужно сделать дополнительный шаг, если перетасовка будет выполнена в любом случае?

Если ты Присоединение несколько раз чем да. Чем больше присоединяется к лучшему росту производительности.

Пример того, как создать таблицу ведения:

df.write\
    .bucketBy(16, "key") \
    .sortBy("value") \
    .saveAsTable("bucketed", format="parquet")

Итак, здесь, Bucketby Распределяет данные по фиксированному количеству ведра (16 в нашем случае) и может использоваться, когда ряд уникальных значений не ограничены. Если количество уникальных значений ограничено, лучше использовать разделение, а не ведение.

t2 = spark.table("bucketed")
t3 = spark.table("bucketed")

# bucketed - bucketed join. 
# Both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, "key").explain()

И в результате физический план:

== Physical Plan ==
*(3) Project [key#14L, value#15, value#30]
+- *(3) SortMergeJoin [key#14L], [key#29L], Inner
   :- *(1) Sort [key#14L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [key#14L, value#15]
   :     +- *(1) Filter isnotnull(key#14L)
   :        +- *(1) FileScan parquet default.bucketed[key#14L,value#15] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct, SelectedBucketsCount: 16 out of 16
   +- *(2) Sort [key#29L ASC NULLS FIRST], false, 0
      +- *(2) Project [key#29L, value#30]
         +- *(2) Filter isnotnull(key#29L)
            +- *(2) FileScan parquet default.bucketed[key#29L,value#30] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/opt/spark-warehouse/bucketed], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct, SelectedBucketsCount: 16 out of 16

Здесь у нас есть не только меньше этапов кода, но и обменов.

По состоянию на Spark 2.4, Spark поддерживает обрезку ведра для оптимизации фильтрации на ведре с ведром (путем уменьшения количества файлов ковша для сканирования).

Резюме

В целом, ведение – это относительно новая техника, которая в некоторых случаях может быть большим улучшением как в стабильности, так и в результате производительности. Тем не менее, я обнаружил, что использование его не является тривиальным и имеет много GotChas.

Ведение работает хорошо, когда количество уникальных значений не ограничено. Столбцы, которые часто используются в запросах и обеспечивают высокую селективность, являются хорошим выбором для ведения. Spark Tables, которые представляют собой метаданные магазина о том, как их ведет и отсортированы, которые помогают оптимизировать соединения, агрегации и запросы на веде -колонках.

Полная суть

Спасибо за чтение!

Любые вопросы? Оставьте свой комментарий ниже, чтобы начать фантастические дискуссии!

Проверить Мой блог или прийти, чтобы сказать привет 👋 на Twitter или подписаться на мой канал телеграммы . Планируйте все возможное!

Оригинал: “https://dev.to/luminousmen/the-5-minute-guide-to-using-bucketing-in-pyspark-4egg”