What is data engineering?

In comes the data engineer

  • 데이터가 흩어져 있음
  • 데이터베이스가 어플리케이션에 최적화 되어 있고 분석에 최적화되어 있지 않음
  • 레가시 코드들이 데이터를 손상시킴

What is parallel

  • 메모리의 프로세싱 파워의 이득
  • 아이디어: task를 subtask로 나누어 여러 컴퓨터에 분산시킨다.

병렬처리의 리스크

  • 통신 오버헤드
    • task가 너무 작을 때
    • 프로세싱 유닛이 적을 때
    • 이 오버헤드로 인해 프로세싱 유닛이 늘어나도 속도가 선형으로 증가하지 않는다(pararell slowdown)

병렬 처리 구현방법

multiprocessing.Pool: 동일한 컴퓨터의 여러 코어에 작업을 분산처리

from multiprocessing import Pool
import pandas as pd

def take_mean_age(year_and_group):
    year, group = year_and_group
    return pd.DataFrame({'Age': group['Age'].mean()}, index=[year])

with Pool(4) as p:
    result = p.map(take_mean_age, athlete_events.groupby('Year'))

result_df = pd.concat(result)

dask: 추상화 제공 프레임워크

import dask.dataframe as dd

# partition dataframe into 4
athelte_events_dask = dd.from_pandas(athlete_events, npartitions=4)

result_df = athlete_events_dask.groupby('Year').Age.mean().compute()

Parallel computation frameworks

Hadoop

  • HDFS: 분산 파일 시스템
  • MapReduce: 빅데이터 처리 시스템

Hive

  • 하둡 에코시스템의 최상위 계층
  • SQL의 변형인 Hive SQL을 사용하여 구조화된 방식으로 쿼리를 할 수 있다.
  • 다른 데이터 처리 툴과 통합됨

Hive: an example

  • 일반 SQL 쿼리와 비슷하지만 분산처리로 작동한다
SELECT year, AVG(age)
FROM views.athlete_events
GROUP BY year 

Spark

  • 맵리듀스는 job 사이에 디스크를 많이 쓰지만, Spark는 가능한 한 메모리에서 작동한다.
  • 맵리듀스의 단점 보완

Resilient distributed datasets(RDD)

  • spark에서 사용
  • 여러개의 노드에 분산되어 저장하는 데이터
  • 데이터프레임과 달리 컬럼에 이름이 없음
  • 튜플의 리스트와 비슷함
  • Operation
    • Transformation: .map() or .filter() -> RDD 리턴
    • Action: .count() or .first() -> single result

PySpark

  • 스파크의 프로그래밍 언어 인터페이스
  • Python interface
  • pandas와 유사함

PySpark: an example

(athlete_events_spark
    .groupBy('Year')
    .mean('Age')
    .show())

Workflow scheduling frameworks

An example pipeline

  • 어떻게 스케줄링?
    • 매일 수동으로: 확장하기 어려움
    • cron: dependency 처리가 어려움(작업의 순서...)

DAGs(Directed Acyclic Graph)

  • Set of nodes
  • Directed edges
  • No cycles

The tools for the job

  • Linux's cron
  • spotify's Luigi
  • Airflow

Airflow: an example

# Create the DAG object
dag = DAG(dag_id='example_dag', ..., schedule_interval="0 * * * *")

# Define operations
start_cluster = StartClusterOperator(task_id='start_cluster', dag=dag)
ingest_customer_data = SparkJobOperator(task_id='ingest_customer_data', dag=dag)
ingest_product_data = SparkJobOperator(task_id='ingest_product_data', dag=dag)
enrich_customer_data = PythonOperator(task_id='enrich_customer_data', ..., dag=dag)

# Set up dependecy flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)

Extract

  • 의미: 데이터 스토리지에서 데이터를 추출하는 것

Extract from text files

  • unstructured
  • flat file은 row와 column으로 이루어져 있다.(ex. csv, tsv)

JSON

  • JavaScript Object Notation
  • semi-structured
  • number, string, boolean, null
  • array, object
  • python의 딕셔너리와 비슷, 매핑이 잘 됨(json 패키지)
  • 많은 웹서비스가 이 형태로 데이터를 전달함

Data on the Web

  • request

    • ex. Browss to Google
    • Request to Google server
    • Google responds with Web page
  • APIs

    • 데이터를 JSON 포맷으로 보냄
    • API: application programming interface
  • database

    • 기존 애플리케이션 데이터베이스에서 추출
      • 많은 Transaction(행 삽입)
      • OLTP
      • row-oriented
    • Analytical database
      • OLAP
      • Column-oriented
  • connection string/uri

    • postgresql://[user[:password]@][host][:port]

    • import sqlalchemy
      connection_uri = 'postgresql://repl:password@localhost:5432/pagila'
      db_engine = sqlalchemy.create_engine(connection_uri)
      
      import pandas as pd
      pd.read_sql('SELECT * FROM customer', db_engine)

