dbt + BigQuery 데이터 모델링


  • 이 글은 BigQuery 환경에서 dbt 사용법에 대해 작성한 글입니다
  • 예상 독자
  • 키워드 : dbt BigQuery, dbt modeling

dbt + BigQuery를 위한 환경 설정

dbt bigquery 설치

  • dbt-core는 설치했다고 가정하고 dbt-bigquery를 설치
pip3 install dbt-bigquery

dbt init

dbt init newyork_taxi
  • 위 명령어를 입력하면 DB를 선택하는 부분이 나옴. BigQuery로 설정
  • 만약 Runtime Error(No adapters available)이 발생하면 위에 dbt-bigquery를 설치해야 함(DB와 연결하는 라이브러리가 설치되어 있지 않다는 의미)
  • 권한 설정을 어떻게 할지 물어보는 질문도 있는데, oauth와 service_account가 존재
    • 여기선 oauth로 진행. 로컬 개발할 때는 oauth도 추천
    • 회사였으면 service_account로 설정
      • dbt로 만든 것을 주기적으로 실행할 때는 service_account를 발급해서 GitHub Actions나 Airflow에 설정해줘야 하지만, 지금은 로컬 개발에 집중
  • 프로젝트 ID, 데이터셋, threads, timeout seconds, region을 입력

  • ~/.dbt/profiles.yml을 확인하면, 방금 입력한 내용을 확인할 수 있음
cat ~/.dbt/profiles.yml
newyork_taxi:
  outputs:
    dev:
      dataset: newyork_taxi
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: zzsza-github-io
      threads: 4
      type: bigquery
  target: dev
  • 프로필 파일을 dbt 작업 프로젝트로 옮기려면 아래 명령어 실행(홈 디렉토리 안에 프로필 파일이 있는데, 회사에서 작업한다면 이런 프로필을 GitHub Repository에 넣고 Push)
    • dbt_project.yml : 프로젝트 설정 파일. 프로젝트 구조, 리소스 설정, 변수 등
    • profiles.yml : 프로필 설정 파일. 데이터베이스 연결 정보 등이 저장됨
cp ~/.dbt/profiles.yml .

gcloud를 사용해 oauth

  • 터미널에서 아래 명령어 입력(gcloud 사전에 설치되어야 함)
    • gcloud 설치 방법은 Docs 참고
gcloud auth application-default login \
  --scopes=https://www.googleapis.com/auth/bigquery,\
https://www.googleapis.com/auth/drive.readonly,\
https://www.googleapis.com/auth/iam.test

dbt run

  • 샘플 모델을 실행
    • models/example 폴더에 있는 my_first_dbt_model.sql, my_second_dbt_model.sql가 실행됨
dbt run
  • 실행하면 BigQuery에 새로운 테이블, 뷰가 추가됨

dbt utils 설정

  • dbt_utils는 dbt에서 유용하게 사용할 수 있는 macro와 Test를 모아둔 패키지
  • packages.yml 파일에 dbt_utils 패키지를 추가(이 파일이 없다면 생성하고 추가하면 됨)
packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0
  • 그 후, dbt deps를 실행해 디펜던시를 설치함



사용할 데이터

  • BigQuery의 Public Data인 뉴욕 택시 데이터인 tlc_green_trips_2022 테이블을 사용
  • Source 데이터 설정
    • models/sources.yml에 소스 데이터를 설정해서, dbt 모델링을 할 때 사용할 수 있음
    • BigQuery Public 데이터인 tlc_green_trips_2022을 사용할 예정이나, 이 테이블은 파티션 설정이 되어있지 않아 전체를 복사한 후 파티션을 설정할 예정

tlc_green_trips_2022 테이블 파티션 설정

  • 아래 쿼리를 실행해 Public이 아닌 자신의 프로젝트의 데이터셋에 저장
CREATE OR REPLACE TABLE newyork_taxi.tlc_green_trips_2022
PARTITION BY DATE(pickup_datetime)
CLUSTER BY pickup_location_id, dropoff_location_id
AS
SELECT *
FROM bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2022;

테이블 정보 저장

  • models/sources.yml에 다음과 같이 저장
    • 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
      • database는 구글 클라우드 프로젝트 ID
      • schema는 빅쿼리 데이터셋
      • tables의 name은 테이블 이름
    • tests 부분은 임의로 추가함. 실제로 잘 통과되는 test도 있고, 통과되지 않을 test도 존재
version: 2

