Pandas + SQL + JSON — AI 엔지니어의 일상 "주식"
AI 엔지니어는 80%의 시간을 모델 훈련이 아닌 데이터 처리에 씁니다. 훈련 데이터 정리, API 반환값 파싱, 사용자 로그 정리, RAG 문서 파이프라인 구축 — 이 모든 것이 데이터 처리입니다.
Pandas는 당신의 "데이터 만능 도구"입니다: CSV, Excel, JSON, SQL 데이터베이스를 읽을 수 있고, 필터링, 그룹화, 집계, 피벗, 병합이 가능하며, 한 줄의 코드가 수십 줄의 for 루프를 대체합니다. Pandas를 익히면 생산성이 비약적으로 향상됩니다.
pip install pandas numpy openpyxl # openpyxl은 Excel 읽기/쓰기에 사용
import pandas as pd import numpy as np print(pd.__version__) # 설치 확인
이것은 전 세계 Python 커뮤니티의 관례입니다. pd.XXX를 보면 Pandas라는 것을 알고, np.XXX를 보면 NumPy라는 것을 알 수 있습니다. 모든 튜토리얼과 프로젝트에서 이렇게 씁니다.
Series는 Excel의 "한 열"과 같습니다. 데이터와 인덱스가 있습니다. Series를 이해하는 것이 DataFrame을 이해하는 기초입니다.
import pandas as pd # ---- 리스트에서 Series 생성 ---- scores = pd.Series([85, 92, 78, 96, 88]) print(scores) # 0 85 # 1 92 # 2 78 # 3 96 # 4 88 # ---- 커스텀 인덱스 ---- scores = pd.Series( [85, 92, 78, 96, 88], index=["철수", "영희", "민수", "지훈", "수진"] ) print(scores["영희"]) # 92 # ---- 자주 쓰는 통계 ---- print(scores.mean()) # 평균값: 87.8 print(scores.max()) # 최대값: 96 print(scores.min()) # 최소값: 78 print(scores.std()) # 표준편차 print(scores.describe()) # 한 번에 전체 통계 정보 보기 # ---- 벡터화 연산 (for 루프 불필요!) ---- print(scores + 5) # 모든 사람에게 5점 추가 print(scores > 90) # 불리언 Series 반환 print(scores[scores > 90]) # 90점 초과 필터링
DataFrame은 행과 열이 있는 테이블로, Excel의 시트와 같습니다. Pandas에서 95%의 시간은 DataFrame을 다루는 데 씁니다.
import pandas as pd # ---- 딕셔너리에서 생성 (가장 일반적인 방법) ---- data = { "name": ["철수", "영희", "민수", "지훈", "수진"], "age": [22, 21, 23, 22, 24], "score": [85, 92, 78, 96, 88], "city": ["서울", "부산", "서울", "대전", "부산"] } df = pd.DataFrame(data) print(df) # name age score city # 0 철수 22 85 서울 # 1 영희 21 92 부산 # 2 민수 23 78 서울 # 3 지훈 22 96 대전 # 4 수진 24 88 부산 # ---- 기본 속성 ---- print(df.shape) # (5, 4) → 5행 4열 print(df.columns) # Index(['name', 'age', 'score', 'city']) print(df.dtypes) # 각 열의 데이터 타입 print(df.info()) # 전체 정보 요약 # ---- 열 추출 (Series 반환) ---- print(df["name"]) # 한 열 추출 print(df[["name", "score"]]) # 여러 열 추출 (이중 대괄호) # ---- 행 추출 ---- print(df.head(3)) # 앞 3행 print(df.tail(2)) # 뒤 2행 # ---- 빠른 통계 ---- print(df.describe()) # 수치 열의 통계 요약 print(df["score"].mean()) # 평균 점수 print(df["city"].value_counts()) # 각 도시 출현 횟수
pd.DataFrame(json_list)로 바로 테이블로 변환 가능.describe()로 수치 열 요약 보기, .value_counts()로 도시 분포 통계import pandas as pd # ---- CSV 읽기 (가장 일반적) ---- df = pd.read_csv("data.csv") df = pd.read_csv("data.csv", encoding="utf-8") # 인코딩 지정 df = pd.read_csv("data.csv", usecols=["name", "score"]) # 특정 열만 읽기 df = pd.read_csv("data.csv", nrows=100) # 앞 100행만 읽기 # ---- Excel 읽기 ---- df = pd.read_excel("data.xlsx") df = pd.read_excel("data.xlsx", sheet_name="Sheet2") # 워크시트 지정 # ---- JSON 읽기 ---- df = pd.read_json("data.json") # ---- 딕셔너리 리스트에서 생성 (API 반환값에서 자주 사용) ---- api_data = [ {"id": 1, "text": "안녕하세요", "score": 0.95}, {"id": 2, "text": "세계", "score": 0.87}, ] df = pd.DataFrame(api_data) # ---- 데이터 저장 ---- df.to_csv("output.csv", index=False) # 행 번호 저장 안 함 df.to_excel("output.xlsx", index=False) df.to_json("output.json", orient="records", force_ascii=False)
to_csv()는 기본적으로 행 인덱스를 저장하여 숫자 열이 하나 추가됩니다. 반드시 index=False를 추가하세요. 한국어 파일은 encoding="utf-8-sig"에 주의하세요 (Excel에서 깨지지 않게).
어떤 데이터든 받으면 첫 번째로 할 일은 "탐색"입니다 — 어떤 열이 있는지, 몇 행인지, 결측값이 있는지, 데이터 분포가 어떤지 확인합니다.
import pandas as pd df = pd.read_csv("data.csv") # ========== 첫 번째: 구조 보기 ========== print(df.shape) # 몇 행 몇 열? print(df.columns.tolist()) # 열 이름 리스트 print(df.dtypes) # 각 열 데이터 타입 print(df.head()) # 앞 5행이 어떻게 생겼는지 보기 # ========== 두 번째: 결측값 보기 ========== print(df.isnull().sum()) # 각 열의 결측값 개수 print(df.isnull().sum() / len(df) * 100) # 결측률 (%) # ========== 세 번째: 분포 보기 ========== print(df.describe()) # 수치 열 통계 print(df["city"].value_counts()) # 범주형 열 빈도 print(df["score"].hist()) # 분포 히스토그램 (선택)
import pandas as pd df = pd.DataFrame({ "name": ["철수", "영희", "민수", "지훈", "수진"], "age": [22, 21, 23, 22, 24], "score": [85, 92, 78, 96, 88], "city": ["서울", "부산", "서울", "대전", "부산"] }) # ---- 단일 조건 필터링 ---- high_score = df[df["score"] > 90] # 성적 > 90 seoul = df[df["city"] == "서울"] # 서울 학생 # ---- 다중 조건 필터링 (& 그리고 | 또는) ---- result = df[(df["score"] > 80) & (df["city"] == "부산")] result = df[(df["city"] == "서울") | (df["city"] == "부산")] # ---- isin() 다중 값 매칭 ---- big_city = df[df["city"].isin(["서울", "부산"])] # ---- 문자열 필터링 ---- df[df["name"].str.contains("수")] # 이름에 "수" 포함 df[df["name"].str.startswith("철")] # 이름이 "철"로 시작 # ---- between() 범위 필터링 ---- df[df["score"].between(80, 95)] # 80 ≤ score ≤ 95
잘못된 작성법: df[df["score"] > 80 & df["city"] == "부산"]
올바른 작성법: df[(df["score"] > 80) & (df["city"] == "부산")]
&의 우선순위가 >보다 높기 때문에 괄호가 없으면 에러가 발생합니다.
# ---- 단일 열 정렬 ---- df.sort_values("score", ascending=False) # 성적 높은 순 # ---- 다중 열 정렬 ---- df.sort_values(["city", "score"], ascending=[True, False]) # 도시 오름차순, 같은 도시 내에서 성적 내림차순 # ---- Top N 추출 ---- df.nlargest(3, "score") # 성적 상위 3명 df.nsmallest(3, "age") # 나이 가장 어린 3명
# ---- loc: 라벨(이름)로 선택 ---- df.loc[0, "name"] # 0번째 행의 name → "철수" df.loc[0:2, ["name", "score"]] # 0~2행의 name과 score df.loc[df["score"] > 90, "name"] # 성적>90인 사람의 이름 # ---- iloc: 위치(숫자)로 선택 ---- df.iloc[0, 0] # 0행 0열 df.iloc[0:3, 0:2] # 앞 3행, 앞 2열 df.iloc[-1] # 마지막 행 # ---- 값 수정 ---- df.loc[0, "score"] = 90 # 철수의 성적 수정 df.loc[df["city"] == "서울", "score"] += 5 # 서울 학생 5점 추가
loc → 이름으로 찾기 (label-based), 조건 필터링 지원iloc → 위치 번호로 찾기 (integer-based), 순수 숫자 인덱스import pandas as pd import numpy as np df = pd.DataFrame({ "name": ["철수", "영희", "민수", np.nan, "수진"], "score": [85, np.nan, 78, 96, np.nan], "city": ["서울", "부산", np.nan, "대전", "부산"] }) # ---- 결측값 확인 ---- print(df.isnull()) # 각 셀이 비어 있는지 print(df.isnull().sum()) # 각 열의 결측 수량 # ---- 결측값 삭제 ---- df.dropna() # 결측값이 있는 행 삭제 df.dropna(subset=["name"]) # name이 비어 있는 행만 삭제 df.dropna(thresh=2) # 비어 있지 않은 값이 최소 2개인 행만 유지 # ---- 결측값 채우기 ---- df["score"].fillna(0) # 0으로 채우기 df["score"].fillna(df["score"].mean()) # 평균값으로 채우기 (자주 사용) df["city"].fillna("미정") # 커스텀 텍스트로 채우기 df.fillna(method="ffill") # 이전 행의 값으로 채우기
# ---- 중복 감지 ---- print(df.duplicated().sum()) # 중복 행이 몇 개인지 print(df[df.duplicated()]) # 중복 행 보기 print(df.duplicated(subset=["name"])) # 특정 열 기준으로 중복 판단 # ---- 중복 삭제 ---- df.drop_duplicates() # 완전히 동일한 행 df.drop_duplicates(subset=["name"], keep="first") # name 기준 중복 제거, 첫 번째 유지
# ---- 타입 변환 ---- df["score"] = df["score"].astype(float) # 부동소수점으로 변환 df["age"] = pd.to_numeric(df["age"], errors="coerce") # 안전하게 숫자로 변환, 불가능한 값은 NaN df["date"] = pd.to_datetime(df["date"]) # 날짜로 변환 # ---- 텍스트 정제 (AI 개발에서 매우 자주 사용) ---- df["name"] = df["name"].str.strip() # 앞뒤 공백 제거 df["name"] = df["name"].str.lower() # 소문자로 변환 df["text"] = df["text"].str.replace("\n", " ") # 줄바꿈을 공백으로 df["text"] = df["text"].str.replace(r"\s+", " ", regex=True) # 다중 공백을 하나로 # ---- 계산 열 추가 ---- df["pass"] = df["score"] >= 60 # 불리언 열: 합격 여부 df["level"] = df["score"].apply( lambda x: "우수" if x >= 90 else ("합격" if x >= 60 else "불합격") ) # ---- rename 열 이름 변경 ---- df = df.rename(columns={"name": "이름", "score": "성적"})
apply()는 각 행/각 값에 대해 커스텀 함수를 실행lambda와 함께 사용하면 복잡한 변환 로직도 한 줄로 해결df["prompt"] = df["text"].apply(lambda x: f"번역해 주세요: {x}")groupby의 사고 방식: 분할 → 적용 → 결합. 먼저 특정 열로 그룹화하고, 각 그룹에 통계를 적용한 후, 결과를 결합합니다.
import pandas as pd df = pd.DataFrame({ "name": ["철수","영희","민수","지훈","수진","현우"], "city": ["서울","부산","서울","대전","부산","서울"], "department": ["기술","기술","제품","기술","제품","기술"], "salary": [15000,18000,12000,20000,13000,16000], "age": [25,28,30,26,32,24] }) # ---- 도시별 그룹화, 평균 급여 계산 ---- print(df.groupby("city")["salary"].mean()) # city # 부산 15500 # 서울 14333 # 대전 20000 # ---- 여러 집계 함수 ---- print(df.groupby("city")["salary"].agg(["mean", "max", "min", "count"])) # ---- 다중 열 그룹화 ---- print(df.groupby(["city", "department"])["salary"].mean()) # ---- agg()로 열마다 다른 통계 적용 ---- result = df.groupby("city").agg({ "salary": ["mean", "max"], "age": "mean", "name": "count" # 카운트 }) print(result) # ---- 그룹화 후 정렬 ---- city_salary = df.groupby("city")["salary"].mean().sort_values(ascending=False) print(city_salary)
피벗 테이블은 Excel의 "데이터 피벗 테이블" 기능과 같습니다 — 행과 열을 교차 통계하며, 한 줄의 코드로 해결됩니다.
# ---- 기본 피벗 테이블 ---- pivot = df.pivot_table( values="salary", # 통계할 값 index="city", # 행 columns="department", # 열 aggfunc="mean" # 집계 방식 ) print(pivot) # department 제품 기술 # city # 부산 13000.0 18000.0 # 서울 12000.0 15500.0 # 대전 NaN 20000.0 # ---- 행/열 합계 추가 ---- pivot = df.pivot_table( values="salary", index="city", columns="department", aggfunc="mean", margins=True, margins_name="합계" )
groupby + idxmax 사용)import pandas as pd # 두 개의 테이블 users = pd.DataFrame({ "user_id": [1, 2, 3, 4], "name": ["철수", "영희", "민수", "지훈"] }) orders = pd.DataFrame({ "order_id": [101, 102, 103, 104], "user_id": [1, 2, 2, 5], "amount": [100, 200, 150, 300] }) # ---- inner join (양쪽 모두에 있는 것만 유지) ---- result = pd.merge(users, orders, on="user_id", how="inner") # ---- left join (왼쪽 테이블 전체 유지) ---- result = pd.merge(users, orders, on="user_id", how="left") # ---- right join (오른쪽 테이블 전체 유지) ---- result = pd.merge(users, orders, on="user_id", how="right") # ---- outer join (양쪽 모두 유지) ---- result = pd.merge(users, orders, on="user_id", how="outer")
| 연결 방식 | 유지 데이터 | 비유 |
|---|---|---|
inner | 양쪽 모두에 있는 것 | 교집합 |
left | 왼쪽 테이블 전부 + 오른쪽 매칭 | 왼쪽 테이블 기준 |
right | 오른쪽 테이블 전부 + 왼쪽 매칭 | 오른쪽 테이블 기준 |
outer | 양쪽 전부 | 합집합 |
# ---- 상하 이어붙이기 (행 추가) ---- df1 = pd.DataFrame({"name": ["A", "B"], "score": [80, 90]}) df2 = pd.DataFrame({"name": ["C", "D"], "score": [85, 95]}) combined = pd.concat([df1, df2], ignore_index=True) # ignore_index=True 인덱스 재번호 매기기 # ---- 좌우 이어붙이기 (열 추가) ---- info = pd.DataFrame({"city": ["서울", "부산"]}) combined = pd.concat([df1, info], axis=1) # ---- 실제 시나리오: 여러 CSV 합치기 ---- import glob files = glob.glob("data/*.csv") all_dfs = [pd.read_csv(f) for f in files] merged = pd.concat(all_dfs, ignore_index=True)
merge = 공통 열을 기준으로 관련 (SQL JOIN과 유사)concat = 단순 연결 (상하로 행 추가 또는 좌우로 열 추가)
SQL의 핵심은 하나의 패턴입니다: 어떤 테이블에서(FROM) 어떤 열을 선택하고(SELECT), 어떤 조건을 만족하는지(WHERE).
-- ---- 기본 조회 ---- SELECT * FROM users; -- 전체 조회 SELECT name, age FROM users; -- 열 선택 SELECT * FROM users WHERE age > 20; -- 조건 필터링 SELECT * FROM users WHERE city = '서울'; -- 텍스트 조건 -- ---- AND / OR / IN ---- SELECT * FROM users WHERE age > 20 AND city = '서울'; SELECT * FROM users WHERE city IN ('서울', '부산'); -- ---- 정렬 ---- SELECT * FROM users ORDER BY age DESC; -- 내림차순 SELECT * FROM users ORDER BY age ASC; -- 오름차순 (기본값) -- ---- LIMIT ---- SELECT * FROM users ORDER BY score DESC LIMIT 10; -- 상위 10명 -- ---- 중복 제거 ---- SELECT DISTINCT city FROM users; -- 어떤 도시들이 있는지
-- ---- 집계 함수 ---- SELECT COUNT(*) FROM users; -- 총 행 수 SELECT AVG(score) FROM users; -- 평균 점수 SELECT MAX(score), MIN(score) FROM users; -- 최고/최저 점수 SELECT SUM(salary) FROM users; -- 급여 합계 -- ---- GROUP BY 그룹화 ---- SELECT city, AVG(salary) AS avg_salary FROM users GROUP BY city; -- ---- HAVING 그룹 결과 필터링 ---- SELECT city, COUNT(*) AS cnt FROM users GROUP BY city HAVING cnt > 2; -- 인원 > 2인 도시
-- ---- INNER JOIN ---- SELECT u.name, o.amount FROM users u INNER JOIN orders o ON u.user_id = o.user_id; -- ---- LEFT JOIN ---- SELECT u.name, o.amount FROM users u LEFT JOIN orders o ON u.user_id = o.user_id; -- 모든 사용자를 유지하고, 주문이 없는 사용자의 amount는 NULL
import sqlite3 import pandas as pd # ---- 메모리 데이터베이스 생성 + 데이터 임포트 ---- conn = sqlite3.connect(":memory:") df = pd.DataFrame({ "name": ["철수","영희","민수"], "city": ["서울","부산","서울"], "score": [85, 92, 78] }) df.to_sql("students", conn, index=False) # ---- SQL로 쿼리, 결과가 바로 DataFrame으로 ---- result = pd.read_sql("SELECT * FROM students WHERE score > 80", conn) print(result) # ---- 그룹 통계 ---- result = pd.read_sql(""" SELECT city, AVG(score) AS avg_score, COUNT(*) AS cnt FROM students GROUP BY city """, conn) print(result) conn.close()
Pandas를 배운 후 SQL을 배우면 매우 빠릅니다. 개념이 일대일로 대응되기 때문입니다: df[조건] = WHERE, df.groupby() = GROUP BY, pd.merge() = JOIN, df.sort_values() = ORDER BY.
| 작업 | Pandas | SQL |
|---|---|---|
| 필터링 | df[df["score"] > 80] | WHERE score > 80 |
| 열 선택 | df[["name","score"]] | SELECT name, score |
| 정렬 | df.sort_values("score") | ORDER BY score |
| 그룹화 | df.groupby("city").mean() | GROUP BY city |
| 병합 | pd.merge(a, b, on="id") | JOIN b ON a.id = b.id |
| 중복 제거 | df["city"].unique() | SELECT DISTINCT city |
| 카운트 | df["city"].value_counts() | COUNT(*) GROUP BY city |
대규모 언어 모델 API의 반환값은 보통 여러 겹으로 중첩된 JSON입니다. "양파 껍질을 벗기듯" 데이터를 추출하는 것이 핵심 기술입니다.
import json import pandas as pd # AI API 반환값 시뮬레이션 api_response = { "id": "chatcmpl-abc123", "model": "gpt-4", "choices": [ { "index": 0, "message": { "role": "assistant", "content": "Python은 프로그래밍 언어입니다." }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": 10, "completion_tokens": 15, "total_tokens": 25 } } # ---- 단계별 추출 ---- content = api_response["choices"][0]["message"]["content"] tokens = api_response["usage"]["total_tokens"] print(f"답변: {content}") print(f"소비 token: {tokens}") # ---- 배치 API 결과 → DataFrame ---- results = [ {"question": "Q1", "answer": "A1", "tokens": 20}, {"question": "Q2", "answer": "A2", "tokens": 35}, {"question": "Q3", "answer": "A3", "tokens": 18}, ] df = pd.DataFrame(results) print(f"총 token 소비: {df['tokens'].sum()}") print(f"평균 token: {df['tokens'].mean():.1f}") # ---- json_normalize: 중첩 JSON 자동 평탄화 ---- nested_data = [ {"name": "철수", "info": {"city": "서울", "age": 25}}, {"name": "영희", "info": {"city": "부산", "age": 28}}, ] df = pd.json_normalize(nested_data) print(df) # name info.city info.age # 0 철수 서울 25 # 1 영희 부산 28
실제 시나리오 시뮬레이션: 당신은 AI 애플리케이션 개발자이며, 사용자와 AI의 대화 로그를 분석하여 보고서를 생성해야 합니다.
import pandas as pd import json from datetime import datetime # ---- Step 1: 시뮬레이션 데이터 로드 ---- chat_logs = [ {"user_id":"u1", "time":"2026-03-28 10:00", "role":"user", "content":"Python으로 CSV는 어떻게 읽나요?", "tokens":8}, {"user_id":"u1", "time":"2026-03-28 10:00", "role":"assistant", "content":"pandas의 read_csv를 사용하세요...", "tokens":120}, {"user_id":"u2", "time":"2026-03-28 11:30", "role":"user", "content":"크롤러 작성을 도와주세요", "tokens":6}, # ... 더 많은 데이터 ] df = pd.DataFrame(chat_logs) df["time"] = pd.to_datetime(df["time"]) # ---- Step 2: 데이터 정제 ---- df["content"] = df["content"].str.strip() df = df.dropna(subset=["content"]) df["hour"] = df["time"].dt.hour # ---- Step 3: 통계 분석 ---- # 각 사용자의 대화 횟수 user_stats = df[df["role"]=="user"].groupby("user_id").agg( 대화횟수=("content", "count"), 총token=("tokens", "sum") ).reset_index() # AI 응답의 token 통계 ai_stats = df[df["role"]=="assistant"]["tokens"].describe() # 시간대별 활성도 통계 hourly = df.groupby("hour").size() # ---- Step 4: 보고서 내보내기 ---- report = { "총 대화 수": len(df), "고유 사용자 수": df["user_id"].nunique(), "총 token 소비": int(df["tokens"].sum()), "평균 응답 token": round(float(ai_stats["mean"]), 1), "생성 시간": datetime.now().strftime("%Y-%m-%d %H:%M") } with open("chat_report.json", "w", encoding="utf-8") as f: json.dump(report, f, ensure_ascii=False, indent=2) user_stats.to_csv("user_stats.csv", index=False) print("보고서 생성 완료!")
아래 항목을 모두 완료하면, Phase 3으로 진입할 준비가 된 것입니다:
pd.read_csv() / read_excel() / read_json()으로 다양한 형식의 데이터를 읽을 수 있다데이터 처리의 핵심 능력을 갖추셨습니다! 다음으로 대규모 언어 모델 API를 본격적으로 다루게 되며, Phase 1과 Phase 2에서 배운 모든 기술이 API 호출, 반환값 처리, 데이터 파이프라인 구축에 활용됩니다.