Рубрики
Без рубрики

Устройство тестирования вашей библиотеки Pyspark

В разработке программного обеспечения мы часто единилируйте тест нашего кода (надеюсь). И код, написанный для искры, не отличается … Теги с Python, Spark, Testing, Pyspark.

В разработке программного обеспечения мы часто единилируйте тест нашего кода (надеюсь). И код, написанный для Зажигание ничем не отличается Так что здесь я хочу пройти через пример построения небольшой библиотеки, используя Pyspark и единица тестирования его. Я использую Визуальный студийный код Как здесь мой редактор, в основном потому, что я думаю, что это блестяще, но другие редакторы доступны.

Создание демо-библиотеки

Я действительно хотел избежать обычного «Давайте добавим два числа вместе» пример и использовать что-то немного больше, как то, что вы можете столкнуться с повседневностью. Итак, я собираюсь создать библиотеку с некоторыми методами вокруг создания измерения даты.

Размер даты – это то, что вы будете иметь тенденцию встречаться в любых хранилищах данных или хранилище данных, где вы работаете с датами. И действительно это почти везде! Этот вид измерения, как правило, создается для содержения информации о днях года, которые пользователи могут захотеть данные с ломтиками и DICE с позже. Если вы можете присоединиться к таблицам фактов в размерность даты на основе ключа даты, а затем хотеть просмотреть все данные в данном месяце, четверть, на прошлой неделе, по выходным, оно становится намного проще.

Я собираюсь придерживаться довольно простого размера даты для краткости, поэтому он просто включает в себя ключ, основную информацию о день, месяц и год, а также номер недели ISO.

Номер недели ISO – это кикер, и лучше объясняется Википедия Но это становится интересным, потому что, вопреки распространенному убеждению, не всегда 52 недели в год (даты всегда столь веселья). Это не то, для чего существует функция в вашей искре, но, к счастью, это фракция, мы можем получить прямо из Python, поскольку у класса DateTime есть Isocalendar Функция, которая дает нам значения ISO обратно в виде кортежа.

Но … Мы не можем просто позвонить в код Python прямо из Pyspark (ну, мы можем сделать, но мы не должны, потому что это действительно плохо для производительности). Поэтому нам нужно будет создать функцию определенной пользователем.

Пышпарк намного легче работать в последние годы, и это особенно верно для UDF с прибытием Pandas UDFS Отказ Если вы хотите узнать, почему тогда я предлагаю вам прочитать документы, так как это немного забрасывается здесь, но это означает, что мы можем написать пользовательские функции в Python, которые значительно больше исполняются. Все еще не совсем хорошо, как Scala, но когда вы думаете о том, что ему нужно сделать под капотом, они очень впечатляют.

Функция номера ISO недели

Напишите, давайте погрузиться в код для пользовательской функции, то.

def get_iso_weeknumber(dt: Union[date, datetime]) -> int:
    if type(dt) is not datetime and type(dt) is not date:
        return None

    return dt.isocalendar()[1]


@pandas_udf(IntegerType())
def iso_week_of_year(dt: pd.Series) -> pd.Series:
    return dt.apply(lambda x: get_iso_weeknumber(x) if not pd.isnull(x) else pd.NA)

Я намеренно разбил его на 2 функция здесь, и я, вероятно, мог бы использовать некоторые лучшие имена, но и мех. Причина разрушения его в зависимости от тестирования. Куда возможно, мы хотим проверить пользовательскую логику вне искры, поэтому нам не нужно раскрутить новую сессию, это означает, что мы можем писать много тестов и быстро выполнить их, чтобы проверить его работать, и это цель get_iso_weeknumber . функция.

В ISO_Week_of_year Функция Мы получаем серию Panda как наш вклад, и мы возвращаем еще одну серию Panda. Это функция серии для серии, а выход должен быть одинаковой длины, что и вход. Кодекс в этой функции намеренно коротко, поскольку мы хотим сохранить столько же логики отсюда, насколько это возможно, он действительно должен просто заботиться о применении get_iso_weeknumber . функция на вход. И это все, что он делает, до тех пор, пока входное значение не ноль.

Размер даты

Теперь, когда мы можем создать наш номер недели ISO, мы можем создать размерность даты по требованию. Есть несколько разных способов этого, но мой предпочтительный метод использует Sparksession.range Функция с использованием временных меток Unix.

