DuckDB-파이썬 분석 파이프라인 실전 구축 가이드
본 튜토리얼은 DuckDB와 파이썬을 활용하여 고성능 데이터 분석 파이프라인을 구축하는 방법을 다룹니다. SQL, 데이터프레임, Parquet 포맷 통합부터 대용량 데이터 삽입 및 병렬 처리 등 실무적인 성능 최적화 기법까지 코드와 함께 상세히 설명합니다. 데이터 엔지니어와 과학자들에게 빠르고 효율적인 인메모리 분석 엔진인 DuckDB의 활용법을 심도 있게 제공합니다.
기술, 인공지능, 빅데이터, 데이터 과학, 기술 뉴스, 데이터베이스, 에디터 추천, 머신러닝, 스태프 튜토리얼
이 튜토리얼에서는 구글 Colab 환경에서 코드를 직접 실행해보며 DuckDB와 파이썬의 통합 기능에 대한 종합적이고 실전적인 이해를 구축합니다. 연결 관리와 데이터 생성의 기본 사항으로 시작하여, 수동 로딩 없이 Pandas, Polars, Arrow 객체를 쿼리하고 결과를 다양한 형식으로 변환하며, 윈도우 함수(Window Functions), 피벗(Pivot), 매크로(Macros), 재귀적 CTE(Recursive CTEs), 조인(Joins)을 위한 표현력이 풍부한 SQL을 작성하는 등 실제 분석 워크플로우를 다룹니다.
나아가 대량 삽입(Bulk Insertion), 프로파일링(Profiling), 파티션 스토리지(Partitioned Storage), 멀티스레드 접근(Multi-threaded Access), 원격 파일 쿼리(Remote File Querying), 효율적인 내보내기 패턴(Export Patterns)과 같은 성능 지향적 기능도 살펴봅니다. 이를 통해 DuckDB가 무엇을 할 수 있는지뿐만 아니라 파이썬 내에서 본격적인 분석 엔진으로 어떻게 사용하는지까지 배우게 됩니다.
코드 복사 완료 다른 브라우저를 사용해 보세요.
import subprocess, sys for pkg in ["duckdb", "pandas", "pyarrow", "polars"]: try: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", pkg], stderr=subprocess.DEVNULL, ) except subprocess.CalledProcessError: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg], stderr=subprocess.DEVNULL, )
import duckdb import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import polars as pl import numpy as np import json, os, time, threading, tempfile from datetime import date, datetime, timedelta
print(f"DuckDB 버전 : {duckdb.version}") print(f"Pandas 버전 : {pd.version}") print(f"PyArrow 버전: {pa.version}") print(f"Polars 버전 : {pl.version}") print("=" * 72)
WORKDIR = tempfile.mkdtemp(prefix="duckdb_tutorial_") os.chdir(WORKDIR) print(f"작업 디렉토리: {WORKDIR}\n")
print("=" * 72) print("섹션 1: 연결 관리 (Connection Management)") print("=" * 72)
con = duckdb.connect() print(con.sql("SELECT '인메모리 DuckDB에서 안녕하세요!' AS greeting").fetchone()[0])
DB_PATH = os.path.join(WORKDIR, "tutorial.duckdb") pcon = duckdb.connect(DB_PATH) pcon.sql("CREATE OR REPLACE TABLE persisted(id INT, val TEXT)") pcon.sql("INSERT INTO persisted VALUES (1,'alpha'), (2,'beta')") print("저장된 행 수:", pcon.sql("SELECT count(*) FROM persisted").fetchone()[0]) pcon.close()
pcon2 = duckdb.connect(DB_PATH) print("다시 연결 후 :", pcon2.sql("SELECT * FROM persisted ORDER BY id").fetchall()) pcon2.close()
con_cfg = duckdb.connect(config={ "threads": 2, "memory_limit": "512MB", "default_order": "DESC", }) print("설정된 스레드 수:", con_cfg.sql("SELECT current_setting('threads')").fetchone()[0]) con_cfg.close()
with duckdb.connect() as ctx: ctx.sql("SELECT 42 AS answer").show()
print() print("=" * 72) print("섹션 2: 합성 데이터 생성 (Synthetic Data Generation)") print("=" * 72)
con = duckdb.connect() con.sql(""" CREATE OR REPLACE TABLE sales AS SELECT i AS order_id, '2023-01-01'::DATE + (i % 365)::INT AS order_date, CASE (i % 5) WHEN 0 THEN 'Electronics' WHEN 1 THEN 'Clothing' WHEN 2 THEN 'Groceries' WHEN 3 THEN 'Furniture' ELSE 'Books' END AS category, ROUND(10 + random() * 990, 2) AS amount, CASE (i % 3) WHEN 0 THEN 'US' WHEN 1 THEN 'EU' ELSE 'APAC' END AS region, CASE WHEN random() < 0.1 THEN TRUE ELSE FALSE END AS returned FROM generate_series(1, 100000) t(i) """) con.sql("SUMMARIZE sales").show()
print() print("=" * 72) print("섹션 3: 제로카피 데이터프레임 통합 (Zero-Copy DataFrame Integration)") print("=" * 72)
pdf = pd.DataFrame({ "product": ["Widget", "Gadget", "Doohickey", "Thingamajig"], "price": [9.99, 24.50, 4.75, 15.00], "stock": [120, 45, 300, 78], }) print("Pandas 데이터프레임 직접 쿼리:") con.sql("SELECT product, price * stock AS inventory_value FROM pdf ORDER BY inventory_value DESC").show()
plf = pl.DataFrame({ "city": ["Montreal", "Toronto", "Vancouver", "Calgary"], "temp_c": [-12.5, -5.0, 3.0, -18.0], }) print("Polars 데이터프레임 직접 쿼리:") con.sql("SELECT city, temp_c, temp_c * 9/5 + 32 AS temp_f FROM plf WHERE temp_c < 0").show()
arrow_tbl = pa.table({ "se