dbt + BigQuery 데이터 모델링


  • 이 글은 BigQuery 환경에서 dbt 사용법에 대해 작성한 글입니다
  • 예상 독자
  • 키워드 : dbt BigQuery, dbt modeling

dbt + BigQuery를 위한 환경 설정

dbt bigquery 설치

  • dbt-core는 설치했다고 가정하고 dbt-bigquery를 설치
pip3 install dbt-bigquery

dbt init

dbt init newyork_taxi
  • 위 명령어를 입력하면 DB를 선택하는 부분이 나옴. BigQuery로 설정
  • 만약 Runtime Error(No adapters available)이 발생하면 위에 dbt-bigquery를 설치해야 함(DB와 연결하는 라이브러리가 설치되어 있지 않다는 의미)
  • 권한 설정을 어떻게 할지 물어보는 질문도 있는데, oauth와 service_account가 존재
    • 여기선 oauth로 진행. 로컬 개발할 때는 oauth도 추천
    • 회사였으면 service_account로 설정
      • dbt로 만든 것을 주기적으로 실행할 때는 service_account를 발급해서 GitHub Actions나 Airflow에 설정해줘야 하지만, 지금은 로컬 개발에 집중
  • 프로젝트 ID, 데이터셋, threads, timeout seconds, region을 입력

  • ~/.dbt/profiles.yml을 확인하면, 방금 입력한 내용을 확인할 수 있음
cat ~/.dbt/profiles.yml
newyork_taxi:
  outputs:
    dev:
      dataset: newyork_taxi
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: zzsza-github-io
      threads: 4
      type: bigquery
  target: dev
  • 프로필 파일을 dbt 작업 프로젝트로 옮기려면 아래 명령어 실행(홈 디렉토리 안에 프로필 파일이 있는데, 회사에서 작업한다면 이런 프로필을 GitHub Repository에 넣고 Push)
    • dbt_project.yml : 프로젝트 설정 파일. 프로젝트 구조, 리소스 설정, 변수 등
    • profiles.yml : 프로필 설정 파일. 데이터베이스 연결 정보 등이 저장됨
cp ~/.dbt/profiles.yml .

gcloud를 사용해 oauth

  • 터미널에서 아래 명령어 입력(gcloud 사전에 설치되어야 함)
    • gcloud 설치 방법은 Docs 참고
gcloud auth application-default login \
  --scopes=https://www.googleapis.com/auth/bigquery,\
https://www.googleapis.com/auth/drive.readonly,\
https://www.googleapis.com/auth/iam.test

dbt run

  • 샘플 모델을 실행
    • models/example 폴더에 있는 my_first_dbt_model.sql, my_second_dbt_model.sql가 실행됨
dbt run
  • 실행하면 BigQuery에 새로운 테이블, 뷰가 추가됨

dbt utils 설정

  • dbt_utils는 dbt에서 유용하게 사용할 수 있는 macro와 Test를 모아둔 패키지
  • packages.yml 파일에 dbt_utils 패키지를 추가(이 파일이 없다면 생성하고 추가하면 됨)
packages:
  - package: dbt-labs/dbt_utils
    version: 1.3.0
  • 그 후, dbt deps를 실행해 디펜던시를 설치함



사용할 데이터

  • BigQuery의 Public Data인 뉴욕 택시 데이터인 tlc_green_trips_2022 테이블을 사용
  • Source 데이터 설정
    • models/sources.yml에 소스 데이터를 설정해서, dbt 모델링을 할 때 사용할 수 있음
    • BigQuery Public 데이터인 tlc_green_trips_2022을 사용할 예정이나, 이 테이블은 파티션 설정이 되어있지 않아 전체를 복사한 후 파티션을 설정할 예정

tlc_green_trips_2022 테이블 파티션 설정

  • 아래 쿼리를 실행해 Public이 아닌 자신의 프로젝트의 데이터셋에 저장
