What is data engineering?
In comes the data engineer
- 데이터가 흩어져 있음
- 데이터베이스가 어플리케이션에 최적화 되어 있고 분석에 최적화되어 있지 않음
- 레가시 코드들이 데이터를 손상시킴
What is parallel
- 메모리의 프로세싱 파워의 이득
- 아이디어: task를 subtask로 나누어 여러 컴퓨터에 분산시킨다.
병렬처리의 리스크
- 통신 오버헤드
- task가 너무 작을 때
- 프로세싱 유닛이 적을 때
- 이 오버헤드로 인해 프로세싱 유닛이 늘어나도 속도가 선형으로 증가하지 않는다(pararell slowdown)
병렬 처리 구현방법
multiprocessing.Pool
: 동일한 컴퓨터의 여러 코어에 작업을 분산처리
from multiprocessing import Pool
import pandas as pd
def take_mean_age(year_and_group):
year, group = year_and_group
return pd.DataFrame({'Age': group['Age'].mean()}, index=[year])
with Pool(4) as p:
result = p.map(take_mean_age, athlete_events.groupby('Year'))
result_df = pd.concat(result)
dask
: 추상화 제공 프레임워크
import dask.dataframe as dd
# partition dataframe into 4
athelte_events_dask = dd.from_pandas(athlete_events, npartitions=4)
result_df = athlete_events_dask.groupby('Year').Age.mean().compute()
Parallel computation frameworks
Hadoop
- HDFS: 분산 파일 시스템
- MapReduce: 빅데이터 처리 시스템
Hive
- 하둡 에코시스템의 최상위 계층
- SQL의 변형인 Hive SQL을 사용하여 구조화된 방식으로 쿼리를 할 수 있다.
- 다른 데이터 처리 툴과 통합됨
Hive: an example
- 일반 SQL 쿼리와 비슷하지만 분산처리로 작동한다
SELECT year, AVG(age)
FROM views.athlete_events
GROUP BY year
Spark
- 맵리듀스는 job 사이에 디스크를 많이 쓰지만, Spark는 가능한 한 메모리에서 작동한다.
- 맵리듀스의 단점 보완
Resilient distributed datasets(RDD)
- spark에서 사용
- 여러개의 노드에 분산되어 저장하는 데이터
- 데이터프레임과 달리 컬럼에 이름이 없음
- 튜플의 리스트와 비슷함
- Operation
- Transformation:
.map()
or.filter()
-> RDD 리턴 - Action:
.count()
or.first()
-> single result
- Transformation:
PySpark
- 스파크의 프로그래밍 언어 인터페이스
- Python interface
- pandas와 유사함
PySpark: an example
(athlete_events_spark
.groupBy('Year')
.mean('Age')
.show())
Workflow scheduling frameworks
An example pipeline
- 어떻게 스케줄링?
- 매일 수동으로: 확장하기 어려움
- cron: dependency 처리가 어려움(작업의 순서...)
DAGs(Directed Acyclic Graph)
- Set of nodes
- Directed edges
- No cycles
The tools for the job
- Linux's cron
- spotify's Luigi
- Airflow
Airflow: an example
# Create the DAG object
dag = DAG(dag_id='example_dag', ..., schedule_interval="0 * * * *")
# Define operations
start_cluster = StartClusterOperator(task_id='start_cluster', dag=dag)
ingest_customer_data = SparkJobOperator(task_id='ingest_customer_data', dag=dag)
ingest_product_data = SparkJobOperator(task_id='ingest_product_data', dag=dag)
enrich_customer_data = PythonOperator(task_id='enrich_customer_data', ..., dag=dag)
# Set up dependecy flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)
Extract
- 의미: 데이터 스토리지에서 데이터를 추출하는 것
Extract from text files
- unstructured
- flat file은 row와 column으로 이루어져 있다.(ex. csv, tsv)
JSON
- JavaScript Object Notation
- semi-structured
- number, string, boolean, null
- array, object
- python의 딕셔너리와 비슷, 매핑이 잘 됨(json 패키지)
- 많은 웹서비스가 이 형태로 데이터를 전달함
Data on the Web
request
- ex. Browss to Google
- Request to Google server
- Google responds with Web page
APIs
- 데이터를 JSON 포맷으로 보냄
- API: application programming interface
database
- 기존 애플리케이션 데이터베이스에서 추출
- 많은 Transaction(행 삽입)
- OLTP
- row-oriented
- Analytical database
- OLAP
- Column-oriented
- 기존 애플리케이션 데이터베이스에서 추출
connection string/uri
postgresql://[user[:password]@][host][:port]
import sqlalchemy connection_uri = 'postgresql://repl:password@localhost:5432/pagila' db_engine = sqlalchemy.create_engine(connection_uri) import pandas as pd pd.read_sql('SELECT * FROM customer', db_engine)
Transform
- selection of attribute (ex. email)
- Transaltion of code values (ex. New York -> NY)
- Data validation (ex. date input in 'created_at')
- Splitting columns into multiple columns(email -> user, domain)
- Joining from multiple sources
pyspark
import pyspark.sql
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.read.jdbc('jdbc:postgresql://localhost:5432/pagila',
properties={'user':'repl', 'password':'password'})
# jdbc: Spark가 여러 관계형 데이터베이스에 연결하는 데 도움이 되는 소프트웨어
example: join
customer_df # pyspark dataframe with customer data
rating_df # pyspark dataframe with rating data
# Groupby ratings
ratings_per_customer = ratings_df.groupBy('customer_id').mean('rating')
# Join on Customer ID
customer_df.join(
rating_per_customer,
customer_df.customer_id=ratings_per_customer.customer_id)
Loading
Analytics or applications databases
- Analytics
- Aggregate queries
- online analysis processing(OLAP)
- column-oriented
- columns subset에 대해 쿼리함
- 병렬화에 적합
- Applications
- lots of transaction
- online transaction processing(OLTP)
- row-oriented
- 레코드를 넣고 빼는 것이 쉽고 빠름
MPP Databases
- Massively parallel processing database
- 쿼리가 하위작업으로 분리되어 여러 노드로 분산됨
example: Redshift
# Pandas .to_parquet() method
df.to_parquet('./s3://path/to/bucket/customer.parquet')
# pyspark .write.parquet() method
df.write.parquet('./s3://path/to/bucket/customer.parquet')
COPY customer
FROM 's3://path/to/bucket/customer.parquet'
FORMAT as parquet
Load to PostgrSQL
# Transformation on data
recommendations = transform_find_recommecdatins(ratings_df)
# Load into PostgreSQL database
recommendations.to_sql('recommendations',
db_engine,
schema='store',
if_exists='replace')
Putting it all together
- ETL 과정을 하나의 함수로 초기화
def extract_table_to_df(tablename, db_engine):
return pd.read_sql('SELECT * FROM {}'.format(tablename), db_engine)
def split_columns_transform(df, column, pat, suffixes):
# Converts column into str and splits it on pat
def load_df_into_dwh(film_df, tablename, schema, db_engine):
return pd.to_sql(tablename, db_engine, schema=schema, if_exists='replace')
db_engine = {...}
def etl():
film_df = extract_table_to_df('film', db_engine['store'])
film_df = split_columns_transform(film_df, 'rental_rate', '.', ['_dollar', '_cents'])
load_df_into_dwh(film_df, 'film', 'store', db_engine['dwh'])
Airflow refresher
- python으로 작성된 워크플로우 스케줄러
- 비선형 그래프 작성 가능
- DAGs
scheduling with DAGs in Airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(dag_id='etl_pipeline',
...,
schedule_interval='0 0 * * *') # cron 표현식
etl_task = pythonOperator(task_id='etl_task',
python_callable=etl,
dag=dag)
etl_task.set_upstream(wait_for_this_task)
# wait_for_this_task가 끝난 후 실행
- cron 표현식: 분, 시간, 일, 월, 요일
From rating to recommendation
Our recommendation transform
- 사용자가 대부분 높게 평가한 기술의 코스를 추천
- 이미 평가 한 코스를 추천하지 않음
- 가장 높은 등급의 코스 3개를 추천
Scheduling daily jobs
The loading phase
# pandas dataframe to sql
recommendations.to_sql(
"recommendations",
db_engine,
if_exists="append")
전체 ETL
def etl(db_engines):
# Extract the data
course = extract_course_data(db_engines)
rating = extract_rating_data(db_engines)
# Clean up courses data
courses = transform_fill_programming_language(courses)
# Get the average course rating
avg_course_rating = transform_avg_rating(rating)
courses_to_recommend = transform_courses_to_recommend(
rating,
courses,
)
# Calculate the recommendation
recommendations = transform_recommendations(
avg_course_rating,
course_to_recommend,
)
# Load the recommendations into the database
load_to_dwh(recommendations, db_engine)
Creating the DAG
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(dag_id='recommendations',
scheduled_interval='0 0 * * *')
task_recommendations = PythonOperator(
task_id='recommendations_task',
python_callable=etl)
'Data Science > [DSF] Data engineering' 카테고리의 다른 글
Introduction to flat file (0) | 2022.05.15 |
---|---|
Data engineering for everyone (0) | 2022.04.10 |