dbt + BigQuery 데이터 모델링
in Data Engineering on dbt
- 이 글은 BigQuery 환경에서 dbt 사용법에 대해 작성한 글입니다
- 예상 독자
- BigQuery에서 dbt를 사용하는 방법에 궁금하신 분
- 앞선 dbt 사용법과 기본 개념 글을 읽으신 분
- 키워드 : dbt BigQuery, dbt modeling
dbt + BigQuery를 위한 환경 설정
dbt bigquery 설치
- dbt-core는 설치했다고 가정하고 dbt-bigquery를 설치
pip3 install dbt-bigquery
dbt init
dbt init newyork_taxi
- 위 명령어를 입력하면 DB를 선택하는 부분이 나옴. BigQuery로 설정
- 만약 Runtime Error(No adapters available)이 발생하면 위에 dbt-bigquery를 설치해야 함(DB와 연결하는 라이브러리가 설치되어 있지 않다는 의미)
- 권한 설정을 어떻게 할지 물어보는 질문도 있는데, oauth와 service_account가 존재
- 여기선 oauth로 진행. 로컬 개발할 때는 oauth도 추천
- 회사였으면 service_account로 설정
- dbt로 만든 것을 주기적으로 실행할 때는 service_account를 발급해서 GitHub Actions나 Airflow에 설정해줘야 하지만, 지금은 로컬 개발에 집중
- 프로젝트 ID, 데이터셋, threads, timeout seconds, region을 입력
~/.dbt/profiles.yml
을 확인하면, 방금 입력한 내용을 확인할 수 있음
cat ~/.dbt/profiles.yml
newyork_taxi:
outputs:
dev:
dataset: newyork_taxi
job_execution_timeout_seconds: 300
job_retries: 1
location: US
method: oauth
priority: interactive
project: zzsza-github-io
threads: 4
type: bigquery
target: dev
- 프로필 파일을 dbt 작업 프로젝트로 옮기려면 아래 명령어 실행(홈 디렉토리 안에 프로필 파일이 있는데, 회사에서 작업한다면 이런 프로필을 GitHub Repository에 넣고 Push)
- dbt_project.yml : 프로젝트 설정 파일. 프로젝트 구조, 리소스 설정, 변수 등
- profiles.yml : 프로필 설정 파일. 데이터베이스 연결 정보 등이 저장됨
cp ~/.dbt/profiles.yml .
gcloud를 사용해 oauth
- 터미널에서 아래 명령어 입력(gcloud 사전에 설치되어야 함)
- gcloud 설치 방법은 Docs 참고
gcloud auth application-default login \
--scopes=https://www.googleapis.com/auth/bigquery,\
https://www.googleapis.com/auth/drive.readonly,\
https://www.googleapis.com/auth/iam.test
dbt run
- 샘플 모델을 실행
models/example
폴더에 있는 my_first_dbt_model.sql, my_second_dbt_model.sql가 실행됨
dbt run
- 실행하면 BigQuery에 새로운 테이블, 뷰가 추가됨
dbt utils 설정
- dbt_utils는 dbt에서 유용하게 사용할 수 있는 macro와 Test를 모아둔 패키지
- packages.yml 파일에 dbt_utils 패키지를 추가(이 파일이 없다면 생성하고 추가하면 됨)
packages:
- package: dbt-labs/dbt_utils
version: 1.3.0
- 그 후,
dbt deps
를 실행해 디펜던시를 설치함
사용할 데이터
- BigQuery의 Public Data인 뉴욕 택시 데이터인 tlc_green_trips_2022 테이블을 사용
- Source 데이터 설정
models/sources.yml
에 소스 데이터를 설정해서, dbt 모델링을 할 때 사용할 수 있음- BigQuery Public 데이터인
tlc_green_trips_2022
을 사용할 예정이나, 이 테이블은 파티션 설정이 되어있지 않아 전체를 복사한 후 파티션을 설정할 예정
tlc_green_trips_2022 테이블 파티션 설정
- 아래 쿼리를 실행해 Public이 아닌 자신의 프로젝트의 데이터셋에 저장
CREATE OR REPLACE TABLE newyork_taxi.tlc_green_trips_2022
PARTITION BY DATE(pickup_datetime)
CLUSTER BY pickup_location_id, dropoff_location_id
AS
SELECT *
FROM bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2022;
테이블 정보 저장
models/sources.yml
에 다음과 같이 저장- 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
- database는 구글 클라우드 프로젝트 ID
- schema는 빅쿼리 데이터셋
- tables의 name은 테이블 이름
- tests 부분은 임의로 추가함. 실제로 잘 통과되는 test도 있고, 통과되지 않을 test도 존재
- 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
version: 2
sources:
- name: ny_taxi # dbt에서 사용할 이름
database: zzsza-github-io # 구글 클라우드 프로젝트 ID
schema: newyork_taxi # 빅쿼리 데이터셋 이름
description: "뉴욕 택시 운행 데이터의 파티션된 버전"
tables:
- name: tlc_green_trips_2022 # 빅쿼리 테이블 이름
description: "녹색 택시(Green Taxi)의 운행 기록 데이터. 일자별로 파티션되어 있음"
loaded_at_field: pickup_datetime
columns:
- name: vendor_id
description: "택시 제공업체 식별자"
tests:
- not_null
- accepted_values:
values: ['1', '2']
- name: pickup_datetime
description: "승객 탑승 시간"
tests:
- not_null
- name: dropoff_datetime
description: "승객 하차 시간"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "dropoff_datetime >= pickup_datetime"
- name: store_and_fwd_flag
description: "저장 후 전송 여부 (Y/N)"
tests:
- accepted_values:
values: ['Y', 'N']
- name: rate_code
description: >
요금 코드. 운행 유형에 따른 요금 체계를 나타냅니다:
1.0 = 표준 요금 (Standard rate): 일반적인 시내 운행
2.0 = JFK 공항 (JFK Airport): 고정 요금이 적용되는 JFK 공항행
3.0 = Newark 공항 (Newark Airport): Newark 공항 운행
4.0 = Nassau 또는 Westchester: 교외 지역 운행
5.0 = 협상 요금 (Negotiated fare): 사전에 합의된 요금
6.0 = 그룹 승차 (Group ride): 합승 요금
tests:
- not_null
- accepted_values:
values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
- name: passenger_count
description: "승객 수"
- name: trip_distance
description: "주행 거리 (마일)"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "trip_distance >= 0"
- name: fare_amount
description: "기본 요금"
tests:
- not_null
- name: extra
description: "추가 요금"
tests:
- not_null
- name: mta_tax
description: "MTA 세금"
tests:
- not_null
- name: tip_amount
description: "팁"
tests:
- not_null
- name: tolls_amount
description: "통행료"
tests:
- not_null
- name: ehail_fee
description: "전자 호출 수수료"
- name: airport_fee
description: "공항 수수료"
- name: total_amount
description: "총 금액"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
- name: payment_type
description: >
결제 방식을 나타내는 코드:
1.0 = 신용카드 (Credit card): 카드 결제
2.0 = 현금 (Cash): 현금 결제
3.0 = 무료 운행 (No charge): 요금이 부과되지 않는 운행
4.0 = 분쟁 (Dispute): 요금 관련 분쟁이 있는 경우
5.0 = 알 수 없음 (Unknown): 결제 방식이 불분명한 경우
6.0 = 취소된 운행 (Voided trip): 운행이 취소된 경우
tests:
- not_null
- accepted_values:
values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
- name: trip_type
description: "운행 유형"
tests:
- not_null
- name: imp_surcharge
description: "개선 부과금"
tests:
- not_null
- name: pickup_location_id
description: "승차 위치 ID"
tests:
- not_null
- name: dropoff_location_id
description: "하차 위치 ID"
tests:
- not_null
- name: data_file_year
description: "데이터 파일의 연도"
tests:차
- not_null
- name: data_file_month
description: "데이터 파일의 월"
tests:
- not_null
# 테이블 수준의 테스트
tests:
- dbt_utils.expression_is_true:
expression: >
fare_amount + COALESCE(extra, 0) + COALESCE(mta_tax, 0) +
COALESCE(tip_amount, 0) + COALESCE(tolls_amount, 0) +
COALESCE(imp_surcharge, 0) <= total_amount
severity: warn
모델링
- locationId 별로 집계하는 간단한 모델을 만들 예정
차원(dimension) 모델링
- pickup_location_id의 특정 차원을 저장
models/dimensions/dim_locations.sql
{{ config( materialized='table', cluster_by=['location_id'] ) }}
SELECT DISTINCT
pickup_location_id AS location_id,
-- 위치의 고유한 특성을 정의
CASE
WHEN pickup_location_id IN ("1", "2") THEN 'airport' -- 공항 위치 ID
WHEN pickup_location_id BETWEEN "100" AND "200" THEN 'manhattan'
ELSE 'outer_borough'
END AS borough_type,
case
WHEN pickup_location_id IN ("1", "2") THEN 'restricted'
ELSE 'general'
END AS service_area_type
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}
팩트(fact) 모델링
- 팩트 테이블은 비즈니스에서 발생하는 이벤트나 트랜잭션을 기록하는 테이블
- 한 번의 택시 운행과 관련된 정보를 저장함
-- models/facts/fact_trips.sql
{{ config(
materialized='incremental',
partition_by={
"field": "pickup_datetime",
"data_type": "timestamp",
"granularity": "day"
},
cluster_by=['pickup_location_id', 'dropoff_location_id']
) }}
select
-- 운행 식별 정보
concat(
vendor_id, '_',
cast(pickup_datetime as string), '_',
cast(pickup_location_id as string)
) as trip_id,
vendor_id,
-- 시간 관련 측정값
pickup_datetime,
dropoff_datetime,
TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as duration_minutes,
-- 위치 관련 측정값
pickup_location_id,
dropoff_location_id,
trip_distance,
-- 승객 정보
passenger_count,
trip_type,
rate_code,
store_and_fwd_flag,
-- 요금 관련 측정값
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
ehail_fee,
airport_fee,
imp_surcharge,
total_amount,
payment_type,
-- 성과 지표 계산
case
when fare_amount > 0 then tip_amount / fare_amount
else 0
end as tip_ratio,
case
when TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 0
then total_amount / TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE)
else 0
end as revenue_per_minute,
-- 파일 정보
data_file_year,
data_file_month
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}
{% if is_incremental() %}
where pickup_datetime > (select max(pickup_datetime) from {{ this }})
{% endif %}
마트(mart) 모델링
- 마트 테이블 : 특정 기준으로 집계한 데이터
- 예 : 일자별, 지역별 운행 통계
-- models/marts/mart_location_patterns.sql
{{
config(
materialized='incremental',
partition_by={
"field": "data_file_month",
"data_type": "int64",
"granularity": "month",
"range": {
"start": 1,
"end": 12,
"interval": 1
}
},
cluster_by="location_id,borough_type"
)
}}
select
f.data_file_year,
f.data_file_month,
date_trunc(f.pickup_datetime, day) as pickup_date,
l.location_id,
l.borough_type,
l.service_area_type,
count(distinct f.trip_id) as total_trips, -- 고유 trip_id 개수
avg(f.fare_amount) as avg_fare,
avg(case when f.rate_code = '2.0' then 1 else 0 end) as airport_trip_ratio
from {{ ref('fact_trips') }} f
join {{ ref('dim_locations') }} l
on f.pickup_location_id = l.location_id
{% if is_incremental() %}
where f.data_file_month >= (select max(data_file_month) from {{ this }})
{% endif %}
group by
f.data_file_year,
f.data_file_month,
pickup_date,
l.location_id,
l.borough_type,
l.service_area_type
모델 실행
- mart_location_patterns과 앞선 의존성을 모두 실행
- 실행하면 빅쿼리 테이블이 생성됨
dbt run --select +mart_location_patterns
모델 컴파일 확인
- 만약 모델 실행 전에, 컴파일을 통해 확인하고 싶다면 아래 명령어로 가능함
- 컴파일 : 실제 실행 가능한 SQL로 변환하는 명령어
- 변환된 SQL만 생성함
- 생성된 SQL은
target/compiled/{project_name}/models/
에 저장됨
dbt compile --models mart_location_patterns
dbt run을 실행할 때, 오류가 발생한다면
- 컴파일 후 나오는 쿼리문을 BigQuery Console에서 실행 => 오류가 생기면 쿼리가 실행되지 않을 것
- 다 수정했는데도 안된다면 dbt run을 하면서 생성하는 옵션에서 이슈가 있을 수 있음
logs/dbt.log
에서 보면 실행될 시점의 쿼리문을 볼 수 있음- 이 쿼리를 보면서 왜 이런 코드가 실행되는지 확인하면 됨
dbt docs 생성
- dbt 문서를 생성하는 명령어는 크게 2단계로 진행
- generate
- serve
dbt docs generate
dbt docs serve
- localhost:8080에서 확인할 수 있음
- 그리고 우측 아래의 View Lineage Graph를 클릭하면 리니지 형태를 볼 수 있음
모델 Test
- dbt test는 sources.yml과 schema.yml에 정의된 테스트를 실행함
- sources.yml : source의 테이블과 컬럼에 대한 테스트
- schema.yml : 모델의 테이블과 컬럼에 대한 테스트
- 여기선 sources.yml에 정의된 테스트를 실행함
dbt test
- 명령어를 실행하면, 통과되지 못한 Test가 나옴. 이 Test를 수정하면 됨
정리
- dbt + bigquery를 연결해 마트 모델링을 진행
- dbt run model
- dbt compile : SQL로 컴파일
- 의존성을 설치하고 싶을 땐
- dbt deps
- dbt docs는 generate와 serve를 사용
- dbt docs generate
- dbt docs serve
- 테스트는 test
- dbt test
- 모델링을 어떻게 하는지는 별도로 글을 작성할 예정. 이 글에선 dbt + bigquery를 사용하는 방법을 주로 다룸
- 글 작성하는데 걸린 시간 : 3시간 47분
- 하고자 하는 이야기, 개요 정리 : 32분
- 초안 글 작성 : 3시간 15분
카일스쿨 유튜브 채널을 만들었습니다. 데이터 사이언스, 성장, 리더십, BigQuery 등을 이야기할 예정이니, 관심 있으시면 구독 부탁드립니다 :)
PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다
이 글이 도움이 되셨거나 다양한 의견이 있다면 댓글 부탁드립니다 :)