dbt + BigQuery 데이터 모델링
in Data Engineering on dbt
- 이 글은 BigQuery 환경에서 dbt 사용법에 대해 작성한 글입니다
- 예상 독자
- BigQuery에서 dbt를 사용하는 방법에 궁금하신 분
- 앞선 dbt 사용법과 기본 개념 글을 읽으신 분
- 키워드 : dbt BigQuery, dbt modeling
dbt + BigQuery를 위한 환경 설정
dbt bigquery 설치
- dbt-core는 설치했다고 가정하고 dbt-bigquery를 설치
pip3 install dbt-bigquery
dbt init
dbt init newyork_taxi
- 위 명령어를 입력하면 DB를 선택하는 부분이 나옴. BigQuery로 설정
- 만약 Runtime Error(No adapters available)이 발생하면 위에 dbt-bigquery를 설치해야 함(DB와 연결하는 라이브러리가 설치되어 있지 않다는 의미)
- 권한 설정을 어떻게 할지 물어보는 질문도 있는데, oauth와 service_account가 존재
- 여기선 oauth로 진행. 로컬 개발할 때는 oauth도 추천
- 회사였으면 service_account로 설정
- dbt로 만든 것을 주기적으로 실행할 때는 service_account를 발급해서 GitHub Actions나 Airflow에 설정해줘야 하지만, 지금은 로컬 개발에 집중
- 프로젝트 ID, 데이터셋, threads, timeout seconds, region을 입력
~/.dbt/profiles.yml
을 확인하면, 방금 입력한 내용을 확인할 수 있음
cat ~/.dbt/profiles.yml
newyork_taxi:
outputs:
dev:
dataset: newyork_taxi
job_execution_timeout_seconds: 300
job_retries: 1
location: US
method: oauth
priority: interactive
project: zzsza-github-io
threads: 4
type: bigquery
target: dev
- 프로필 파일을 dbt 작업 프로젝트로 옮기려면 아래 명령어 실행(홈 디렉토리 안에 프로필 파일이 있는데, 회사에서 작업한다면 이런 프로필을 GitHub Repository에 넣고 Push)
- dbt_project.yml : 프로젝트 설정 파일. 프로젝트 구조, 리소스 설정, 변수 등
- profiles.yml : 프로필 설정 파일. 데이터베이스 연결 정보 등이 저장됨
cp ~/.dbt/profiles.yml .
gcloud를 사용해 oauth
- 터미널에서 아래 명령어 입력(gcloud 사전에 설치되어야 함)
- gcloud 설치 방법은 Docs 참고
gcloud auth application-default login \
--scopes=https://www.googleapis.com/auth/bigquery,\
https://www.googleapis.com/auth/drive.readonly,\
https://www.googleapis.com/auth/iam.test
dbt run
- 샘플 모델을 실행
models/example
폴더에 있는 my_first_dbt_model.sql, my_second_dbt_model.sql가 실행됨
dbt run
- 실행하면 BigQuery에 새로운 테이블, 뷰가 추가됨
dbt utils 설정
- dbt_utils는 dbt에서 유용하게 사용할 수 있는 macro와 Test를 모아둔 패키지
- packages.yml 파일에 dbt_utils 패키지를 추가(이 파일이 없다면 생성하고 추가하면 됨)
packages:
- package: dbt-labs/dbt_utils
version: 1.3.0
- 그 후,
dbt deps
를 실행해 디펜던시를 설치함
사용할 데이터
- BigQuery의 Public Data인 뉴욕 택시 데이터인 tlc_green_trips_2022 테이블을 사용
- Source 데이터 설정
models/sources.yml
에 소스 데이터를 설정해서, dbt 모델링을 할 때 사용할 수 있음- BigQuery Public 데이터인
tlc_green_trips_2022
을 사용할 예정이나, 이 테이블은 파티션 설정이 되어있지 않아 전체를 복사한 후 파티션을 설정할 예정
tlc_green_trips_2022 테이블 파티션 설정
- 아래 쿼리를 실행해 Public이 아닌 자신의 프로젝트의 데이터셋에 저장
CREATE OR REPLACE TABLE newyork_taxi.tlc_green_trips_2022
PARTITION BY DATE(pickup_datetime)
CLUSTER BY pickup_location_id, dropoff_location_id
AS
SELECT *
FROM bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2022;
테이블 정보 저장
models/sources.yml
에 다음과 같이 저장- 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
- database는 구글 클라우드 프로젝트 ID
- schema는 빅쿼리 데이터셋
- tables의 name은 테이블 이름
- tests 부분은 임의로 추가함. 실제로 잘 통과되는 test도 있고, 통과되지 않을 test도 존재
- 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
version: 2
sources:
- name: ny_taxi # dbt에서 사용할 이름
database: zzsza-github-io # 구글 클라우드 프로젝트 ID
schema: newyork_taxi # 빅쿼리 데이터셋 이름
description: "뉴욕 택시 운행 데이터의 파티션된 버전"
tables:
- name: tlc_green_trips_2022 # 빅쿼리 테이블 이름
description: "녹색 택시(Green Taxi)의 운행 기록 데이터. 일자별로 파티션되어 있음"
loaded_at_field: pickup_datetime
columns:
- name: vendor_id
description: "택시 제공업체 식별자"
tests:
- not_null
- accepted_values:
values: ['1', '2']
- name: pickup_datetime
description: "승객 탑승 시간"
tests:
- not_null
- name: dropoff_datetime
description: "승객 하차 시간"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "dropoff_datetime >= pickup_datetime"
- name: store_and_fwd_flag
description: "저장 후 전송 여부 (Y/N)"
tests:
- accepted_values:
values: ['Y', 'N']
- name: rate_code
description: >
요금 코드. 운행 유형에 따른 요금 체계를 나타냅니다:
1.0 = 표준 요금 (Standard rate): 일반적인 시내 운행
2.0 = JFK 공항 (JFK Airport): 고정 요금이 적용되는 JFK 공항행
3.0 = Newark 공항 (Newark Airport): Newark 공항 운행
4.0 = Nassau 또는 Westchester: 교외 지역 운행
5.0 = 협상 요금 (Negotiated fare): 사전에 합의된 요금
6.0 = 그룹 승차 (Group ride): 합승 요금
tests:
- not_null
- accepted_values:
values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
- name: passenger_count
description: "승객 수"
- name: trip_distance
description: "주행 거리 (마일)"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "trip_distance >= 0"
- name: fare_amount
description: "기본 요금"
tests:
- not_null
- name: extra
description: "추가 요금"
tests:
- not_null
- name: mta_tax
description: "MTA 세금"
tests:
- not_null
- name: tip_amount
description: "팁"
tests:
- not_null
- name: tolls_amount
description: "통행료"
tests:
- not_null
- name: ehail_fee
description: "전자 호출 수수료"
- name: airport_fee
description: "공항 수수료"
- name: total_amount
description: "총 금액"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
- name: payment_type
description: >
결제 방식을 나타내는 코드:
1.0 = 신용카드 (Credit card): 카드 결제
2.0 = 현금 (Cash): 현금 결제
3.0 = 무료 운행 (No charge): 요금이 부과되지 않는 운행
4.0 = 분쟁 (Dispute): 요금 관련 분쟁이 있는 경우
5.0 = 알 수 없음 (Unknown): 결제 방식이 불분명한 경우
6.0 = 취소된 운행 (Voided trip): 운행이 취소된 경우
tests:
- not_null
- accepted_values:
values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
- name: trip_type
description: "운행 유형"
tests:
- not_null
- name: imp_surcharge
description: "개선 부과금"
tests:
- not_null
- name: pickup_location_id
description: "승차 위치 ID"
tests:
- not_null
- name: dropoff_location_id
description: "하차 위치 ID"
tests:
- not_null
- name: data_file_year
description: "데이터 파일의 연도"
tests:차
- not_null
- name: data_file_month
description: "데이터 파일의 월"
tests:
- not_null
# 테이블 수준의 테스트
tests:
- dbt_utils.expression_is_true:
expression: >
fare_amount + COALESCE(extra, 0) + COALESCE(mta_tax, 0) +
COALESCE(tip_amount, 0) + COALESCE(tolls_amount, 0) +
COALESCE(imp_surcharge, 0) <= total_amount
severity: warn
모델링
- locationId 별로 집계하는 간단한 모델을 만들 예정
차원(dimension) 모델링
- pickup_location_id의 특정 차원을 저장
models/dimensions/dim_locations.sql
{{ config( materialized='table', cluster_by=['location_id'] ) }}
SELECT DISTINCT
pickup_location_id AS location_id,
-- 위치의 고유한 특성을 정의
CASE
WHEN pickup_location_id IN ("1", "2") THEN 'airport' -- 공항 위치 ID
WHEN pickup_location_id BETWEEN "100" AND "200" THEN 'manhattan'
ELSE 'outer_borough'
END AS borough_type,
case
WHEN pickup_location_id IN ("1", "2") THEN 'restricted'
ELSE 'general'
END AS service_area_type
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}
팩트(fact) 모델링
- 팩트 테이블은 비즈니스에서 발생하는 이벤트나 트랜잭션을 기록하는 테이블
- 한 번의 택시 운행과 관련된 정보를 저장함
-- models/facts/fact_trips.sql
{{ config(
materialized='incremental',
partition_by={
"field": "pickup_datetime",
"data_type": "timestamp",
"granularity": "day"
},
cluster_by=['pickup_location_id', 'dropoff_location_id']
) }}
select
-- 운행 식별 정보
concat(
vendor_id, '_',
cast(pickup_datetime as string), '_',
cast(pickup_location_id as string)
) as trip_id,
vendor_id,
-- 시간 관련 측정값
pickup_datetime,
dropoff_datetime,
TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as duration_minutes,
-- 위치 관련 측정값
pickup_location_id,
dropoff_location_id,
trip_distance,
-- 승객 정보
passenger_count,
trip_type,
rate_code,
store_and_fwd_flag,
-- 요금 관련 측정값
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
ehail_fee,
airport_fee,
imp_surcharge,
total_amount,
payment_type,
-- 성과 지표 계산
case
when fare_amount > 0 then tip_amount / fare_amount
else 0
end as tip_ratio,
case
when TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 0
then total_amount / TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE)
else 0
end as revenue_per_minute,
-- 파일 정보
data_file_year,
data_file_month
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}
{% if is_incremental() %}
where pickup_datetime > (select max(pickup_datetime) from {{ this }})
{% endif %}
마트(mart) 모델링
- 마트 테이블 : 특정 기준으로 집계한 데이터
- 예 : 일자별, 지역별 운행 통계
-- models/marts/mart_location_patterns.sql
{{
config(
materialized='incremental',
partition_by={
"field": "data_file_month",
"data_type": "int64",
"granularity": "month",
"range": {
"start": 1,
"end": 12,
"interval": 1
}
},
cluster_by="location_id,borough_type"
)
}}
select
f.data_file_year,
f.data_file_month,
date_trunc(f.pickup_datetime, day) as pickup_date,
l.location_id,
l.borough_type,
l.service_area_type,
count(distinct f.trip_id) as total_trips, -- 고유 trip_id 개수
avg(f.fare_amount) as avg_fare,
avg(case when f.rate_code = '2.0' then 1 else 0 end) as airport_trip_ratio
from {{ ref('fact_trips') }} f
join {{ ref('dim_locations') }} l
on f.pickup_location_id = l.location_id
{% if is_incremental() %}
where f.data_file_month >= (select max(data_file_month) from {{ this }})
{% endif %}
group by
f.data_file_year,
f.data_file_month,
pickup_date,
l.location_id,
l.borough_type,
l.service_area_type
모델 실행
- mart_location_patterns과 앞선 의존성을 모두 실행
- 실행하면 빅쿼리 테이블이 생성됨
dbt run --select +mart_location_patterns
모델 컴파일 확인
- 만약 모델 실행 전에, 컴파일을 통해 확인하고 싶다면 아래 명령어로 가능함
- 컴파일 : 실제 실행 가능한 SQL로 변환하는 명령어
- 변환된 SQL만 생성함
- 생성된 SQL은
target/compiled/{project_name}/models/
에 저장됨
dbt compile --models mart_location_patterns
dbt run을 실행할 때, 오류가 발생한다면
- 컴파일 후 나오는 쿼리문을 BigQuery Console에서 실행 => 오류가 생기면 쿼리가 실행되지 않을 것
- 다 수정했는데도 안된다면 dbt run을 하면서 생성하는 옵션에서 이슈가 있을 수 있음
logs/dbt.log
에서 보면 실행될 시점의 쿼리문을 볼 수 있음- 이 쿼리를 보면서 왜 이런 코드가 실행되는지 확인하면 됨
dbt docs 생성
- dbt 문서를 생성하는 명령어는 크게 2단계로 진행
- generate
- serve
dbt docs generate
dbt docs serve
- localhost:8080에서 확인할 수 있음
- 그리고 우측 아래의 View Lineage Graph를 클릭하면 리니지 형태를 볼 수 있음
모델 Test
- dbt test는 sources.yml과 schema.yml에 정의된 테스트를 실행함
- sources.yml : source의 테이블과 컬럼에 대한 테스트
- schema.yml : 모델의 테이블과 컬럼에 대한 테스트
- 여기선 sources.yml에 정의된 테스트를 실행함
dbt test
- 명령어를 실행하면, 통과되지 못한 Test가 나옴. 이 Test를 수정하면 됨
정리
- dbt + bigquery를 연결해 마트 모델링을 진행
- dbt run model
- dbt compile : SQL로 컴파일
- 의존성을 설치하고 싶을 땐
- dbt deps
- dbt docs는 generate와 serve를 사용
- dbt docs generate
- dbt docs serve
- 테스트는 test
- dbt test
- 모델링을 어떻게 하는지는 별도로 글을 작성할 예정. 이 글에선 dbt + bigquery를 사용하는 방법을 주로 다룸
- 글 작성하는데 걸린 시간 : 3시간 47분
- 하고자 하는 이야기, 개요 정리 : 32분
- 초안 글 작성 : 3시간 15분
카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.
PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다
이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.