С последними выпусками Kedro 0,17.x
, теперь можно запустить kedro трубопроводы из сценариев. Хотя я бы не стал начинать проект с этим Техника, это будет отличный инструмент, чтобы держать в моем заднем кармане, когда я захочу Посыпать немного добра в существующих проектах.
Новичок в Кедро
Что такое Кедро |
Если вы просто узнаете о Кедро, проверьте этот пост, пройдя через него
Больше нет кроличьей дыры ошибок
По состоянию на 0,17,2
Я пытался сделать это в Кедро 0.16.x,
которая превратилась в кроличье дыру ошибки. Во -первых, Кедро нуждался в conf
каталог. Если вы пытались подделать один, Kedro Затем попросил настройку журнала. Эти ошибки просто продолжали доходить до сути Это не стоило, и я мог бы также использовать правильный шаблон для реального Проекты и придерживаться простых функций, вызовы для вещей, которые не являются Kedro проект.
Кедро в сценарии
Чтобы получить работу KEDRO, вам понадобится конвейер, каталог и бегун как минимум. Если вы раньше использовали KEDRO, трубопровод будет очень похож на то, с чем вы знакомы, но каталог не будет загружен из YAML, и вам нужно будет принести своего бегуна.
from kedro.pipeline import Pipeline, node from kedro.io import DataCatalog from kedro.runner.sequential_runner import SequentialRunner # additional datasets you want to use from kedro.extras.datasets.pandas.csv_dataset import CSVDataSet from kedro.extras.datasets.pandas.parquet_dataset import ParquetDataSet # the sequential runner is the simplest. It runs one node at a time. runner = SequentialRunner() # this is a super simple example pipeline pipeline = Pipeline( [ node(lambda: range(100), None, "range"), node(lambda x: [i ** 2 for i in x], "range", "range**2"), node(lambda x: [i for i in x if i > 5000], "range**2", "range>5k"), node(lambda x: x[:5], "range>5k", "range>5k-head"), node(lambda x: sum(x) / len(x), "range>5k", "range>5k-mean"), ] ) # to get up and running, you can use an empty catalog catalog = DataCatalog() runner.run(pipeline, catalog)
👆 Выше минимальная установка для работы с трубопроводом KEDRO
более практически
Чаще всего ваши трубопроводы KEDRO будут использовать функцию, а не лямбду, и пандас DataFrames.
def clean_columns(df: pd.DataFrame): df.columns = [col.lower().strip() for col in df.columns] pipeline = Pipeline( [ node(clean_columns, "raw_data", "clean_columns", name="create_clean_columns"), ] ) catalog = DataCatalog( { "raw_data": ParquetDataSet(filepath=f"data/raw_data.parquet") "clean_columns": ParquetDataSet(filepath=f"data/clean_columns.parquet") } )
Один трубопровод для одного узла, чтобы получить вас начал
Полуавтоматический каталог
По какой -то причине, когда я попытался использовать DataCatalogWithDefault, он не поднял мои наборы данных правильно. Я подозреваю, что это как -то связано с тем, что не настраивает надлежащий сеанс, так что это то, что я сделал в крайнем случае, чтобы получить этот каталог благосклонности для моего фрейма без настройки каждого из них вручную.
catalog = DataCatalog( { name: ParquetDataSet(filepath=f"data/{name}.parquet") for name in pipeline.all_outputs() } )
⚠ Если все ваши наборы данных являются pandas DataFrames
Для примера выше, который не использует DataFrames, я бы забрал все свои выходы, чтобы позволить их перегрузить их позже.
catalog = DataCatalog( { name: PickleDataSet(filepath=f"data/{name}.pkl") for name in pipeline.all_outputs() } )
🔥 Для использования с наборами данных без Pandas
логирование
Как только вы явно добавляете наборы данных, Kedro начнет журнал, когда он загружается, работает или сохраняет каждый узел. Вещи начнут выглядеть немного более знакомым для любого, кто раньше использовал кедро.
ww3 ↪main ©kedro-in-scripts v3.8.8 ipython ❯ runner.run(pipeline, catalog) 2021-04-18 09:30:58,099 - kedro.pipeline.node - INFO - Running node:(None) -> [range] 2021-04-18 09:30:58,100 - kedro.io.data_catalog - INFO - Saving data to `range` (PickleDataSet)... 2021-04-18 09:30:58,104 - kedro.runner.sequential_runner - INFO - Completed 1 out of 5 tasks 2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Loading data from `range` (PickleDataSet)... 2021-04-18 09:30:58,105 - kedro.pipeline.node - INFO - Running node: ([range]) -> [range**2] 2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Saving data to `range**2` (PickleDataSet)... 2021-04-18 09:30:58,111 - kedro.runner.sequential_runner - INFO - Completed 2 out of 5 tasks 2021-04-18 09:30:58,111 - kedro.io.data_catalog - INFO - Loading data from `range**2` (PickleDataSet)... 2021-04-18 09:30:58,112 - kedro.pipeline.node - INFO - Running node: ([range**2]) -> [range>5k] 2021-04-18 09:30:58,112 - kedro.io.data_catalog - INFO - Saving data to `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,115 - kedro.runner.sequential_runner - INFO - Completed 3 out of 5 tasks 2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,115 - kedro.pipeline.node - INFO - Running node: ([range>5k]) -> [range>5k-mean] 2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Saving data to `range>5k-mean` (PickleDataSet)... 2021-04-18 09:30:58,118 - kedro.runner.sequential_runner - INFO - Completed 4 out of 5 tasks 2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,119 - kedro.pipeline.node - INFO - Running node: ([range>5k]) -> [range>5k-head] 2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Saving data to `range>5k-head` (PickleDataSet)... 2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Completed 5 out of 5 tasks 2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Кедро, а именно
Я не смог быстро поднять Кедро, а именно бежать для моего варианта использования. Если вы Очень хотелось, чтобы вы могли начать изменение их функции format_pipelines_data в Server.py Анкет Или вы можете отобразить новый шаблон и положить туда свой трубопровод для, а именно, а именно, а именно, а именно, а именно цели.
Это возможно, но может также придерживаться шаблона
Кли
Для чего -то, на чем я буду использовать это, я, вероятно, не приложу много усилий в CLI, так как это вряд ли у нас будет команда разработчиков, с которой постоянно взаимодействуют. Я бы просто собрал минимум, необходимый для запуска моего приложения, как мне это нужно.
if __name__ == "__main__": import sys if '--skip-raw' in sys.argv: runner.run(pipeline.from_inputs('range**2'), catalog) else: runner.run(pipeline, catalog)
Сохраняя это просто
Если я хочу спуститься по маршруту, созданного полным CLI, я, вероятно, собираюсь использовать полный шаблон Kedro или что -то очень похожее.
Это немного грубо
Хотя я мог бы использовать это где -то в производстве, это будет внутри другого приложения, а не кедро. Я все равно буду использовать что -то очень похожее на их шаблон для моих проектов по трубопроводу. Это скучает по некоторым отличным вещам, которые приводят меня к кедро, как крючки, плагины, учетные данные, каталог, конфигурации журнала, CLI и, а именно.
Проверьте эти связанные сообщения
Оригинал: “https://dev.to/waylonwalker/using-kedro-in-scripts-6l2”