Рубрики
Без рубрики

Создание конвейера данных от Airtable до бэкэнда

Как вы можете использовать Airtable для вашей электронной коммерции или общей разработки приложений. Tagged с Python, Django, Airtable, Tipelines.

Я часто работаю с системами электронной коммерции, и нам часто нужно получать данные о продукте в любую систему электронной коммерции, которую мы используем, может ли это быть Saleor, Magento, WordPress, Shopify или что-то еще. Одним из инструментов, который мы находим полезным, является Airtable для сотрудников администрации, чтобы сначала заполнить всю информацию о продукте в организационной манере, подходящей для их команды. Затем после этого мы могли бы написать сценарий в Python, чтобы импортировать строки через AISTABLE API в нашу базу данных приложений. Airtable API прост в использовании, но есть некоторые соображения и проблемы, с которыми мы столкнулись. Я постараюсь наметить ниже, чтобы облегчить другим разработчикам.

Некоторые технологии и инструменты, которые мы будем освещать в этой статье:

  • Airtable и Airtable API
  • Python и Django (рамка электронной коммерции Saleor)
  • Graphql запросы и мутации

Для тех, кто не знаком с Airtable, это удобный инструмент для пользователя, который имеет хорошо задокументированный API, настроенный на каждую базу, которая у вас есть. Он имеет щедрый бесплатный уровень, и я нахожу его полезным для различных задач от легких систем CMS до предложения клиентов простыми настройками конфигурации.

GraphQL является альтернативой REST и позволяет разработчикам запросить именно необходимые им данные. Одна из точек ритма GraphQL заключается в том, что он самостоятельно документирует и набирает Язык запросов . Пример запроса GraphQL:

import gql from 'graphql-tag';
import { useQuery } from '@apollo/react-hooks';

const GET_DOGS = gql`
  {
    dogs {
      id
      breed
    }
  }
`;

Подготовка столбцов с атмосфером и ключа API

Airtable действительно интуитивно понятно и прост в использовании. Вот почему это хороший инструмент для нетехнических людей, таких как офисные администраторы и ввод данных. В качестве примера у нас есть такая база продукта, которая содержит столбцы:

  • SKU: Текст
  • Имя: Текст
  • Описание: длинный текст
  • base_price: номер
  • Дата публикации: Дата
  • is_published: Логический
  • масса: Десятичная дробь
  • количество Количество
  • Images_opt: вложения (объяснит больше позже)
  • Категория: SingleSelect
  • product_pk_auto: Текст (ID)
  • HAS_BEEN_IMPORTED: Логический

Есть некоторые дополнительные параметры для API GraphQL, в которые мы будем публиковать мутацию, поэтому мы можем отметить это с помощью _opt Суффикс, чтобы дать администраторам знать, что поле не является обязательным.

После того, как все столбцы и некоторые примеры данных установлены на базе Airtable, Airtable предоставит нам пользовательский API, заполненный нашими столбцами и примерами данных.

Нам нужно взять Base_id и Api_key использовать в нашем Получить запрос. Наш Api_key находится на левой панели, просто проверьте «Показать ключ API». В целях импорта списка продуктов в нашу базу данных нам нужно будет использовать конечную точку «Записи списков». Пример запросов скручивания выглядит так:

    curl "https://api.airtable.com/v0/appC7p39pnmKYOJ17/Collections?maxRecords=3&view=Grid%20view" \
      -H "Authorization: Bearer YOUR_API_KEY"

Общая логика импорта, которую мы хотим:

  1. Возьмите список всех рядов с Airtable
  2. Получите токен аутентификации от нашего API электронной коммерции, если он есть
  3. Обязательно получите какие -либо отображения, такие как категория → Category_IDS из нашей системы
  4. Карту каждой строки из Airtable в наш объект запроса систем
  5. Создайте запрос POST для обновления нашей системы (мутация GraphQL в моем случае)
  6. Получите продукт_ид и обновите Airtable с ним
  7. Загрузите и обработайте любые изображения в нашем приложении
  8. Повторите с каждым рядом Пока все ряды не исчерпаны

