Создаем конвейер потоковой обработки данных. Часть 2

Всем привет. Делимся переводом заключительной части статьи, подготовленной специально для студентов курса «Data Engineer». С первой частью можно ознакомиться тут.

Apache Beam и DataFlow для конвейеров реального времени

Настройка Google Cloud

Примечание: Для запуска конвейера и публикации данных пользовательского лога я использовал Google Cloud Shell, поскольку у меня возникли проблемы с запуском конвейера на Python 3. Google Cloud Shell использует Python 2, который лучше согласуется с Apache Beam.

Чтобы запустить конвейер, нам нужно немного покопаться в настройках. Тем из вас, кто раньше не пользовался GCP, необходимо выполнить следующие 6 шагов, приведенных на этой странице.

После этого нам нужно будет загрузить наши скрипты в облачное хранилище Google и скопировать их в нашу Google Cloud Shel. Загрузка в облачное хранилище достаточно тривиальна (описание можно найти здесь). Чтобы скопировать наши файлы, мы можем открыть Google Cloud Shel из панели инструментов, щелкнув первый значок слева на рисунке 2 ниже.


Рисунок 2

Команды, которые нам нужны для копирования файлов и установки необходимых библиотек, перечислены ниже.

# Copy file from cloud storage gsutil cp gs://<YOUR-BUCKET>/ * . sudo pip install apache-beam[gcp] oauth2client==3.0.0 sudo pip install -U pip sudo pip install Faker==1.0.2 # Environment variables BUCKET=<YOUR-BUCKET> PROJECT=<YOUR-PROJECT>

Создание нашей базы данных и таблицы

После того, как мы выполнили все шаги, связанные с настройкой, следующее, что нам нужно сделать, это создать набор данных и таблицу в BigQuery. Есть несколько способов сделать это, но самый простой — использовать консоль Google Cloud, сначала создав набор данных. Вы можете выполнить действия, указанные по следующей ссылке, чтобы создать таблицу со схемой. Наша таблица будет иметь 7 столбцов, соответствующих компонентам каждого пользовательского лога. Для удобства мы определим все столбцы как строки (тип string), за исключением переменной timelocal, и назовем их в соответствии с переменными, которые мы сгенерировали ранее. Схема нашей таблицы должна выглядеть как на рисунке 3.


Рисунок 3. Схема таблицы

Публикация данных пользовательского лога

Pub/Sub является критически важным компонентом нашего конвейера, поскольку позволяет нескольким независимым приложениям взаимодействовать друг с другом. В частности, он работает как посредник, позволяющий нам отправлять и получать сообщения между приложениями. Первое, что нам нужно сделать, это создать тему (topic). Достаточно просто перейти в Pub/Sub в консоли и нажать CREATE TOPIC.

Приведенный ниже код вызывает наш скрипт для генерации данных лога, определенных выше, а затем подключается и отправляет журналы в Pub/Sub. Единственное, что нам нужно сделать, — это создать объект PublisherClient, указать путь к теме с помощью метода topic_path и вызвать функцию publish с topic_path и данными. Обратите внимание, что мы импортируем generate_log_line из нашего скрипта stream_logs, поэтому убедитесь, что эти файлы находятся в одной папке, иначе вы получите ошибку импорта. Затем мы можем запустить это через нашу google-консоль, используя:

python publish.py

from stream_logs import generate_log_line import logging from google.cloud import pubsub_v1 import random import time   PROJECT_ID="user-logs-237110" TOPIC = "userlogs"   publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(PROJECT_ID, TOPIC)  def publish(publisher, topic, message):     data = message.encode('utf-8')     return publisher.publish(topic_path, data = data)  def callback(message_future):     # When timeout is unspecified, the exception method waits indefinitely.     if message_future.exception(timeout=30):         print('Publishing message on {} threw an Exception {}.'.format(             topic_name, message_future.exception()))     else:         print(message_future.result())   if __name__ == '__main__':      while True:         line = generate_log_line()         print(line)         message_future = publish(publisher, topic_path, line)         message_future.add_done_callback(callback)          sleep_time = random.choice(range(1, 3, 1))         time.sleep(sleep_time)