CREATE OR REPLACE TABLE newyork_taxi.tlc_green_trips_2022
PARTITION BY DATE(pickup_datetime)
CLUSTER BY pickup_location_id, dropoff_location_id
AS
SELECT *
FROM bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2022;

테이블 정보 저장

  • models/sources.yml에 다음과 같이 저장
    • 처음에 헷갈릴 수 있는 부분은, database, schema, tables 개념인데 핵심만 정리하면 다음과 같음
      • database는 구글 클라우드 프로젝트 ID
      • schema는 빅쿼리 데이터셋
      • tables의 name은 테이블 이름
    • tests 부분은 임의로 추가함. 실제로 잘 통과되는 test도 있고, 통과되지 않을 test도 존재
version: 2

sources:
  - name: ny_taxi # dbt에서 사용할 이름
    database: zzsza-github-io # 구글 클라우드 프로젝트 ID
    schema: newyork_taxi # 빅쿼리 데이터셋 이름
    description: "뉴욕 택시 운행 데이터의 파티션된 버전"
    
    tables:
      - name: tlc_green_trips_2022 # 빅쿼리 테이블 이름
        description: "녹색 택시(Green Taxi)의 운행 기록 데이터. 일자별로 파티션되어 있음"
        
        loaded_at_field: pickup_datetime

        columns:
          - name: vendor_id
            description: "택시 제공업체 식별자"
            tests:
              - not_null
              - accepted_values:
                  values: ['1', '2']

          - name: pickup_datetime
            description: "승객 탑승 시간"
            tests:
              - not_null

          - name: dropoff_datetime
            description: "승객 하차 시간"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "dropoff_datetime >= pickup_datetime"

          - name: store_and_fwd_flag
            description: "저장 후 전송 여부 (Y/N)"
            tests:
              - accepted_values:
                  values: ['Y', 'N']

          - name: rate_code
            description: >
              요금 코드. 운행 유형에 따른 요금 체계를 나타냅니다:
              1.0 = 표준 요금 (Standard rate): 일반적인 시내 운행
              2.0 = JFK 공항 (JFK Airport): 고정 요금이 적용되는 JFK 공항행
              3.0 = Newark 공항 (Newark Airport): Newark 공항 운행
              4.0 = Nassau 또는 Westchester: 교외 지역 운행
              5.0 = 협상 요금 (Negotiated fare): 사전에 합의된 요금
              6.0 = 그룹 승차 (Group ride): 합승 요금
            tests:
              - not_null
              - accepted_values:
                  values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

          - name: passenger_count
            description: "승객 수"

          - name: trip_distance
            description: "주행 거리 (마일)"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "trip_distance >= 0"

          - name: fare_amount
            description: "기본 요금"
            tests:
              - not_null

          - name: extra
            description: "추가 요금"
            tests:
              - not_null

          - name: mta_tax
            description: "MTA 세금"
            tests:
              - not_null

          - name: tip_amount
            description: "팁"
            tests:
              - not_null

          - name: tolls_amount
            description: "통행료"
            tests:
              - not_null

          - name: ehail_fee
            description: "전자 호출 수수료"

          - name: airport_fee
            description: "공항 수수료"

          - name: total_amount
            description: "총 금액"
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "total_amount >= 0"

          - name: payment_type
            description: >
              결제 방식을 나타내는 코드:
              1.0 = 신용카드 (Credit card): 카드 결제
              2.0 = 현금 (Cash): 현금 결제
              3.0 = 무료 운행 (No charge): 요금이 부과되지 않는 운행
              4.0 = 분쟁 (Dispute): 요금 관련 분쟁이 있는 경우
              5.0 = 알 수 없음 (Unknown): 결제 방식이 불분명한 경우
              6.0 = 취소된 운행 (Voided trip): 운행이 취소된 경우
            tests:
              - not_null
              - accepted_values:
                  values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]

          - name: trip_type
            description: "운행 유형"
            tests:
              - not_null

          - name: imp_surcharge
            description: "개선 부과금"
            tests:
              - not_null

          - name: pickup_location_id
            description: "승차 위치 ID"
            tests:
              - not_null

          - name: dropoff_location_id
            description: "하차 위치 ID"
            tests:
              - not_null

          - name: data_file_year
            description: "데이터 파일의 연도"
            tests:차
              - not_null

          - name: data_file_month
            description: "데이터 파일의 월"
            tests:
              - not_null

        # 테이블 수준의 테스트
        tests:
          - dbt_utils.expression_is_true:
              expression: >
                fare_amount + COALESCE(extra, 0) + COALESCE(mta_tax, 0) + 
                COALESCE(tip_amount, 0) + COALESCE(tolls_amount, 0) + 
                COALESCE(imp_surcharge, 0) <= total_amount
              severity: warn



