Рубрики
Без рубрики

Получение данных от датчиков ESP8266

Хорошо, так что в прошлый раз я прошел через то, как я запрограммировал и подключил несколько микроконтроллеров ESP8266, чтобы … Tagged с помощью Python, Sockets, SQLite3, потока.

Правильно, так что в прошлый раз я прошел через то, как я запрограммировал и подключил несколько микроконтроллеров ESP8266 для сбора данных температуры, влажности и влажности почвы и пучка, которые данные по указанному адресу в сети.

В этом посте я пройду через код, чтобы запустить сервер на чем -то вроде ноутбука или Raspberry Pi, чтобы собрать входящие данные и написать его в базу данных SQLITE3 для последующего анализа.

Этот пост будет охватывать такие темы, как потоки, более продвинутое управление сокетами и взаимодействие с базой данных от Python.

Полный код версии доступен на GitHub Анкет Обратите внимание, что ссылка исправлена и не будет обновляться по мере того, как я улучшаюсь-для этого, Go Здесь И проверьте проект Greenhouses.

Я буду втягивать фрагменты из этого кода в этот пост, но чтобы увидеть все это в контексте, вы должны посмотреть на ссылку.

Быстрое перефразирование

Итак, помните, что моя цель для этого проекта – развернуть парк экологических датчиков по всей теплице, чтобы выбирать данные и отправить его в центральное место, где я принимаю решения по контролю (например, включите обогреватель, открыть вентиляционное отверстие, вентиляционное отверстие, водный лоток А и т. Д.). Нам нужна архитектура нашей сервера, чтобы одновременно поддерживать несколько датчиков, а также для сопоставления показаний в течение двух секунд в среднем за минуту или около того.

На мгновение давайте притворимся, что мы ожидаем только одного клиентского датчика поговорить с нами. Позже мы будем иметь дело с несколькими датчиками, но тогда нам приходится иметь дело с темами и блокировкой, и это гигантский беспорядок.

Итак, сейчас один датчик.

В стороне: программирование с сокетами сервера

Когда вы являетесь сервером, у вас, как правило, есть один сокет/порт, открытый для прослушивания входящих клиентов-как веб-сервер, прослушивая порт 80 для новых клиентов. Сложно то, что вы не хотите делать свое основное общение со своими клиентами в этом розетке, или вы не сможете использовать его для прослушивания для более входящих клиентов.

Представьте, что система с именем BOB идет в Google и просит загрузить какой -то гигантский файл, и Google отвечает этим файлом в том же сокете, который он использует для принятия новых клиентских подключений. Затем вы заходите в Google, но не можете пройти через, потому что «линия занята», поскольку Google разгружает файл на Боба, который занимает вечно-и все это время, вы не можете поговорить с Google.

Таким образом, вам нужен один розетка на «публичном» порту, чтобы инициализировать соединение, а затем еще один розетка, чтобы фактически обрабатывать общение-вроде как оператор в бизнесе, принимающий входящие вызовы, а затем маршрутируют их на разные строки. Что-то вроде.

Вернуться в сеть …

Итак, у нас есть куча ESP8226, пытающихся открыть соединения с фиксированным IP и портом-я собираюсь сказать, это IP 192.168.1.5 и порт 5555. Это не имеет значения, если сервер и микроконтроллеры согласны, а номер порта находится где -то между 2048 и 60666 или около того. (Я предполагаю, что в этих цифрах вы можете их посмотреть).

Ваш сервер должен прослушать этот адрес-комбинация IP/порта-для входящих соединений. Это довольно просто:

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(("192.168.1.5", 5555))
serversocket.listen()

Пройдя через это:

Сначала мы создаем розетку с некоторыми атрибутами. Это сокет Af_inet Семья, которая в основном означает, что она будет разговаривать с Интернетом (вместо другого процесса в вашей системе или какого -то странного другого сетевого протокола). В основном это просто определяет, как сокет работает под капюшоном, а также тип адреса, который мы собираемся дать.

Это также розетка Sock_stream Тип, что на самом деле означает, что сокет будет иметь внутренний буфер, поэтому мы можем «транслировать» данные с полезной скоростью для себя.

Затем мы связываем гнездо с адресом. Примечание: Если вы столкнетесь с трудностями с этой линией, убедитесь, что IP, который у вас есть, соответствует IP -адресу системы, которую вы используете. Если они несоответствуют, вы получите здесь исключение (поскольку вы пытаетесь связаться с адресом, которого у вас на самом деле нет).

После этого мы установили розетку в режим «прослушать».

Со мной до сих пор?