Как только файл запустится, мы сможем наблюдать вывод данных лога на консоль, как показано на рисунке ниже. Этот скрипт будет работать до тех пор, пока мы не используем CTRL+C, чтобы завершить его.


Рисунок 4. Вывод publish_logs.py

Написание кода нашего конвейера

Теперь, когда мы все подготовили, мы можем приступить к самой интересной части — написанию кода нашего конвейера, используя Beam и Python. Чтобы создать Beam-конвейер, нам нужно создать объект конвейера (p). После того как мы создали объект конвейера, мы можем применить несколько функций одну за другой, используя оператор pipe (|). В общем, рабочий процесс выглядит как на рисунке ниже.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]              | [Second Transform]              | [Third Transform])

В нашем коде мы создадим две пользовательские функции. Функцию regex_clean, которая сканирует данные и извлекает соответствующую строку на основе списка PATTERNS, используя функцию re.search. Функция возвращает разделенную запятыми строку. Если вы не являетесь экспертом по регулярным выражениям, я рекомендую ознакомится с этим туториалом и попрактиковаться в блокноте, чтобы проверить код. После этого мы определяем пользовательскую ParDo-функцию под названием Split, которая является вариацией Beam-преобразования для параллельной обработки. В Python это делается особым способом — мы должны создать класс, который наследуется от класса DoFn Beam. Функция Split принимает распаршенную строку из предыдущей функции и возвращает список словарей с ключами, соответствующими именам столбцов в нашей таблице BigQuery. Есть кое-что, что следует отметить про эту функцию: мне пришлось импортировать datetime внутри функции, чтобы она работала. Я получал сообщение об ошибке при импорте в начале файла, что было странно. Этот список затем передается в функцию WriteToBigQuery, которая просто добавляет наши данные в таблицу. Код для Batch DataFlow Job и Streaming DataFlow Job приведен ниже. Единственное отличие между пакетным и потоковым кодом заключается в том, что в пакетной обработке мы читаем CSV из src_path, используя функцию ReadFromText из Beam.

Batch DataFlow Job (обработка пакетов)

import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from google.cloud import bigquery import re import logging import sys  PROJECT='user-logs-237110' schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'   src_path = "user_log_fileC.txt"  def regex_clean(data):      PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',            r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",            r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']     result = []     for match in PATTERNS:       try:         reg_match = re.search(match, data).group()         if reg_match:           result.append(reg_match)         else:           result.append(" ")       except:         print("There was an error with the regex search")     result = [x.strip() for x in result]     result = [x.replace('"', "") for x in result]     res = ','.join(result)     return res   class Split(beam.DoFn):      def process(self, element):         from datetime import datetime         element = element.split(",")         d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")         date_string = d.strftime("%Y-%m-%d %H:%M:%S")          return [{              'remote_addr': element[0],             'timelocal': date_string,             'request_type': element[2],             'status': element[3],             'body_bytes_sent': element[4],             'http_referer': element[5],             'http_user_agent': element[6]              }]  def main():     p = beam.Pipeline(options=PipelineOptions())     (p       | 'ReadData' >> beam.io.textio.ReadFromText(src_path)       | "clean address" >> beam.Map(regex_clean)       | 'ParseCSV' >> beam.ParDo(Split())       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)    )     p.run()  if __name__ == '__main__':   logger = logging.getLogger().setLevel(logging.INFO)   main()

Streaming DataFlow Job (обработка потока)

from apache_beam.options.pipeline_options import PipelineOptions from google.cloud import pubsub_v1 from google.cloud import bigquery import apache_beam as beam import logging import argparse import sys import re   PROJECT="user-logs-237110" schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING' TOPIC = "projects/user-logs-237110/topics/userlogs"   def regex_clean(data):      PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',            r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",            r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']     result = []     for match in PATTERNS:       try:         reg_match = re.search(match, data).group()         if reg_match:           result.append(reg_match)         else:           result.append(" ")       except:         print("There was an error with the regex search")     result = [x.strip() for x in result]     result = [x.replace('"', "") for x in result]     res = ','.join(result)     return res   class Split(beam.DoFn):      def process(self, element):         from datetime import datetime         element = element.split(",")         d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")         date_string = d.strftime("%Y-%m-%d %H:%M:%S")                  return [{              'remote_addr': element[0],             'timelocal': date_string,             'request_type': element[2],             'body_bytes_sent': element[3],             'status': element[4],             'http_referer': element[5],             'http_user_agent': element[6]              }]  def main(argv=None):     parser = argparse.ArgumentParser()    parser.add_argument("--input_topic")    parser.add_argument("--output")    known_args = parser.parse_known_args(argv)      p = beam.Pipeline(options=PipelineOptions())     (p       | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)       | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))       | "Clean Data" >> beam.Map(regex_clean)       | 'ParseCSV' >> beam.ParDo(Split())       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)    )    result = p.run()    result.wait_until_finish()  if __name__ == '__main__':   logger = logging.getLogger().setLevel(logging.INFO)   main() 