모델링

  • locationId 별로 집계하는 간단한 모델을 만들 예정

차원(dimension) 모델링

  • pickup_location_id의 특정 차원을 저장
  • models/dimensions/dim_locations.sql

{{ config( materialized='table', cluster_by=['location_id'] ) }}

SELECT DISTINCT
    pickup_location_id AS location_id,
    -- 위치의 고유한 특성을 정의
    CASE 
        WHEN pickup_location_id IN ("1", "2") THEN 'airport'  -- 공항 위치 ID
        WHEN pickup_location_id BETWEEN "100" AND "200" THEN 'manhattan'
        ELSE 'outer_borough'
    END AS borough_type,
    case
        WHEN pickup_location_id IN ("1", "2") THEN 'restricted'
        ELSE 'general'
    END AS service_area_type
from {{ source('ny_taxi', 'tlc_green_trips_2022') }}


팩트(fact) 모델링

  • 팩트 테이블은 비즈니스에서 발생하는 이벤트나 트랜잭션을 기록하는 테이블
    • 한 번의 택시 운행과 관련된 정보를 저장함

-- models/facts/fact_trips.sql
{{ config(
    materialized='incremental',
    partition_by={
        "field": "pickup_datetime",
        "data_type": "timestamp",
        "granularity": "day"
    },
    cluster_by=['pickup_location_id', 'dropoff_location_id']
) }}

select
    -- 운행 식별 정보
    
        concat(
        vendor_id, '_',
        cast(pickup_datetime as string), '_',
        cast(pickup_location_id as string)
    ) as trip_id,
    vendor_id,
    
    -- 시간 관련 측정값
    pickup_datetime,
    dropoff_datetime,
    TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as duration_minutes,    
    -- 위치 관련 측정값
    pickup_location_id,
    dropoff_location_id,
    trip_distance,
    
    -- 승객 정보
    passenger_count,
    trip_type,
    rate_code,
    store_and_fwd_flag,
    
    -- 요금 관련 측정값
    fare_amount,
    extra,
    mta_tax,
    tip_amount,
    tolls_amount,
    ehail_fee,
    airport_fee,
    imp_surcharge,
    total_amount,
    payment_type,
    
    -- 성과 지표 계산
    case 
        when fare_amount > 0 then tip_amount / fare_amount 
        else 0 
    end as tip_ratio,
    
    case 
        when TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 0 
        then total_amount / TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE)
        else 0 
    end as revenue_per_minute,
    
    -- 파일 정보
    data_file_year,
    data_file_month

from {{ source('ny_taxi', 'tlc_green_trips_2022') }}

{% if is_incremental() %}
where pickup_datetime > (select max(pickup_datetime) from {{ this }})
{% endif %}

마트(mart) 모델링

  • 마트 테이블 : 특정 기준으로 집계한 데이터
    • 예 : 일자별, 지역별 운행 통계

-- models/marts/mart_location_patterns.sql
{{
    config(
        materialized='incremental',
        partition_by={
            "field": "data_file_month",
            "data_type": "int64",
            "granularity": "month",
            "range": {
              "start": 1,
              "end": 12,
              "interval": 1
            }
        },
        cluster_by="location_id,borough_type"
    )
}}