(clientsocket, address) = serversocket.accept()

Хорошо, теперь, теперь мы говорим: «Эй, сокет, послушай новое соединение, и когда вы получите его, передайте мне розетку, чтобы поговорить с этим новым соединением и дайте мне знать, что это за адрес». Это Блокирующий звонок , что означает, что наша программа теперь остановится, пока не появится новое соединение. Очевидно, что это значительно усложняет какую-либо обработку данных, пока мы просто сидим здесь и ждем-но мы исправим это позже.

Если вы играете с этим вместе с бегущим микроконтроллером, что вы должны увидеть, это запуск программы, а затем повесить, когда он ждет подключения входящего сокета-если вы используете мой код ESP8266, он будет неуклонно мигать, когда он приобретает Wi-Fi Соединение, тогда оно будет более сердито моргнуть, поскольку оно создаст подключение к розетку, и тогда оно должно быть твердое синее с прерывистым миганием, когда он пишет в розетку.

Конечно, мы еще не читаем какие -либо данные из сокета! Давайте исправим это:

buffer = ""

while 1:
    data = clientsocket.recv(128)
    datastr = buffer + data.decode('utf-8')
    split = datastr.split("\n")
    buffer = split[-1]
    print(split[:-1])

Итак, рабочий дом здесь clientsocket.recv (128) вызов. Это требует 128 байтов данных из сокета-это также блокирующий вызов, что означает, что выполнение остановится, пока мы не получим 128 байтов данных. Когда мы получаем данные, мы декодируем их от кучу байтов в строку и шлепаем их на буфер.

Буфер необходим, потому что 128 байтов могут быть не точно длиной одной «записи», отправленной из ESP8266-у вас может быть вся запись A и половина записей B. В нашем «протоколе» новички используются в качестве границ между записями, что означает, что у нас есть простой способ рассказать, когда у нас есть целая запись.

Итак, мы добавляем данные, которые мы получили в буфер (на случай, если мы только что получили вторую половину записи B), а затем разделили буфер на новичках, чтобы разделить записи. Мы поместили последнюю запись из раскола, которая может быть частичной, и откидываем ее обратно в буфер. Затем мы печатаем остальные записи (за исключением последнего).

При этом вы сможете увидеть входящие данные с одного датчика ESP8266! Верна!

Теперь нам нужно работать над масштабированием до нескольких соединений.

Как я упоминал пару раз, мы используем некоторые вызовы, которые блокируют выполнение, что заставило нашу программу остановкой, пока не произойдет какая -то внешняя вещь. Это затрудняет масштабирование до большого количества датчиков, поэтому нам придется представить некоторые резьбы.

Если вы не знакомы, то нить-это способ для одной программы сделать много вещей одновременно-Multiple «потоки исполнения». Одиночный процесс может иметь несколько потоки где все потоки имеют одну и ту же память, что делает их быстрее, чем если бы у вас было несколько процессов, которые просто пытались поговорить друг с другом. (Это также значительно облегчает писать.)

Сложная вещь о потоках – это то, что вы можете иметь условия гонки , где два потока пытаются одновременно получить доступ к одному и тому же ресурсу. Как правило, это приводит к странному-и трудно воспроизвести-поведение, поэтому вы должны быть очень, очень осторожны, чтобы управлять вашим доступом к общим ресурсам при использовании потоков для предотвращения этого, или отладки это будет живым адом.

Общая архитектура, которую мы собираемся создать, будет выглядеть как большинство простых серверных моделей. У нас будет серверный поток, который будет прослушивать на порту 5555 и принять входящие подключения. Эти подключения будут перемещены в другой розетку на другом порте в другом потоке, поэтому поток сервера может продолжать ждать вещи.

Python предоставляет полезную библиотеку потоков, к счастью. Есть пара способов писать темы-вы можете либо сделать новый Нить. Тема Объект и передайте функцию, которую поток будет работать как аргумент, или вы можете сделать класс, который происходит из Нить. Тема и просто переопределить запустить метод Я выбрал последнее, так как я хотел иметь какое -то спасенное состояние, и это казалось хорошим способом сделать это. Вы можете делать все, что вам нравится.

Принимая входящие соединения

class Overseer(threading.Thread):
    def __init__(self, threads):
        super(Overseer, self).__init__()
        print("Starting overseer thread...")

    def run(self):
        global threads_run
        global threads

        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        serversocket.bind(("192.168.1.149", 5555))
        serversocket.listen()

        while threads_run:
            (clientsocket, address) = serversocket.accept()
            l = Listener(clientsocket, address)
            threads.append(l)
            l.start()

        serversocket.close()