Transform

  • selection of attribute (ex. email)
  • Transaltion of code values (ex. New York -> NY)
  • Data validation (ex. date input in 'created_at')
  • Splitting columns into multiple columns(email -> user, domain)
  • Joining from multiple sources

pyspark

import pyspark.sql

spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.read.jdbc('jdbc:postgresql://localhost:5432/pagila',
               properties={'user':'repl', 'password':'password'})

# jdbc: Spark가 여러 관계형 데이터베이스에 연결하는 데 도움이 되는 소프트웨어

example: join

customer_df # pyspark dataframe with customer data
rating_df # pyspark dataframe with rating data

# Groupby ratings
ratings_per_customer = ratings_df.groupBy('customer_id').mean('rating')

# Join on Customer ID
customer_df.join(
rating_per_customer,
customer_df.customer_id=ratings_per_customer.customer_id)

Loading

Analytics or applications databases

  • Analytics
    • Aggregate queries
    • online analysis processing(OLAP)
    • column-oriented
    • columns subset에 대해 쿼리함
    • 병렬화에 적합
  • Applications
    • lots of transaction
    • online transaction processing(OLTP)
    • row-oriented
    • 레코드를 넣고 빼는 것이 쉽고 빠름

MPP Databases

  • Massively parallel processing database
  • 쿼리가 하위작업으로 분리되어 여러 노드로 분산됨

example: Redshift

# Pandas .to_parquet() method
df.to_parquet('./s3://path/to/bucket/customer.parquet')
# pyspark .write.parquet() method
df.write.parquet('./s3://path/to/bucket/customer.parquet')
COPY customer
FROM 's3://path/to/bucket/customer.parquet'
FORMAT as parquet

Load to PostgrSQL

# Transformation on data
recommendations = transform_find_recommecdatins(ratings_df)

# Load into PostgreSQL database
recommendations.to_sql('recommendations',
                      db_engine,
                      schema='store',
                      if_exists='replace')

Putting it all together

  • ETL 과정을 하나의 함수로 초기화
def extract_table_to_df(tablename, db_engine):
    return pd.read_sql('SELECT * FROM {}'.format(tablename), db_engine)

def split_columns_transform(df, column, pat, suffixes):
    # Converts column into str and splits it on pat

def load_df_into_dwh(film_df, tablename, schema, db_engine):
    return pd.to_sql(tablename, db_engine, schema=schema, if_exists='replace')

db_engine = {...}

def etl():
    film_df = extract_table_to_df('film', db_engine['store'])
    film_df = split_columns_transform(film_df, 'rental_rate', '.', ['_dollar', '_cents'])
    load_df_into_dwh(film_df, 'film', 'store', db_engine['dwh'])

Airflow refresher

  • python으로 작성된 워크플로우 스케줄러
  • 비선형 그래프 작성 가능
  • DAGs

scheduling with DAGs in Airflow

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='etl_pipeline',
         ...,
         schedule_interval='0 0 * * *') # cron 표현식

etl_task = pythonOperator(task_id='etl_task',
                         python_callable=etl,
                         dag=dag)

etl_task.set_upstream(wait_for_this_task)
# wait_for_this_task가 끝난 후 실행
  • cron 표현식: 분, 시간, 일, 월, 요일

From rating to recommendation

Our recommendation transform

  • 사용자가 대부분 높게 평가한 기술의 코스를 추천
  • 이미 평가 한 코스를 추천하지 않음
  • 가장 높은 등급의 코스 3개를 추천

Scheduling daily jobs

The loading phase

# pandas dataframe to sql
recommendations.to_sql(
"recommendations",
db_engine,
if_exists="append")

전체 ETL

def etl(db_engines):
    # Extract the data
    course = extract_course_data(db_engines)
    rating = extract_rating_data(db_engines)

    # Clean up courses data
    courses = transform_fill_programming_language(courses)

    # Get the average course rating
    avg_course_rating = transform_avg_rating(rating)
    courses_to_recommend = transform_courses_to_recommend(
    rating,
    courses,
    )

    # Calculate the recommendation
    recommendations = transform_recommendations(
    avg_course_rating,
    course_to_recommend,
    )

    # Load the recommendations into the database
    load_to_dwh(recommendations, db_engine)

Creating the DAG

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='recommendations',
         scheduled_interval='0 0 * * *')

task_recommendations = PythonOperator(
task_id='recommendations_task',
python_callable=etl)

'Data Science > [DSF] Data engineering' 카테고리의 다른 글

Introduction to flat file  (0) 2022.05.15
Data engineering for everyone  (0) 2022.04.10

+ Recent posts