sources:
  - name: ny_taxi # dbt에서 사용할 이름
    database: zzsza-github-io # 구글 클라우드 프로젝트 ID
    schema: newyork_taxi # 빅쿼리 데이터셋 이름
    description: "뉴욕 택시 운행 데이터의 파티션된 버전"
    
    tables:
      - name: tlc_green_trips_2022 # 빅쿼리 테이블 이름
        description: "녹색 택시(Green Taxi)의 운행 기록 데이터. 일자별로 파티션되어 있음"
        
        loaded_at_field: pickup_datetime

        columns:
          - name: vendor_id
            description: "택시 제공업체 식별자"
            tests:
              - not_null
              - accepted_values:
                  values: ['1', '2']

          - name: pickup_datetime
            description: "승객 탑승 시간"
            tests:
              - not_null

          - name: dropoff_datetime
            description: "승객 하차 시간"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "dropoff_datetime >= pickup_datetime"

          - name: store_and_fwd_flag
            description: "저장 후 전송 여부 (Y/N)"
            tests:
              - accepted_values:
                  values: ['Y', 'N']

          - name: rate_code
            description: >
              요금 코드. 운행 유형에 따른 요금 체계를 나타냅니다:
              1.0 = 표준 요금 (Standard rate): 일반적인 시내 운행
              2.0 = JFK 공항 (JFK Airport): 고정 요금이 적용되는 JFK 공항행
              3.0 = Newark 공항 (Newark Airport): Newark 공항 운행
              4.0 = Nassau 또는 Westchester: 교외 지역 운행
              5.0 = 협상 요금 (Negotiated fare): 사전에 합의된 요금
              6.0 = 그룹 승차 (Group ride): 합승 요금
            tests:
              - not_null
              - accepted_values:
                  values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

          - name: passenger_count
            description: "승객 수"

          - name: trip_distance
            description: "주행 거리 (마일)"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "trip_distance >= 0"

          - name: fare_amount
            description: "기본 요금"
            tests:
              - not_null

          - name: extra
            description: "추가 요금"
            tests:
              - not_null

          - name: mta_tax
            description: "MTA 세금"
            tests:
              - not_null

          - name: tip_amount
            description: "팁"
            tests:
              - not_null

          - name: tolls_amount
            description: "통행료"
            tests:
              - not_null

          - name: ehail_fee
            description: "전자 호출 수수료"

          - name: airport_fee
            description: "공항 수수료"

          - name: total_amount
            description: "총 금액"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "total_amount >= 0"

          - name: payment_type
            description: >
              결제 방식을 나타내는 코드:
              1.0 = 신용카드 (Credit card): 카드 결제
              2.0 = 현금 (Cash): 현금 결제
              3.0 = 무료 운행 (No charge): 요금이 부과되지 않는 운행
              4.0 = 분쟁 (Dispute): 요금 관련 분쟁이 있는 경우
              5.0 = 알 수 없음 (Unknown): 결제 방식이 불분명한 경우
              6.0 = 취소된 운행 (Voided trip): 운행이 취소된 경우
            tests:
              - not_null
              - accepted_values:
                  values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

          - name: trip_type
            description: "운행 유형"
            tests:
              - not_null

          - name: imp_surcharge
            description: "개선 부과금"
            tests:
              - not_null

          - name: pickup_location_id
            description: "승차 위치 ID"
            tests:
              - not_null

          - name: dropoff_location_id
            description: "하차 위치 ID"
            tests:
              - not_null

          - name: data_file_year
            description: "데이터 파일의 연도"
            tests:차
              - not_null

          - name: data_file_month
            description: "데이터 파일의 월"
            tests:
              - not_null

        # 테이블 수준의 테스트
        tests:
          - dbt_utils.expression_is_true:
              expression: >
                fare_amount + COALESCE(extra, 0) + COALESCE(mta_tax, 0) + 
                COALESCE(tip_amount, 0) + COALESCE(tolls_amount, 0) + 
                COALESCE(imp_surcharge, 0) <= total_amount
              severity: warn



모델링

  • locationId 별로 집계하는 간단한 모델을 만들 예정

차원(dimension) 모델링

  • pickup_location_id의 특정 차원을 저장
  • models/dimensions/dim_locations.sql

{{ config( materialized='table', cluster_by=['location_id'] ) }}

SELECT DISTINCT
    pickup_location_id AS location_id,
    -- 위치의 고유한 특성을 정의
    CASE 
        WHEN pickup_location_id IN ("1", "2") THEN 'airport'  -- 공항 위치 ID
        WHEN pickup_location_id BETWEEN "100" AND "200" THEN 'manhattan'
        ELSE 'outer_borough'
    END AS borough_type,
    case
        WHEN pickup_location_id IN ("1", "2") THEN 'restricted'
        ELSE 'general'
    END AS service_area_type
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}


팩트(fact) 모델링

  • 팩트 테이블은 비즈니스에서 발생하는 이벤트나 트랜잭션을 기록하는 테이블
    • 한 번의 택시 운행과 관련된 정보를 저장함

-- models/facts/fact_trips.sql
{{ config(
    materialized='incremental',
    partition_by={
        "field": "pickup_datetime",
        "data_type": "timestamp",
        "granularity": "day"
    },
    cluster_by=['pickup_location_id', 'dropoff_location_id']
) }}

