Рубрики
Без рубрики

Использование Kedro в сценариях

С последними выпусками Kedro 0.17.x теперь можно запустить трубопроводы Kedro изнутри … Tagged с Python, Kedro.

С последними выпусками Kedro 0,17.x , теперь можно запустить kedro трубопроводы из сценариев. Хотя я бы не стал начинать проект с этим Техника, это будет отличный инструмент, чтобы держать в моем заднем кармане, когда я захочу Посыпать немного добра в существующих проектах.

Новичок в Кедро

Что такое Кедро

Если вы просто узнаете о Кедро, проверьте этот пост, пройдя через него

Больше нет кроличьей дыры ошибок

По состоянию на 0,17,2

Я пытался сделать это в Кедро 0.16.x, которая превратилась в кроличье дыру ошибки. Во -первых, Кедро нуждался в conf каталог. Если вы пытались подделать один, Kedro Затем попросил настройку журнала. Эти ошибки просто продолжали доходить до сути Это не стоило, и я мог бы также использовать правильный шаблон для реального Проекты и придерживаться простых функций, вызовы для вещей, которые не являются Kedro проект.

Кедро в сценарии

Чтобы получить работу KEDRO, вам понадобится конвейер, каталог и бегун как минимум. Если вы раньше использовали KEDRO, трубопровод будет очень похож на то, с чем вы знакомы, но каталог не будет загружен из YAML, и вам нужно будет принести своего бегуна.

from kedro.pipeline import Pipeline, node
from kedro.io import DataCatalog
from kedro.runner.sequential_runner import SequentialRunner


# additional datasets you want to use
from kedro.extras.datasets.pandas.csv_dataset import CSVDataSet
from kedro.extras.datasets.pandas.parquet_dataset import ParquetDataSet

# the sequential runner is the simplest. It runs one node at a time.
runner = SequentialRunner()

# this is a super simple example pipeline
pipeline = Pipeline(
    [
        node(lambda: range(100), None, "range"),
        node(lambda x: [i ** 2 for i in x], "range", "range**2"),
        node(lambda x: [i for i in x if i > 5000], "range**2", "range>5k"),
        node(lambda x: x[:5], "range>5k", "range>5k-head"),
        node(lambda x: sum(x) / len(x), "range>5k", "range>5k-mean"),
    ]
)

# to get up and running, you can use an empty catalog
catalog = DataCatalog()

runner.run(pipeline, catalog)

👆 Выше минимальная установка для работы с трубопроводом KEDRO

более практически

Чаще всего ваши трубопроводы KEDRO будут использовать функцию, а не лямбду, и пандас DataFrames.

def clean_columns(df: pd.DataFrame):
    df.columns = [col.lower().strip() for col in df.columns]

pipeline = Pipeline(
    [
        node(clean_columns, "raw_data", "clean_columns", name="create_clean_columns"),
    ]
)

catalog = DataCatalog(
    {
        "raw_data": ParquetDataSet(filepath=f"data/raw_data.parquet")
        "clean_columns": ParquetDataSet(filepath=f"data/clean_columns.parquet")
    }
)

Один трубопровод для одного узла, чтобы получить вас начал

Полуавтоматический каталог

По какой -то причине, когда я попытался использовать DataCatalogWithDefault, он не поднял мои наборы данных правильно. Я подозреваю, что это как -то связано с тем, что не настраивает надлежащий сеанс, так что это то, что я сделал в крайнем случае, чтобы получить этот каталог благосклонности для моего фрейма без настройки каждого из них вручную.

catalog = DataCatalog(
    {
        name: ParquetDataSet(filepath=f"data/{name}.parquet")
        for name in pipeline.all_outputs()
    }
)

⚠ Если все ваши наборы данных являются pandas DataFrames

Для примера выше, который не использует DataFrames, я бы забрал все свои выходы, чтобы позволить их перегрузить их позже.

catalog = DataCatalog(
    {
        name: PickleDataSet(filepath=f"data/{name}.pkl")
        for name in pipeline.all_outputs()
    }
)

🔥 Для использования с наборами данных без Pandas

логирование

Как только вы явно добавляете наборы данных, Kedro начнет журнал, когда он загружается, работает или сохраняет каждый узел. Вещи начнут выглядеть немного более знакомым для любого, кто раньше использовал кедро.

ww3 ↪main ©kedro-in-scripts v3.8.8 ipython
❯ runner.run(pipeline, catalog)
2021-04-18 09:30:58,099 - kedro.pipeline.node - INFO - Running node: (None) -> [range]
2021-04-18 09:30:58,100 - kedro.io.data_catalog - INFO - Saving data to `range` (PickleDataSet)...
2021-04-18 09:30:58,104 - kedro.runner.sequential_runner - INFO - Completed 1 out of 5 tasks
2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Loading data from `range` (PickleDataSet)...
2021-04-18 09:30:58,105 - kedro.pipeline.node - INFO - Running node: ([range]) -> [range**2]
2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Saving data to `range**2` (PickleDataSet)...
2021-04-18 09:30:58,111 - kedro.runner.sequential_runner - INFO - Completed 2 out of 5 tasks
2021-04-18 09:30:58,111 - kedro.io.data_catalog - INFO - Loading data from `range**2` (PickleDataSet)...
2021-04-18 09:30:58,112 - kedro.pipeline.node - INFO - Running node: ([range**2]) -> [range>5k]
2021-04-18 09:30:58,112 - kedro.io.data_catalog - INFO - Saving data to `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,115 - kedro.runner.sequential_runner - INFO - Completed 3 out of 5 tasks
2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,115 - kedro.pipeline.node - INFO - Running node: ([range>5k]) -> [range>5k-mean]
2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Saving data to `range>5k-mean` (PickleDataSet)...
2021-04-18 09:30:58,118 - kedro.runner.sequential_runner - INFO - Completed 4 out of 5 tasks
2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,119 - kedro.pipeline.node - INFO - Running node: ([range>5k]) -> [range>5k-head]
2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Saving data to `range>5k-head` (PickleDataSet)...
2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Completed 5 out of 5 tasks
2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Кедро, а именно

Я не смог быстро поднять Кедро, а именно бежать для моего варианта использования. Если вы Очень хотелось, чтобы вы могли начать изменение их функции format_pipelines_data в Server.py Анкет Или вы можете отобразить новый шаблон и положить туда свой трубопровод для, а именно, а именно, а именно, а именно, а именно цели.

Это возможно, но может также придерживаться шаблона

Кли

Для чего -то, на чем я буду использовать это, я, вероятно, не приложу много усилий в CLI, так как это вряд ли у нас будет команда разработчиков, с которой постоянно взаимодействуют. Я бы просто собрал минимум, необходимый для запуска моего приложения, как мне это нужно.

if __name__ == "__main__":
    import sys

    if '--skip-raw' in sys.argv:
        runner.run(pipeline.from_inputs('range**2'), catalog)
    else:
        runner.run(pipeline, catalog)

Сохраняя это просто

Если я хочу спуститься по маршруту, созданного полным CLI, я, вероятно, собираюсь использовать полный шаблон Kedro или что -то очень похожее.

Это немного грубо

Хотя я мог бы использовать это где -то в производстве, это будет внутри другого приложения, а не кедро. Я все равно буду использовать что -то очень похожее на их шаблон для моих проектов по трубопроводу. Это скучает по некоторым отличным вещам, которые приводят меня к кедро, как крючки, плагины, учетные данные, каталог, конфигурации журнала, CLI и, а именно.

Проверьте эти связанные сообщения

Оригинал: “https://dev.to/waylonwalker/using-kedro-in-scripts-6l2”