Большая часть этого должна выглядеть знакомо, правда. Важными вещами здесь являются Global Threads_run переменная и L (Clientsocket, Address) с L.Start () внизу.

Класс слушателей – это еще один класс темы, который я сделал, к которому мы доберемся через мгновение.

Прогулка через:

Метод init должен быть большим тем, что вы ожидаете. Один улов, ты должен Обязательно позвоните в Superclass Init, или вы получите исключение, когда попытаетесь .start () нить.

Процетка while отключена от переменной, разделенной между всеми потоками-и модифицированной только в исходном основном потоке. Это позволяет нам чисто остановить все потоки из оригинала и очистить наши ресурсы, когда мы закончим.

С надзирателем в своей собственной потоке мы можем постоянно блокировать и ждать входящих соединений, не мешая нашей программе делать другие вещи. Это основная идея и основное значение потока.

Теперь один недостаток в этом дизайне заключается в том, что если мы переключаем Threads_run В то время как Мы ждем входящего соединения, мы могли бы просто ждать вечно. Я хотел написать прерывание для этого, но … позже.

Также обратите внимание, как мы добавляем новую ветку слушателя к глобальному потоки Список-мы придут к этому.

Слушая эти входящие соединения

class Listener(threading.Thread):
    def __init__(self, clientsocket: socket.socket, address) -> None:
        super(Listener, self).__init__()
        print("Starting listener thread...")
        self.socket = clientsocket
        self.address = address
        self.buffer = ""

    def run(self):
        global lock
        global shared_list
        global threads_run

        while threads_run:
            data = self.socket.recv(128)
            datastr = self.buffer + data.decode('utf-8')
            self.buffer = ""
            split = datastr.split("\n")
            self.buffer = split[-1]
            split = split[:-1]

            with lock:
                shared_list.extend(split)
                lock.notify()

        self.socket.close()

Именно здесь все начинает становиться немного сложнее, но это все еще довольно близко к тому, что у нас было раньше. Помните, что потоки слушателя строятся и начинаются в ветке надзирателя, где они получают свой розетка и свой адрес.

В ветке слушателя, в то время как глобальный Threads_run Это правда, мы постоянно блокируем и получаем данные из сокета. Опять же, поскольку это происходит в его собственной ветке, блокировка здесь не помешать нам выполнять в других потоках.

Мы делаем ту же обработку по полученным данным, которые мы делали раньше … А потом у нас есть это странное с замком вещь.

Итак, помните, как я говорил о том, как вы должны быть очень осторожны при доступе к общим ресурсам, когда вы работаете с потоками? Это один из тех случаев. Здесь, shared_list передается между всеми работающими потоками, и именно там они пишут данные, которые они получают из сети. Чтобы избежать столкновений и странного поведения, мы используем это Замок вещь, которая является резьба. Условие учебный класс. По сути, замок – это способ для потоков, чтобы гарантировать, что только один из них затрагивает вещь за раз. Поток должен приобрести блокировку, который может иметь только один поток за раз, а затем он может что -то делать, а затем он должен отпустить блокировку для других потоков, которые блокируют, когда они Попробуйте приобрести замок. Это хорошо обрабатывается с с синтаксис в Python; Когда ты с С замком Python обрабатывает приобретение и выпуск замка. Обратите внимание, что вам нужно вручную позвонить lock.notify () хотя, чтобы другие потоки знали, что блокировка доступна. Также обратите внимание, что я приобретаю блокировку только тогда, когда собираюсь взаимодействовать с общими данными, и я выпускаю их сразу после того, как я закончил возиться с общими данными- Нет смысла заставить другие темы ждать меня, пока я обрабатываю строки, которыми они не делятся, они могли бы делать что -то полезное в то время, поэтому я стараюсь удержать замок на как можно меньше времени.

Теперь это супер высокий уровень вида замков. Замки могут стать очень сложными, особенно если вы получите круговые ожидания (где нить A ждет на потоке B, но нить B ждет на резьбе A). Подразделение – это супер сложная концепция, в которую я не собираюсь заходить в этом посте, но будьте уверены, что на нем много других чтений, если вам это заинтересовано.

Во всяком случае, После того, как у вас есть блокировка, вы добавляете вещи в список, а затем выпускаете блокировку, гарантируя, что ничего странного не происходит с общим доступом. Прохладный? Прохладный.

Взаимодействие с базой данных

Если вы не использовали их раньше, базы данных представляют собой системы, разработанные для простого хранения и поиска-как вы уже догадались-Data. Они используются практически во всех аспектах программного обеспечения, от управления пользователями блогов до контента Twitter до журналов доступа конфиденциальных физических областей. Это довольно сложные структуры, которые часто требуют самостоятельного сервера для работы.

Для наших целей мы можем обойтись с SQLite, супер легким двигателем базы данных, который просто хранит все данные в файле рядом с вашей программой. Это тривиально легко настроить и взаимодействовать в Python.

Теперь я объясню, что я делаю с командами SQL, которые я использую здесь, но на самом деле это не так много учебного пособия по базе данных, как урок по взаимодействию с тем.

Прохладный? Прохладный. Впереди!

class DBWriter(threading.Thread):
    def __init__(self):
        super(DBWriter, self).__init__()
        print("Starting DB writer thread...")

    def run(self):
        global threads_run
        global lock
        global shared_list

        connection = sqlite3.connect("greenhouse.db")
        connection.execute("CREATE TABLE if not exists readings "
                           "(source text, "
                           "airtemp real, "
                           "humidity real, "
                           "soil_moisture int, "
                           "timestamp text);")

        while threads_run:
            time.sleep(60)

            to_load_into_database = []
            with lock:
                if len(shared_list) > 0:
                    to_load_into_database = list(shared_list)
                    shared_list.clear()

            print(f"Condensing {len(to_load_into_database)} records...")
            by_source = defaultdict(list)
            record = namedtuple('SensorRecord', ['air_temp', 'air_hum', 'soil_moisture'])
            for line in to_load_into_database:
                line = line.split(';')
                mac = line[3].strip().split(" ")[1]
                by_source[mac].append(
                    record(
                        float(line[1].strip().split(" ")[1]),
                        float(line[2].strip().split(" ")[1]),
                        int(line[0].strip().split(" ")[1]),
                    )
                )

            for key in by_source:
                list_len = float(len(by_source[key]))

                avg_temp = 0
                avg_hum = 0
                avg_soil = 0

                for record in by_source[key]:
                    avg_temp += record.air_temp
                    avg_hum += record.air_hum
                    avg_soil += record.soil_moisture

                avg_temp /= list_len
                avg_hum /= list_len
                avg_soil /= list_len
                timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

                values = f" ('{key}', {avg_temp}, {avg_hum}, {avg_soil}, '{timestamp}')"
                print(f"Writing {values} to DB")
                connection.execute(f"INSERT INTO readings VALUES {values};")
            print("Sleeping...")

        print("DB writer thread exiting.")
        connection.commit()
        connection.close()

“Аааа! Это много кода! Что, черт возьми, Джейк, это шло так гладко?! “

Это отлично! Обещаю! Действительно!

Мы разбим его на куски. Давайте посмотрим на установку:

class DBWriter(threading.Thread):
    def __init__(self):
        super(DBWriter, self).__init__()
        print("Starting DB writer thread...")

    def run(self):
        global threads_run
        global lock
        global shared_list

        connection = sqlite3.connect("greenhouse.db")
        connection.execute("CREATE TABLE if not exists readings "
                           "(source text, "
                           "airtemp real, "
                           "humidity real, "
                           "soil_moisture int, "
                           "timestamp text);")

        while threads_run:
            time.sleep(60)

Итак, __init__ делает … почти ничего полезного.

В начале запуска мы глобали некоторые вещи, которыми мы делимся, и открываем соединение с базой данных, давая ему имя файла, в котором мы хотим, чтобы база данных жила.

«Подожди, Джейк», – говорите вы. «Почему бы вам не перенести это соединение в __init__ ? Разве это … Тебе известно… Инициализация? “

Вы были бы очень правы! И я попробовал это. Оказывается, SQLite3 Объекты могут использоваться только в потоке, в котором они созданы, и, технически говоря, __init__ Функция выполняется из другого потока, чем запустить Функция, так как __init__ вызывается, когда объект потока построен, но бежать называется только в честь тебя .start () нить. Итак, вы должны сделать объект после того, как потоки разошлись.

Как только у нас появится соединение с базой данных, мы позаботимся о том, чтобы таблица, которую мы собираемся использовать, существует. Если у нас нет Если нет Фраза там, мы получим ошибку SQL, когда попытаемся создать уже существующую таблицу. Затем мы предоставляем пар и типа всех столбцов, которые мы хотим в базе данных:

  • Источник, текст: это будет MAC -адрес датчика отправки. Я использую это позже, чтобы выяснить, какой датчик находится в теплице Так что я действительно могу понять входящие данные, и не должен догадаться, откуда они.
  • Airtemp, реальная: температура – это непрерывное значение, а не целое число. Особенно, как только мы в среднем с этим периодом посылал датчик, и, поскольку температура будет в Целсие, неотъемлемая ценность очень широкая и неточная по сравнению с реальным.
  • влажность, реальная: см. Предыдущий
  • SOUP_MOISTURE: Так что это немного странно, так как это процент. Технически, среднее может быть реальным, но я стал ленивым и сделал это Int. Подать в суд на меня.
  • TimeStamp, текст: мы собираемся сделать временную метку всех данных, иначе мы не узнаем, когда было принято чтение, но на самом деле нет консенсуса по поводу того, как следует хранить временные метки в базах данных. Я делаю текст, так как это самый гибкий формат.

Со всем этим настроенным мы можем ввести в нашу петлю. Эта ветка какое -то время дремало, так как нам нужно ждать, чтобы данные, которые мы собираемся агрегировать, чтобы прийти.

Теперь для мяса агрегации:

            to_load_into_database = []
            with lock:
                if len(shared_list) > 0:
                    to_load_into_database = list(shared_list)
                    shared_list.clear()

            print(f"Condensing {len(to_load_into_database)} records...")
            by_source = defaultdict(list)
            record = namedtuple('SensorRecord', ['air_temp', 'air_hum', 'soil_moisture'])
            for line in to_load_into_database:
                line = line.split(';')
                mac = line[3].strip().split(" ")[1]
                by_source[mac].append(
                    record(
                        float(line[1].strip().split(" ")[1]),
                        float(line[2].strip().split(" ")[1]),
                        int(line[0].strip().split(" ")[1]),
                    )
                )

Таким образом, мы составляем пустой список, в который мы скопируем общие данные, а затем захватываем блокировку и опустошаем общий список в наш локальный. Затем мы отпускаем блокировку, чтобы другие потоки могли написать в общий Lit.

Как только это будет сделано, мы начинаем вытаскивать данные из строки и загружать их в названную силу, что на самом деле является просто супер эффективным (и неизменным) словарем. Мы также группируем данные от датчика, который отправил их.

Теперь у нас есть данные, сгруппированные путем отправки датчика, что нам нужно агрегировать.

  for key in by_source:
                list_len = float(len(by_source[key]))

                avg_temp = 0
                avg_hum = 0
                avg_soil = 0

                for record in by_source[key]:
                    avg_temp += record.air_temp
                    avg_hum += record.air_hum
                    avg_soil += record.soil_moisture

                avg_temp /= list_len
                avg_hum /= list_len
                avg_soil /= list_len
                timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

                values = f" ('{key}', {avg_temp}, {avg_hum}, {avg_soil}, '{timestamp}')"
                print(f"Writing {values} to DB")
                connection.execute(f"INSERT INTO readings VALUES {values};")
                connection.commit()

Это в значительной степени просто математика. Мы собираем все показания датчиком, который послал их и строим метку времени в том, что по сути является форматом ISO, но без миллисекундов.

Как только это будет сделано, я строю строку, которую компонент «значения» вставки SQL с использованием замечательного синтаксиса строки Python (в Python 3.6 и выше, Yay!). Затем мы выполняем оператор INSERT и проводим изменение в базе данных.

Убедитесь, что однажды В то время как в запустить Из выхода из базы данных, которое вы закрываете соединение! Странные вещи могут случиться, если вы этого не сделаете.

Это здорово, но как выглядит главное?

def main():
    global threads_run
    global shared_list
    global threads
    threads = []

    threads_run = True

    threads.append(Overseer(threads_run))
    threads.append(DBWriter())
    threads[0].start()
    threads[1].start()

    while threads_run:

        instr = ""
        while instr != "stop":
            instr = input("enter 'stop' to stop: ")

        threads_run = False

    print("Waiting for threads...")

    for thread in threads:
        thread.join()

Мы начинаем надзирателя и DBWRITER, и мы убедитесь, что они добавлены в список потоков. Тогда мы начинаем их обоих. Надзиратель начинает принимать подключения и нерестовые слушателей, а поток dbwriter начинает агрегировать данные.

Затем мы начинаем цикл, где ждем ввода, поэтому пользователь может изящно остановить программу. Когда они это сделают, мы ждем, пока все потоки останутся с Thead.join А потом мы выходим из нашей программы.

Вот! Это мясо моего сервера! Имейте в виду, что это ни в коем случае не является «лучшим» способом сделать это, и это абсолютно ошибочно. Следите за git Repo, чтобы получить обновления!

Оригинал: “https://dev.to/bocajnotnef/receiving-data-from-esp8266-sensors-3n5e”