def create_date_dimension(spark: SparkSession, start: datetime, end: datetime) -> DataFrame:
    if spark is None or type(spark) is not SparkSession:
        raise ValueError("A valid SparkSession instance must be provided")

    if type(start) is not datetime:
        raise ValueError("Start date must be a datetime.datetime object")

    if type(end) is not datetime:
        raise ValueError("End date must be a datetime.datetime object")

    if start >= end:
        raise ValueError("Start date must be before the end date")

    if start.tzinfo is None:
        start = datetime.combine(start.date(), time(0, 0, 0), tzinfo=timezone.utc)

    if end.tzinfo is None:
        end = datetime.combine(end.date(), time(0, 0, 0), tzinfo=timezone.utc)

    end = end + timedelta(days=1)

    return (
        spark.range(start=start.timestamp(), end=end.timestamp(), step=24 * 60 * 60)
             .withColumn("date", to_date(from_unixtime("id")))
             .withColumn("date_key", date_format("date", "yyyyMMdd").cast("int"))
             .withColumn("day", dayofmonth("date"))
             .withColumn("day_name", date_format("date", "EEEE"))
             .withColumn("day_short_name", date_format("date", "EEE"))
             .withColumn("month", month("date"))
             .withColumn("month_name", date_format("date", "MMMM"))
             .withColumn("month_short_name", date_format("date", "MMM"))
             .withColumn("year", year("date"))
             .withColumn("week_number", iso_week_of_year("date"))
    )

В этой функции есть много установки, просто чтобы убедиться, что мы даем всю правильную информацию для создания измерения, но фактическое поколение довольно быстро. Вы можете увидеть на последней строке, которую мы используем наше ISO_Week_of_year UDF, чтобы получить номер недели и позвонить ему так же, как и любая другая функция искры.

Используя Sparksession.range Функция с шагом 86 400 означает, что мы увеличиваем на 1 день за раз, независимо от любого временного компонента, мы перейдем к функции в первую очередь.

Запуск Это дает нам стол, выглядящую следующее:

>>> start = datetime(2021, 1, 1)
>>> end = datetime(2021, 1, 10)
>>> create_date_dimension(spark, start, end).show()
+----------+----------+--------+---+---------+--------------+-----+----------+----------------+----+-----------+
|        id|      date|date_key|day| day_name|day_short_name|month|month_name|month_short_name|year|week_number|
+----------+----------+--------+---+---------+--------------+-----+----------+----------------+----+-----------+
|1609459200|2021-01-01|20210101|  1|   Friday|           Fri|    1|   January|             Jan|2021|         53|
|1609545600|2021-01-02|20210102|  2| Saturday|           Sat|    1|   January|             Jan|2021|         53|
|1609632000|2021-01-03|20210103|  3|   Sunday|           Sun|    1|   January|             Jan|2021|         53|
|1609718400|2021-01-04|20210104|  4|   Monday|           Mon|    1|   January|             Jan|2021|          1|
|1609804800|2021-01-05|20210105|  5|  Tuesday|           Tue|    1|   January|             Jan|2021|          1|
|1609891200|2021-01-06|20210106|  6|Wednesday|           Wed|    1|   January|             Jan|2021|          1|
|1609977600|2021-01-07|20210107|  7| Thursday|           Thu|    1|   January|             Jan|2021|          1|
|1610064000|2021-01-08|20210108|  8|   Friday|           Fri|    1|   January|             Jan|2021|          1|
|1610150400|2021-01-09|20210109|  9| Saturday|           Sat|    1|   January|             Jan|2021|          1|
|1610236800|2021-01-10|20210110| 10|   Sunday|           Sun|    1|   January|             Jan|2021|          1|
+----------+----------+--------+---+---------+--------------+-----+----------+----------------+----+-----------+

Вы можете увидеть, что первые несколько дней 2021 года были частью недели 53, а неделя 1 на самом деле не началась до 4 января.

Установка тестирования кода

Но название этого поста о единице тестирования кода, так как это делается?

Ну, мы видели, что разбиты функции номер недели ISO, так Устройство тестирования get_iso_weeknumber. довольно прямо вперед и может быть проверен как и любой другой код Python.

В то время как мы можем проверить ISO_Week_of_year Функция, которую мы теряем способность захватывать любую информацию о покрытии из-за того, как зажечь вызывает функцию. Если вы используете Coverage.py, то вы можете добавить шаблоны для Ваш .ceveragerc Файл Чтобы исключить эти функции, подобные этому из ваших результатов покрытия, таких как следующее.

[report]
exclude_lines =
    def .* -> pd.Series

Мы собираемся проверить наши create_date_dimension Функция, которая позвонит ISO_Week_of_year функция.

Нам понадобится сеанс искры, чтобы проверить код, и самый очевидный способ сделать это, как это было бы таким (я использую Python Unittest здесь).

class DateDimensionTests(unittest.TestCase):
        def test_simple_creation(self):
        spark = SparkSession.builder.appName("unittest").master("local[*]").getOrCreate()

        df = create_date_dimension(spark, datetime(2021, 1, 1), datetime(2021, 1, 5))