В нашем случае нам нравится реализовать пользовательский Django Command Чтобы инициировать задачу импорта. Позже это довольно приятно и возможно работать на работе Cron Cron, если мы желаем. Вот функция сценария обработчика, чтобы получить список всех рядов от Airtable, аутентификации, отправки мутаций и обновления обратно в Airtable. В последующих шагах мы заполним логику субботней функции.

    from django.core.management.base import BaseCommand, CommandError
    import requests
    from pprint import pprint
    import json
    import os

    API_KEY = 'airtable-api-key';

    class Command(BaseCommand):

      def handle(self, *args, **options):
        # pull records from airtable
        response = requests.get('https://api.airtable.com/v0/appC7p39pnmKYOJ17/Products?maxRecords=1000&view=Grid%20view', headers={'Authorization': 'Bearer ' + API_KEY})

        product_rows = response.json().get('records')
        pprint('product rows >> ', product_rows)

        json = self.get_permission_token()
        if len(json['data']['tokenCreate']['errors']) is not 0:
          print('Abort importing, Authentication failed.')
          return
        print('New token created: {} for import user: {} \n\n'.format(json['data']['tokenCreate']['token'], json['data']['tokenCreate']['user']['email']))
        headers = {
          'Authorization': 'JWT {}'.format(json['data']['tokenCreate']['token'])
        }

        category_mappings = self.get_categories();

        for pr in product_rows:
          fields = pr.get('fields', None)

          if not fields or fields.get('has_been_imported') or fields.get('product_id_pk'):
            print('No fields or row has already been imported: sku={}, name={}'.format(fields.get('sku'), fields.get('name')))
            continue
          create_input = self.prepare_input(fields=fields, collections_mappings=collections_mappings)

          results = self.send_mutation(create_input, headers)
          if not results:
            self.handle_error()

          self.update_airtable(pr['id'], results)

      def prepare_input():
        pass

      def get_permission_token():
        pass

      def get_categories(self):
        pass

      def send_mutation(self):
        pass

      def update_airtable(self):
        pass

      def handle_error(self):
        pass

В следующих разделах мы будем реализовать каждый этап логики, заполнив функции. У нас будет основной для Цикл, который проходит каждую запись, поэтому, если у нас есть токен аутентификации для нашего приложения, что, скорее всего, вы получите, нам нужно получить этот токен, прежде чем войти в цикл. Пример, который работает для нашего случая, показан ниже:

 def get_permission_token():
   mutation = '''
     mutation TokenCreateMutation($email: String!, $password: String!) {
       tokenCreate(email: $email, password: $password) {
         token
         errors{
           field
           message
         }
         user{
           id
           email
           }
         }
       }
   '''
   input = {"email": auth_email, "password": auth_password}
   response = requests.post('http://localhost:8000/graphql/', json={'query': mutation, "variables": input})

Теперь, когда у нас есть токен Auth, мы можем использовать его каждый раз, когда нам нужно обновлять что-либо в нашей системе электронной коммерции. Следующий шаг должен убедиться, что данные правильно отображаются в формате, который примет наш API. Мы также должны убедиться, что любые столбцы, которые не являются необязательными, должны быть предоставлены, и пропустили запись с дополнительной обработкой ошибок, если поля отсутствуют. Каждый API и система отличаются, поэтому вам придется настроить его, чтобы он работал для вас. Я покажу свою реализацию ниже для API Producte Producte Producte:

def prepare_input(self, fields):
  if (fields.get('name') and fields.get('base_price') and fields.get('category') and fields.get('description') and fields.get('publication_date') and fields.get('weight')):
    input = {
      'publicationDate': fields.get('publication_date'),
      'name': fields.get('name'),
      'description': fields.get('description'),
      'isPublished': fields.get('is_published'),
      'weight': fields.get('weight'),
      'basePrice': fields.get('base_price'),
      'sku': fields.get('sku'),
      'weight': fields.get('weight'),
      # just pass the first image
      'airtableImageUrl': fields.get('images_opt')[0]['url']
    }
 # handle mappings for category

Мы также должны убедиться, что реляционные отображения верны в процессе импорта. Итак, продукт: настенный шкаф A может принадлежать категории: «Кухонные наборы». В API это обычно делается путем прохождения списка идентификаторов, таких как {'Категория': 'Q2F0ZWDVCNK6MTA'} к созданию мутации. На интерфейсе Airtable мы хотим, чтобы идентификаторы были человеческими читаемыми входами, такими как: Кухонные наборы Анкет Мы можем сохранить сопоставления как твердый словарь в сценариях импорта, или мы можем динамически получить список продуктов из нашего приложения.

 def get_categories(self):
     query = '''
       query categories($first: Int) {
           categories(first: $first) {
               edges {
                   node {
                       id
                       name
                   }
                   cursor
               }
               totalCount
           }
       }
   '''
   response = requests.post(GRAPHQL_BASE, json={'query': query, 'variables': { 'first': 20 }})
   json = response.json()
   category_mappings_to_id = {}
   for category in json['data']['categories']['edges']:
     category_mappings_to_id[category['node']['name']] = category['node']['id']
   return category_mappings_to_id

   def prepare_input(self, fields, category_mappings):
     # basic input fields...
     # handle mappings for category and any other related models
     if fields.get('category'):
       input['category'] = category_mappings[fields.get('category')]