select
    f.data_file_year,
    f.data_file_month,
    date_trunc(f.pickup_datetime, day) as pickup_date, 
    l.location_id,
    l.borough_type,
    l.service_area_type,
    count(distinct f.trip_id) as total_trips,  -- 고유 trip_id 개수
    avg(f.fare_amount) as avg_fare,
    avg(case when f.rate_code = '2.0' then 1 else 0 end) as airport_trip_ratio
from {{ ref('fact_trips') }} f
join {{ ref('dim_locations') }} l
    on f.pickup_location_id = l.location_id
{% if is_incremental() %}
  where f.data_file_month >= (select max(data_file_month) from {{ this }})
{% endif %}
group by
    f.data_file_year,
    f.data_file_month,
    pickup_date,
    l.location_id,
    l.borough_type,
    l.service_area_type

모델 실행

  • mart_location_patterns과 앞선 의존성을 모두 실행
    • 실행하면 빅쿼리 테이블이 생성됨
dbt run --select +mart_location_patterns

모델 컴파일 확인

  • 만약 모델 실행 전에, 컴파일을 통해 확인하고 싶다면 아래 명령어로 가능함
    • 컴파일 : 실제 실행 가능한 SQL로 변환하는 명령어
    • 변환된 SQL만 생성함
    • 생성된 SQL은 target/compiled/{project_name}/models/ 에 저장됨
dbt compile --models mart_location_patterns

dbt run을 실행할 때, 오류가 발생한다면

  • 컴파일 후 나오는 쿼리문을 BigQuery Console에서 실행 => 오류가 생기면 쿼리가 실행되지 않을 것
  • 다 수정했는데도 안된다면 dbt run을 하면서 생성하는 옵션에서 이슈가 있을 수 있음
    • logs/dbt.log에서 보면 실행될 시점의 쿼리문을 볼 수 있음
    • 이 쿼리를 보면서 왜 이런 코드가 실행되는지 확인하면 됨



dbt docs 생성

  • dbt 문서를 생성하는 명령어는 크게 2단계로 진행
    • generate
    • serve
dbt docs generate
dbt docs serve
  • localhost:8080에서 확인할 수 있음

  • 그리고 우측 아래의 View Lineage Graph를 클릭하면 리니지 형태를 볼 수 있음



모델 Test

  • dbt test는 sources.yml과 schema.yml에 정의된 테스트를 실행함
    • sources.yml : source의 테이블과 컬럼에 대한 테스트
    • schema.yml : 모델의 테이블과 컬럼에 대한 테스트
    • 여기선 sources.yml에 정의된 테스트를 실행함
dbt test
  • 명령어를 실행하면, 통과되지 못한 Test가 나옴. 이 Test를 수정하면 됨



정리

  • dbt + bigquery를 연결해 마트 모델링을 진행
    • dbt run model
    • dbt compile : SQL로 컴파일
  • 의존성을 설치하고 싶을 땐
    • dbt deps
  • dbt docs는 generate와 serve를 사용
    • dbt docs generate
    • dbt docs serve
  • 테스트는 test
    • dbt test
  • 모델링을 어떻게 하는지는 별도로 글을 작성할 예정. 이 글에선 dbt + bigquery를 사용하는 방법을 주로 다룸




  • 글 작성하는데 걸린 시간 : 3시간 47분
    • 하고자 하는 이야기, 개요 정리 : 32분
    • 초안 글 작성 : 3시간 15분

카일스쿨 유튜브 채널을 만들었습니다. 데이터 분석, 커리어에 대한 내용을 공유드릴 예정입니다.

PM을 위한 데이터 리터러시 강의를 만들었습니다. 문제 정의, 지표, 실험 설계, 문화 만들기, 로그 설계, 회고 등을 담은 강의입니다

이 글이 도움이 되셨거나 의견이 있으시면 댓글 남겨주셔요.

Buy me a coffeeBuy me a coffee





© 2017. by Seongyun Byeon

Powered by zzsza