select
    -- 운행 식별 정보
    
        concat(
        vendor_id, '_',
        cast(pickup_datetime as string), '_',
        cast(pickup_location_id as string)
    ) as trip_id,
    vendor_id,
    
    -- 시간 관련 측정값
    pickup_datetime,
    dropoff_datetime,
    TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as duration_minutes,    
    -- 위치 관련 측정값
    pickup_location_id,
    dropoff_location_id,
    trip_distance,
    
    -- 승객 정보
    passenger_count,
    trip_type,
    rate_code,
    store_and_fwd_flag,
    
    -- 요금 관련 측정값
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    ehail_fee,
    airport_fee,
    imp_surcharge,
    total_amount,
    payment_type,
    
    -- 성과 지표 계산
    case 
        when fare_amount > 0 then tip_amount / fare_amount 
        else 0 
    end as tip_ratio,
    
    case 
        when TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 0 
        then total_amount / TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE)
        else 0 
    end as revenue_per_minute,
    
    -- 파일 정보
    data_file_year,
    data_file_month

from {{ source('ny_taxi', 'tlc_green_trips_2022') }}

{% if is_incremental() %}
where pickup_datetime > (select max(pickup_datetime) from {{ this }})
{% endif %}

마트(mart) 모델링

  • 마트 테이블 : 특정 기준으로 집계한 데이터
    • 예 : 일자별, 지역별 운행 통계

-- models/marts/mart_location_patterns.sql
{{
    config(
        materialized='incremental',
        partition_by={
            "field": "data_file_month",
            "data_type": "int64",
            "granularity": "month",
            "range": {
              "start": 1,
              "end": 12,
              "interval": 1
            }
        },
        cluster_by="location_id,borough_type"
    )
}}

select
    f.data_file_year,
    f.data_file_month,
    date_trunc(f.pickup_datetime, day) as pickup_date, 
    l.location_id,
    l.borough_type,
    l.service_area_type,
    count(distinct f.trip_id) as total_trips,  -- 고유 trip_id 개수
    avg(f.fare_amount) as avg_fare,
    avg(case when f.rate_code = '2.0' then 1 else 0 end) as airport_trip_ratio
from {{ ref('fact_trips') }} f
join {{ ref('dim_locations') }} l
    on f.pickup_location_id = l.location_id
{% if is_incremental() %}
  where f.data_file_month >= (select max(data_file_month) from {{ this }})
{% endif %}
group by
    f.data_file_year,
    f.data_file_month,
    pickup_date,
    l.location_id,
    l.borough_type,
    l.service_area_type

모델 실행

  • mart_location_patterns과 앞선 의존성을 모두 실행
    • 실행하면 빅쿼리 테이블이 생성됨
dbt run --select +mart_location_patterns

모델 컴파일 확인

  • 만약 모델 실행 전에, 컴파일을 통해 확인하고 싶다면 아래 명령어로 가능함
    • 컴파일 : 실제 실행 가능한 SQL로 변환하는 명령어
    • 변환된 SQL만 생성함
    • 생성된 SQL은 target/compiled/{project_name}/models/ 에 저장됨
dbt compile --models mart_location_patterns

dbt run을 실행할 때, 오류가 발생한다면

  • 컴파일 후 나오는 쿼리문을 BigQuery Console에서 실행 => 오류가 생기면 쿼리가 실행되지 않을 것
  • 다 수정했는데도 안된다면 dbt run을 하면서 생성하는 옵션에서 이슈가 있을 수 있음
    • logs/dbt.log에서 보면 실행될 시점의 쿼리문을 볼 수 있음
    • 이 쿼리를 보면서 왜 이런 코드가 실행되는지 확인하면 됨



dbt docs 생성

  • dbt 문서를 생성하는 명령어는 크게 2단계로 진행
    • generate
    • serve
dbt docs generate
dbt docs serve
  • localhost:8080에서 확인할 수 있음

  • 그리고 우측 아래의 View Lineage Graph를 클릭하면 리니지 형태를 볼 수 있음



모델 Test

  • dbt test는 sources.yml과 schema.yml에 정의된 테스트를 실행함
    • sources.yml : source의 테이블과 컬럼에 대한 테스트
    • schema.yml : 모델의 테이블과 컬럼에 대한 테스트
    • 여기선 sources.yml에 정의된 테스트를 실행함
dbt test
  • 명령어를 실행하면, 통과되지 못한 Test가 나옴. 이 Test를 수정하면 됨



정리

  • dbt + bigquery를 연결해 마트 모델링을 진행
    • dbt run model
    • dbt compile : SQL로 컴파일
  • 의존성을 설치하고 싶을 땐
    • dbt deps
  • dbt docs는 generate와 serve를 사용
    • dbt docs generate
    • dbt docs serve
  • 테스트는 test
    • dbt test
  • 모델링을 어떻게 하는지는 별도로 글을 작성할 예정. 이 글에선 dbt + bigquery를 사용하는 방법을 주로 다룸




  • 글 작성하는데 걸린 시간 : 3시간 47분
    • 하고자 하는 이야기, 개요 정리 : 32분
    • 초안 글 작성 : 3시간 15분

카일스쿨 유튜브 채널을 만들었습니다. 데이터 사이언스, 성장, 리더십, BigQuery 등을 이야기할 예정이니, 관심 있으시면 구독 부탁드립니다 :)

PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다

이 글이 도움이 되셨거나 다양한 의견이 있다면 댓글 부탁드립니다 :)

Buy me a coffeeBuy me a coffee





© 2017. by Seongyun Byeon

Powered by zzsza