Теперь, когда ввод – это то, как мы хотим его, нам нужно фактически отправить мутацию в наш API, чтобы обновить строки в базе данных. Мы будем реализовать send_mution функционируйте сейчас. Мы обрабатываем случай ошибки, если результаты API пусты и возвращаем продукт в функцию обработчика.

 def send_mutation(self, input, headers):
     mutation = '''
       mutation ProductCreateMutation($input: ProductCreateInput!) {
           productCreate(input: $input) {
               errors {
                   field
                   message
               }
               product {
                   id
                   name
                   description
               }
           }
       }
   '''

   response = requests.post(GRAPHQL_BASE, json={'query': mutation, "variables": { "input": input }}, headers=headers)
   json = response.json()
   print('Product updated :: ', json['data']['productCreate'])
   if len(json['data']['productCreate']['errors']) is not 0:
     print('Import of {} failed'.format(input['name']))
     return None
   else:
     return json['data']['productCreate']['product']

Как только мы уверены, что продукт был обновлен, и в нашем приложении не было никаких ошибок, мы захотим обновить строку Airtable, чтобы отразить то, что продукт теперь существует в нашей базе данных приложений. Это также гарантирует, что один и тот же сценарий может работать несколько раз, так как в лист Airtable добавляется больше продуктов, так как мы можем пропустить записи, которые уже импортировались.

 def update_airtable(self, id, updated):
   patch_payload = {
     "records": [{
       "id": id,
       "fields": {
         "has_been_imported_auto": True,
         "product_id_pk_auto": updated['id'],
       }
     }]
   }

   response = requests.patch('https://api.airtable.com/v0//Products', json=patch_payload, headers={'Authorization': 'Bearer ' + api_key})
   data = response.json()
   print('Airtable id updated: {} \n'.format(data['records'][0]['id']))

Итак, один из способов сделать это – сделать модель приложения, сохранив airtable_image В другом столбце используйте фоновое задание, чтобы загрузить вложение изображения из сохраненного URL -адреса. Специфика фоновых заданий выходит за рамки этого учебника, но логика загрузки изображений может выглядеть примерно так:

 # background tasks file in Django
import requests
import tempfile

 @app.task
 def save_airtable_product_image(pk):
   instance = Product.objects.get(pk=pk)
   temp = download_airtable_attachment_image(instance.airtable_image_url)
   file_name = instance.airtable_image_url.split('/')[-1]
   pi = ProductImage()
   pi.product = instance
   pi.image.save(file_name, files.File(temp))
   create_product_thumbnails(pi.pk)

 def download_airtable_attachment_image(image_url):
   '''Download airtable (https://dl.airtable.com/.attachments) image and save to filesystem, then save reference in product_collection.background_image, then generate thumbnails for various sizes
   '''
   request.get(image_url, stream=True)
   if request.status_code != requests.codes.ok:
     print('Could not download image {}'.format(image_url))

   temp = tempfile.NamedTemporaryFile()
   for block in request.iter_content(1024 * 8):
     if not block:
       break
     temp.write(block)

   return temp

Это подводит итог основного урока по созданию трубопровода от Airtable до вашего заявления на электронную коммерцию. Некоторые дополнительные улучшения, чтобы попробовать, могут быть добавлены предупреждения о Slack для членов команды с резюме импорта или автоматизировать задачу в ежедневном графике. Оставит это читателю, чтобы попробовать.

Мне нравится создавать приложения электронной коммерции на различных технологических стеклах, включая Django, Node.js и React. Еще более приятным помогает торговцам добиться успеха и масштабировать с технологиями. Я надеюсь, что мои разработчики сочинений для писателей и, если вы знаете торговцев, которые могли бы использовать цифровые технологии для улучшения своего бизнеса, не стесняйтесь обращаться к общению!

Оригинал: “https://dev.to/adrienshen/creating-a-data-pipeline-from-airtable-to-backend-31m0”