메뉴
BL
MarkTechPost 48일 전

DuckDB-파이썬 분석 파이프라인 실전 구축 가이드

IMP
7/10
핵심 요약

본 튜토리얼은 DuckDB와 파이썬을 활용하여 고성능 데이터 분석 파이프라인을 구축하는 방법을 다룹니다. SQL, 데이터프레임, Parquet 포맷 통합부터 대용량 데이터 삽입 및 병렬 처리 등 실무적인 성능 최적화 기법까지 코드와 함께 상세히 설명합니다. 데이터 엔지니어와 과학자들에게 빠르고 효율적인 인메모리 분석 엔진인 DuckDB의 활용법을 심도 있게 제공합니다.

번역된 본문

기술, 인공지능, 빅데이터, 데이터 과학, 기술 뉴스, 데이터베이스, 에디터 추천, 머신러닝, 스태프 튜토리얼

이 튜토리얼에서는 구글 Colab 환경에서 코드를 직접 실행해보며 DuckDB와 파이썬의 통합 기능에 대한 종합적이고 실전적인 이해를 구축합니다. 연결 관리와 데이터 생성의 기본 사항으로 시작하여, 수동 로딩 없이 Pandas, Polars, Arrow 객체를 쿼리하고 결과를 다양한 형식으로 변환하며, 윈도우 함수(Window Functions), 피벗(Pivot), 매크로(Macros), 재귀적 CTE(Recursive CTEs), 조인(Joins)을 위한 표현력이 풍부한 SQL을 작성하는 등 실제 분석 워크플로우를 다룹니다.

나아가 대량 삽입(Bulk Insertion), 프로파일링(Profiling), 파티션 스토리지(Partitioned Storage), 멀티스레드 접근(Multi-threaded Access), 원격 파일 쿼리(Remote File Querying), 효율적인 내보내기 패턴(Export Patterns)과 같은 성능 지향적 기능도 살펴봅니다. 이를 통해 DuckDB가 무엇을 할 수 있는지뿐만 아니라 파이썬 내에서 본격적인 분석 엔진으로 어떻게 사용하는지까지 배우게 됩니다.

코드 복사 완료 다른 브라우저를 사용해 보세요.

import subprocess, sys for pkg in ["duckdb", "pandas", "pyarrow", "polars"]: try: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", pkg], stderr=subprocess.DEVNULL, ) except subprocess.CalledProcessError: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg], stderr=subprocess.DEVNULL, )

import duckdb import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import polars as pl import numpy as np import json, os, time, threading, tempfile from datetime import date, datetime, timedelta

print(f"DuckDB 버전 : {duckdb.version}") print(f"Pandas 버전 : {pd.version}") print(f"PyArrow 버전: {pa.version}") print(f"Polars 버전 : {pl.version}") print("=" * 72)

WORKDIR = tempfile.mkdtemp(prefix="duckdb_tutorial_") os.chdir(WORKDIR) print(f"작업 디렉토리: {WORKDIR}\n")

print("=" * 72) print("섹션 1: 연결 관리 (Connection Management)") print("=" * 72)

con = duckdb.connect() print(con.sql("SELECT '인메모리 DuckDB에서 안녕하세요!' AS greeting").fetchone()[0])

DB_PATH = os.path.join(WORKDIR, "tutorial.duckdb") pcon = duckdb.connect(DB_PATH) pcon.sql("CREATE OR REPLACE TABLE persisted(id INT, val TEXT)") pcon.sql("INSERT INTO persisted VALUES (1,'alpha'), (2,'beta')") print("저장된 행 수:", pcon.sql("SELECT count(*) FROM persisted").fetchone()[0]) pcon.close()

pcon2 = duckdb.connect(DB_PATH) print("다시 연결 후 :", pcon2.sql("SELECT * FROM persisted ORDER BY id").fetchall()) pcon2.close()

con_cfg = duckdb.connect(config={ "threads": 2, "memory_limit": "512MB", "default_order": "DESC", }) print("설정된 스레드 수:", con_cfg.sql("SELECT current_setting('threads')").fetchone()[0]) con_cfg.close()

with duckdb.connect() as ctx: ctx.sql("SELECT 42 AS answer").show()

print() print("=" * 72) print("섹션 2: 합성 데이터 생성 (Synthetic Data Generation)") print("=" * 72)

con = duckdb.connect() con.sql(""" CREATE OR REPLACE TABLE sales AS SELECT i AS order_id, '2023-01-01'::DATE + (i % 365)::INT AS order_date, CASE (i % 5) WHEN 0 THEN 'Electronics' WHEN 1 THEN 'Clothing' WHEN 2 THEN 'Groceries' WHEN 3 THEN 'Furniture' ELSE 'Books' END AS category, ROUND(10 + random() * 990, 2) AS amount, CASE (i % 3) WHEN 0 THEN 'US' WHEN 1 THEN 'EU' ELSE 'APAC' END AS region, CASE WHEN random() < 0.1 THEN TRUE ELSE FALSE END AS returned FROM generate_series(1, 100000) t(i) """) con.sql("SUMMARIZE sales").show()

print() print("=" * 72) print("섹션 3: 제로카피 데이터프레임 통합 (Zero-Copy DataFrame Integration)") print("=" * 72)

pdf = pd.DataFrame({ "product": ["Widget", "Gadget", "Doohickey", "Thingamajig"], "price": [9.99, 24.50, 4.75, 15.00], "stock": [120, 45, 300, 78], }) print("Pandas 데이터프레임 직접 쿼리:") con.sql("SELECT product, price * stock AS inventory_value FROM pdf ORDER BY inventory_value DESC").show()

plf = pl.DataFrame({ "city": ["Montreal", "Toronto", "Vancouver", "Calgary"], "temp_c": [-12.5, -5.0, 3.0, -18.0], }) print("Polars 데이터프레임 직접 쿼리:") con.sql("SELECT city, temp_c, temp_c * 9/5 + 32 AS temp_f FROM plf WHERE temp_c < 0").show()

arrow_tbl = pa.table({ "se

원문 보기
원문 보기 (영어)
Technology Artificial Intelligence Big Data Data Science Tech News Databases Editors Pick Machine Learning Staff Tutorials In this tutorial, we build a comprehensive, hands-on understanding of DuckDB-Python by working through its features directly in code on Colab. We start with the fundamentals of connection management and data generation, then move into real analytical workflows, including querying Pandas, Polars, and Arrow objects without manual loading, transforming results across multiple formats, and writing expressive SQL for window functions, pivots, macros, recursive CTEs, and joins. As we progress, we also explore performance-oriented capabilities such as bulk insertion, profiling, partitioned storage, multi-threaded access, remote file querying, and efficient export patterns, so we not only learn what DuckDB can do, but also how to use it as a serious analytical engine within Python. Copy Code Copied Use a different Browser import subprocess, sys for pkg in ["duckdb", "pandas", "pyarrow", "polars"]: try: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", pkg], stderr=subprocess.DEVNULL, ) except subprocess.CalledProcessError: subprocess.check_call( [sys.executable, "-m", "pip", "install", "-q", "--break-system-packages", pkg], stderr=subprocess.DEVNULL, ) import duckdb import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import polars as pl import numpy as np import json, os, time, threading, tempfile from datetime import date, datetime, timedelta print(f"DuckDB version : {duckdb.__version__}") print(f"Pandas version : {pd.__version__}") print(f"PyArrow version: {pa.__version__}") print(f"Polars version : {pl.__version__}") print("=" * 72) WORKDIR = tempfile.mkdtemp(prefix="duckdb_tutorial_") os.chdir(WORKDIR) print(f"Working directory: {WORKDIR}\n") print("=" * 72) print("SECTION 1: Connection Management") print("=" * 72) con = duckdb.connect() print(con.sql("SELECT 'Hello from in-memory DuckDB!' AS greeting").fetchone()[0]) DB_PATH = os.path.join(WORKDIR, "tutorial.duckdb") pcon = duckdb.connect(DB_PATH) pcon.sql("CREATE OR REPLACE TABLE persisted(id INT, val TEXT)") pcon.sql("INSERT INTO persisted VALUES (1,'alpha'), (2,'beta')") print("Persisted rows:", pcon.sql("SELECT count(*) FROM persisted").fetchone()[0]) pcon.close() pcon2 = duckdb.connect(DB_PATH) print("After re-open :", pcon2.sql("SELECT * FROM persisted ORDER BY id").fetchall()) pcon2.close() con_cfg = duckdb.connect(config={ "threads": 2, "memory_limit": "512MB", "default_order": "DESC", }) print("Configured threads:", con_cfg.sql("SELECT current_setting('threads')").fetchone()[0]) con_cfg.close() with duckdb.connect() as ctx: ctx.sql("SELECT 42 AS answer").show() print() print("=" * 72) print("SECTION 2: Synthetic Data Generation") print("=" * 72) con = duckdb.connect() con.sql(""" CREATE OR REPLACE TABLE sales AS SELECT i AS order_id, '2023-01-01'::DATE + (i % 365)::INT AS order_date, CASE (i % 5) WHEN 0 THEN 'Electronics' WHEN 1 THEN 'Clothing' WHEN 2 THEN 'Groceries' WHEN 3 THEN 'Furniture' ELSE 'Books' END AS category, ROUND(10 + random() * 990, 2) AS amount, CASE (i % 3) WHEN 0 THEN 'US' WHEN 1 THEN 'EU' ELSE 'APAC' END AS region, CASE WHEN random() < 0.1 THEN TRUE ELSE FALSE END AS returned FROM generate_series(1, 100000) t(i) """) con.sql("SUMMARIZE sales").show() print() print("=" * 72) print("SECTION 3: Zero-Copy DataFrame Integration") print("=" * 72) pdf = pd.DataFrame({ "product": ["Widget", "Gadget", "Doohickey", "Thingamajig"], "price": [9.99, 24.50, 4.75, 15.00], "stock": [120, 45, 300, 78], }) print("Query Pandas DF directly:") con.sql("SELECT product, price * stock AS inventory_value FROM pdf ORDER BY inventory_value DESC").show() plf = pl.DataFrame({ "city": ["Montreal", "Toronto", "Vancouver", "Calgary"], "temp_c": [-12.5, -5.0, 3.0, -18.0], }) print("Query Polars DF directly:") con.sql("SELECT city, temp_c, temp_c * 9/5 + 32 AS temp_f FROM plf WHERE temp_c < 0").show() arrow_tbl = pa.table({ "sensor_id": [1, 2, 3, 4, 5], "reading": [23.1, 47.8, 12.3, 99.0, 55.5], }) print("Query Arrow table directly:") con.sql("SELECT sensor_id, reading FROM arrow_tbl WHERE reading > 30").show() print() print("=" * 72) print("SECTION 4: Result Conversion") print("=" * 72) q = "SELECT category, SUM(amount) AS total FROM sales GROUP BY category ORDER BY total DESC" print("→ Python list :", con.sql(q).fetchall()[:2], "...") print("→ Pandas DF :\n", con.sql(q).df().head(3)) print("→ Polars DF :\n", con.sql(q).pl().head(3)) print("→ Arrow Table :", con.sql(q).arrow().schema) print("→ NumPy arrays :", {k: v[:2] for k, v in con.sql(q).fetchnumpy().items()}) print() We set up the full DuckDB-Python environment by installing the required libraries and importing all the necessary modules for the tutorial. We create our working directory, initialize DuckDB connections, and explore both in-memory and persistent database usage along with basic configuration options. We also generate a large synthetic sales dataset and begin working with DuckDB’s direct integration with Pandas, Polars, and PyArrow, which shows us how naturally DuckDB fits into Python-based data workflows. Copy Code Copied Use a different Browser print("=" * 72) print("SECTION 5: Relational API") print("=" * 72) rel = ( con.table("sales") .filter("NOT returned") .aggregate("category, region, SUM(amount) AS revenue, COUNT(*) AS orders") .filter("revenue > 1000000") .order("revenue DESC") .limit(10) ) print("Relational API result:") rel.show() top_cats = con.sql("SELECT DISTINCT category FROM sales ORDER BY category LIMIT 3") print("Top categories relation fed into next query:") con.sql("SELECT s.* FROM sales s SEMI JOIN top_cats ON s.category = top_cats.category LIMIT 5").show() print() print("=" * 72) print("SECTION 6: Window Functions & Advanced SQL") print("=" * 72) con.sql(""" WITH daily AS ( SELECT order_date, region, SUM(amount) AS daily_rev FROM sales WHERE NOT returned GROUP BY order_date, region ) SELECT order_date, region, daily_rev, SUM(daily_rev) OVER ( PARTITION BY region ORDER BY order_date ) AS cum_revenue, AVG(daily_rev) OVER ( PARTITION BY region ORDER BY order_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW ) AS rolling_7d_avg FROM daily QUALIFY row_number() OVER (PARTITION BY region ORDER BY order_date DESC) <= 3 ORDER BY region, order_date DESC """).show() print("PIVOT table:") con.sql(""" PIVOT sales ON region USING SUM(amount) GROUP BY category ORDER BY category """).show() print() print("=" * 72) print("SECTION 7: Complex / Nested Types") print("=" * 72) con.sql(""" CREATE OR REPLACE TABLE users AS SELECT i AS user_id, {'first': 'User_' || i::TEXT, 'last': 'Surname_' || (i % 100)::TEXT} AS name, [i * 10, i * 20, i * 30] AS scores, MAP {'tier': CASE WHEN i % 2 = 0 THEN 'gold' ELSE 'silver' END, 'region': CASE WHEN i % 3 = 0 THEN 'US' ELSE 'EU' END} AS metadata FROM generate_series(1, 5) t(i) """) print("Struct field access, list indexing, map extraction:") con.sql(""" SELECT user_id, name.first AS first_name, scores[1] AS first_score, list_aggregate(scores,'sum') AS total_score, metadata['tier'] AS tier FROM users """).show() print("Unnesting a list column:") con.sql(""" SELECT user_id, unnest(scores) AS individual_score FROM users WHERE user_id <= 3 """).show() print() print("=" * 72) print("SECTION 8: Python UDFs") print("=" * 72) def celsius_to_fahrenheit(c): return c * 9 / 5 + 32 con.create_function("c2f", celsius_to_fahrenheit, ["DOUBLE"], "DOUBLE") con.sql("SELECT city, temp_c, c2f(temp_c) AS temp_f FROM plf").show() import pyarrow.compute as pc def vectorized_discount(prices): """Apply a 15% discount to all prices.""" return pc.multiply(prices, 0.85) con.create_function( "discount", vectorized_discount, ["DOUBLE"], "DOUBLE", type="arrow", ) print("Vectorized UDF (discount):") con.sql("SELECT product, price, discount(price) AS sale_price FROM pdf").show() print() print("=" * 72) print("SECTION 9: File I/O") print("=" * 72) con.sql("COPY (SELECT * FROM sales