Запуск конвейера

Мы можем запустить конвейер несколькими различными способами. Если бы мы захотели, мы могли бы просто запустить его локально с терминала, удаленно войдя в GCP.

python -m main_pipeline_stream.py \  --input_topic "projects/user-logs-237110/topics/userlogs" \  --streaming

Однако мы собираемся запустить его с помощью DataFlow. Мы можем сделать это с помощью нижеприведенной команды, установив следующие обязательные параметры.

  • project — ID вашего проекта GCP.
  • runner — средство запуска конвейера, которое проанализирует вашу программу и сконструирует ваш конвейер. Для выполнения в облаке вы должны указать DataflowRunner.
  • staging_location — путь к облачному хранилищу Cloud Dataflow для индексировани пакетов кода, необходимых обработчикам, выполняющим работу.
  • temp_location — путь к облачному хранилищу Cloud Dataflow для размещения временных файлов заданий, созданных во время работы конвейера.
  • streaming

python main_pipeline_stream.py \ --runner DataFlow \ --project $PROJECT \ --temp_location $BUCKET/tmp \ --staging_location $BUCKET/staging --streaming 

Пока эта команда выполняется, мы можем перейти на вкладку DataFlow в google-консоли и просмотреть наш конвейер. Кликнув по конвейеру, мы должны увидеть что-то похожее на рисунок 4. В целях отладки может быть очень полезно перейти в логи, а затем в Stackdriver для просмотра подробных логов. Это помогло мне разрешить проблемы с конвейером в ряде случаев.


Рисунок 4: Beam-конвейер

Доступ к нашим данным в BigQuery

Итак, у нас уже должен быть запущен конвейер с данными, поступающими в нашу таблицу. Чтобы проверить это, мы можем перейти к BigQuery и просмотреть данные. После использования команды ниже вы должны увидеть первые несколько строк набора данных. Теперь, когда у нас есть данные, хранящиеся в BigQuery, мы можем провести дальнейший анализ, а также поделиться данными с коллегами и начать отвечать на бизнес-вопросы.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;


Рисунок 5: BigQuery

Заключение

Надеемся, что этот пост послужит полезным примером создания потокового конвейера данных, а также поиска способов сделать данные более доступными. Хранение данных в таком формате дает нам много преимуществ. Теперь мы можем начать отвечать на важные вопросы, например, сколько людей используют наш продукт? Растет ли со временем база пользователей? С какими аспектами продукта люди взаимодействуют больше всего? И есть ли ошибки, там где их быть не должно? Это те вопросы, которые будут интересны для организации. На основе идей, вытекающих из ответов на эти вопросы, мы сможем усовершенствовать продукт и повысить заинтересованность пользователей.

Beam действительно полезен для такого типа упражнений, а также имеет ряд других интересных случаев использования. Например, вы можете анализировать данные по биржевым тикам в режиме реального времени и совершать сделки на основе анализа, возможно, у вас есть данные датчиков, поступающие с транспортных средств, и вы хотите вычислить расчет уровня трафика. Вы также можете, например, быть игровой компанией, собирающей данные о пользователях и использующей ее для создания информационных панелей для отслеживания ключевых показателей. Ладно, господа, это тема уже для другого поста, спасибо за чтение, а для тех, кто хочет увидеть полный код, ниже ссылка на мой GitHub.

https://github.com/DFoly/User_log_pipeline

На этом все. Читать первую часть.

FavoriteLoadingДобавить в избранное
Posted in Без рубрики

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *