Рубрики
Без рубрики

Мониторинг президентских выборов в Твиттере с Python

Автор оригинала: FreeCodeCapm Team.

Romain Thalineau

Данное назад я читаю Эта хорошая статья Из Лоран Люси, где он объяснил, как он реализовал систему, которая собрала твиты, связанные с президентскими выборами в 2012 году. Статья очень хорошо написана, и я настоятельно рекомендую прочитать его.

Это дало мне идею осуществить что-то подобное для выборов 2017 года. Но я хотел добавить некоторые особенности:

  • Вместо того, чтобы использовать базу данных SQL для хранения данных, я хотел использовать базу данных графов. Основная причина заключалась в эксперименте с такой системой, но это довольно легко посмотреть, как это хорошо подходит для данных социальных сетей.
  • Я хотел иметь возможность следить за данными в режиме реального времени. Практически говоря, это означает, что данные должны быть обработаны, когда они прибывают. Это также включает в себя обслуживание анализируемых данных на веб-сайт с визуализациями данных.
  • В идеале я хотел запустить анализ настроений на твитах. Я бы тренировал алгоритм обучения и реализую его по трубопроводу по данным для обслуживания его результатов в режиме реального времени.

Ну, мне удалось построить все это. Вы можете пойти посмотреть, как это выглядит на Мой личный сайт Отказ До сих пор есть два простых анализа:

  • Первый Это анализ временной серии, который показывает численность твитов на кандидатов как функцию даты. Помимо возможности выбрать дату начала/окончания и периода, вы также можете отображать только кандидаты, которые вы хотели бы видеть, нажав на их имена в визуализации.
  • Второй анализ Отображает геолокацию твитов. Варианты относительно аналогичны первым анализе.

Для сбора данных из Twitter я использовал подход, похожий на Luster Luce. Вместо того, чтобы сосредоточиться на сходстве, я покажу вам подходы, которые я взял, это были разные.

Хранение твитов в базе данных графа

Как я уже сказал, я хотел сохранить данные в диаграмме базы данных. Я решил использовать Neo4j Отказ В базе данных графа данных моделируются с использованием комбинации узлов, ребер и структур свойств.

В нашем случае узлы могут представлять Tweet, пользователь или даже хэштег. Их можно отличить с помощью этикетки. Связь между узлами обрабатывается путем соединения их по краям. Например, пользовательский узел может быть подключен к узлу Tweet через отношения с постами.

Отношения направленные. Tweet не может опубликовать пользователя, но он может упомянуть пользователя.

Наконец, узлы и края (отношения) могут удерживать свойства. Например, у пользователя есть имя, а Tweet имеет текст.

При взаимодействии с графовой базой данных, график объекта Mapper (OGM) особенно полезен. В этом проекте я использовал Неомодель Отказ Он открывает API относительно аналогично API моделей Django. Вы определяете свои модели, такие как:

Как вы можете видеть, как имущество, и отношения определены. Я приглашаю, проверяя файл моделей в мой github repo Чтобы увидеть полное определение модели данных.

Neo4j – это база данных NoSQL, она использует язык запросов не SQL, называемый Cypher. Это довольно простой язык. Например, следующий запрос вернет все твиты, размещенные пользователем, содержащим слово «FILLON» (один из кандидатов):

MATCH (u:User)-[:POSTS]->(t:Tweet) WHERE t.text contains "fillon" return t

Неомодель – это OGM, он предоставляет API, поэтому вам не нужно писать очень много запросов вручную. Вы можете получить те же результаты, что и выше, работая:

Tweet.nodes.filter(text__contains="fillon")

Потоковое из Twitter

Twitter предоставляет два способа получить их данные. Первый – через стандартный API отдыха. Каждый доступ конечной точки ограничен, поэтому в нашем случае не является предпочтительным решением.

К счастью, Twitter также предоставляет потоковую API. Установив фильтр, мы можем получить все твиты, которые проходят этот фильтр (с ограничением 1% от общего объема твитов, опубликованных в моменте T). Библиотека Tweepy облегчает этот процесс.

Как вы можете видеть в мой репо , вам нужно определить класс слушателя, который будет вызвать некоторые действия во время потоковой передачи. Например, метод «on_Status» называется в любое время в Tweet Tooded.

Кроме того, я определил класс потоковой передачи, обязанности которого должны аутентифицироваться в Twitter, для создания Tweepy Stream с вышеупомянутым прослушивателем, а также раскрыть метод для начала потоковой передачи. Метод «start_streaming» принимает аргумент «TO_TRACK», который является списком слов, на которых вы хотите фильтровать.

Вы должны создать экземпляр потокового класса с кучей аргументов. Помимо полномочий API Twitter, вам нужен «конвейер» и аргументы «Batch_size». Последнее число, указав количество твитов, которые обрабатываются одновременно.

Поскольку обработка Tweet включает в себя сохранение его в Neo4J, делая его одним за другим, очень дорогостоящая операция. Сохранение их партиями 100 (или даже больше в некоторых случаях) значительно улучшает производительность.

Аргумент «Трубопровод» должен быть ссылкой на функцию, которая получит пакет твитов. Внутри этого вы можете делать все, что вы хотите. Я привел пример этого в Utils.py модуль.

Как видите, эта функция обращается к асинхронной задаче сельдерея, определенной в Tasks.py модуль. Сельдерей это библиотека очереди распределенного задач Python. Я использовал это с Rabbitmq как сообщение брокера. Итак, как это работает? Давайте вернемся к функции «Streaming_Pipeline» в Utils.py Модуль и сосредоточиться на этой строке:

bulk_parsing.delay(users_attributes, tweets_attributes)

Когда эта строка обрабатывается, вместо обработки функции «Bulk_Parsing» синхронно, сообщение будет опубликовано сообщение для брокера (здесь roblitmq). Это позволяет потребителям (работникам) получать эти сообщения, и, следовательно, для обработки задачи «Bulk_Parsing» асинхронно и параллельно. Почему это? Потому что он позволяет горизонтальным масштабированием Tweet Processing. Если сообщения накапливаются быстрее, чем вы можете их обработать, вы можете добавить больше работников, чтобы помочь их потреблять.

Одно окончательное замечание. Я хотел, чтобы процесс был максимально универсальным, в том смысле, что если обработка должна быть изменена – или если что-то нужно было добавить – это должно быть легко сделать это. В этом случае я могу просто изменить функцию «Streaming_Pipeline» и добавить некоторые асинхронные задачи. Это быстро и легко изменить.

Спасибо за прочтение!

  • Обязательно проверите код В моем Github Repo Отказ
  • Вы можете увидеть все это в действии На моем сайте Где я использовал это, чтобы накормить некоторый анализ.