Но есть проблема с этим. Для каждого теста мы собираемся создавать совершенно новый Spark сеанс, который будет дорого. И если нам нужно изменить какую-либо из конфигурации искры, нам нужно будет делать это в каждом тесте!

Так что лучше? Ну, мы можем создать наш собственный тестовый класс на основе модульный тест. Testcase который делает все настроен для нас.

class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf()
        conf.setAppName("demo-unittests")
        conf.setMaster("local[*]")
        conf.set("spark.sql.session.timezone", "UTC")

        cls.spark = SparkSession.builder.config(conf=conf).getOrCreate()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()

    def tearDown(self) -> None:
        self.spark.catalog.clearCache()

    def assert_dataframes_equal(self, expected: DataFrame, actual: DataFrame, display_differences: bool = False) -> None:
        diff: DataFrame = actual.subtract(expected).cache()
        diff_count: int = diff.count()

        try:
            if diff_count != 0 and display_differences:
                diff.show()
            self.assertEqual(0, diff_count)
        finally:
            diff.unpersist

Что мы получили сейчас? Ну, у нас есть наш setupclass метод, который делает работу по настройке нашей искровой сессии; И у нас есть наш therdownclass Метод, который очищает за искровой сеанс, как только мы закончим.

Мы также добавили в Обзор Метод, который очищает после каждого теста, в этом случае он очищает кеш, на всякий случай. И есть новый метод сравнения кадров данных, которые тестируют много повторяющегося кода, мы можем написать в наших модульных тестах.

Одна из других вещей, которые отличаются в этой установке, это линия conf.set ("Spark.sql.session.timeZone", "UTC") Отказ Температуры являются злом. Это так просто. Дата вроде хорошо, он представляет точку в календаре, поэтому они приятно работать. Но диапазон, который мы создаем, используют секунды из эпохи, и поэтому мы находимся в мире временных метров, и этот мир имеет дневное сбережения и часовые появления! И как они работают варьируется в зависимости от вашей ОС выбора.

Запуск следующего на моем ноутбуке Windows 10 с помощью Python 3 дает ошибку Oserror: [Errno 22] Неверный аргумент

from datetime import datetime
datetime(1066, 10, 14).timestamp()

Но работает в Ubuntu 20.04 на WSL дает результат -28502755125.0 . Сейчас, Запуск его снова в обоих, но с предоставленной информацией о часовой зоне.

from datetime import datetime, timezone
datetime(1066, 10, 14, tzinfo=timezone.utc).timestamp()

Ну, теперь они оба дают ответ -28502755200.0 . Таким образом, установка часового пояса Spark Session в UTC означает, что он будет постоянно обрабатывать временные метки, независимо от того, где в мире я или на какую ОС я бегу.

Написание сами тесты

Теперь, когда у нас есть класс нашего теста, мы можем написать несколько модульных тестов, и путем размещения ряда их в тот же класс тестового теста, который мы собираемся использовать тот же сеанс искры, что означает, что мы не собираемся нести Штраф производительности для постоянного повторного создания его.

class DateDimensionTests(PySparkTestCase):
    def test_simple_creation(self):
        expected = self.spark.read.csv(
            self.get_input_file("simple_date_dim.csv"),
            header=True,
            schema=DateDimensionTests.get_dimension_schema())

        result = create_date_dimension(self.spark, datetime(2021, 1, 1), datetime(2021, 1, 5))

        self.assert_dataframes_equal(expected, result, display_differences=True)

    def test_simple_range_week53(self):
        week_53_day_count = (
            create_date_dimension(self.spark, datetime(2020, 12, 1), datetime(2021, 1, 31))
            .filter(col("week_number") == 53)
            .count()
        )

        self.assertEqual(7, week_53_day_count)

Теперь они могут быть выполнены с помощью Test Runner Code Code Code Visual Studio …

… или из командной строки.

python -m unittest discover -v -s ./tests -p test_*.py

Они все еще не такие исполнительные тесты, которые не требуют зажимного сеанса и поэтому, по возможности, убедитесь, что бизнес-логика проверена вне искровой сессии. Но если вам нужно один, то это определенно путь.

Резюме

Флаг Так что это мой первый пост для dev.to. Я был сидел на чтении от других на некоторое время и думал, что пришло время, когда я внес свой собственный. Надеюсь, это поможет кому-то или дает кому-то несколько идей для чего-то еще.

Если вы можете подумать о том, что может быть улучшено, или у вас есть другой способ делать вещи, а затем дайте мне знать, все улучшаются только с сотрудничеством.

Если вы хотите взглянуть на код или использовать его каким-либо образом (это лицензированное MIT), то вы можете найти его на Github Отказ

Оригинал: “https://dev.to/dazfuller/unit-testing-your-pyspark-library-e5j”