A Hands-On Guide to Building MLOps - Enterprise AI Operations Strategy for 2026
1. MLOps Concepts and Why They Matter
1.1 What Is MLOps?
MLOps (Machine Learning Operations) is a methodology for automating and standardizing the development, deployment, and operation of machine learning models.
# MLOps maturity stages
class MLOpsMaturity:
    def __init__(self):
        self.stages = {
            "Level 0": "Manual Process",
            "Level 1": "ML Pipeline Automation",
            "Level 2": "CI/CD Pipeline Automation",
            "Level 3": "Advanced MLOps with Monitoring"
        }

    def assess_maturity(self, organization):
        # Assess an organization's MLOps maturity
        # (the check_* helpers and the scoring function are assumed to be implemented elsewhere)
        criteria = {
            "automation": self.check_automation_level(organization),
            "monitoring": self.check_monitoring_capabilities(organization),
            "governance": self.check_ml_governance(organization),
            "collaboration": self.check_team_collaboration(organization)
        }
        return self.calculate_maturity_score(criteria)
1.2 How Traditional ML Differs from MLOps
| Traditional ML | MLOps |
|---|---|
| One-off model development | Continuous model improvement |
| Manual deployment | Automated deployment pipelines |
| No tracking of model performance | Real-time monitoring |
| Hard-to-reproduce experiments | Experiment tracking and version control |
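The last row of this table is where most teams start. As a minimal illustration of what "experiment tracking and version control" buys you (the experiment name and values here are placeholders), a single MLflow-tracked run looks like this:

```python
# One tracked run: parameters and metrics become queryable and reproducible.
import mlflow

mlflow.set_experiment("demo_experiment")  # placeholder name
with mlflow.start_run():
    mlflow.log_param("n_estimators", 100)  # placeholder hyperparameter
    mlflow.log_metric("accuracy", 0.87)    # placeholder result
```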
2. Designing an MLOps Architecture
2.1 Core Component Structure
# mlops-architecture.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mlops-architecture
data:
  components: |
    # Data pipeline
    data_pipeline:
      ingestion: Apache Kafka, Apache Airflow
      preprocessing: Apache Spark, Dask
      storage: MinIO, AWS S3, Delta Lake
    # Model development environment
    development:
      experimentation: Jupyter, MLflow
      training: Kubernetes, Ray
      versioning: DVC, Git LFS
    # Model serving
    serving:
      inference: Seldon Core, KServe
      api_gateway: Istio, Ambassador
      monitoring: Prometheus, Grafana
    # Model management
    management:
      registry: MLflow Model Registry
      metadata: ML Metadata
      governance: Apache Atlas
2.2 Composing the MLOps Pipeline
# mlops_pipeline.py
import mlflow
import mlflow.sklearn
from airflow import DAG
from airflow.operators.python import PythonOperator  # the python_operator module path is deprecated in Airflow 2.x
from datetime import datetime, timedelta
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score

class MLOpsPipeline:
    def __init__(self, model_name, experiment_name):
        self.model_name = model_name
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)

    def data_validation(self, **context):
        """Validate data quality"""
        data = pd.read_csv("/data/input/training_data.csv")
        # Data quality checks (dtypes stringified so the result stays XCom-serializable)
        quality_checks = {
            "missing_values": data.isnull().sum().sum(),
            "duplicate_rows": data.duplicated().sum(),
            "data_shape": data.shape,
            "data_types": {col: str(dtype) for col, dtype in data.dtypes.items()}
        }
        # Enforce the quality threshold
        if quality_checks["missing_values"] > 1000:
            raise ValueError("Too many missing values")
        # Log data metrics to MLflow (only numeric values can be logged as metrics)
        with mlflow.start_run():
            mlflow.log_metrics({
                "missing_values": int(quality_checks["missing_values"]),
                "duplicate_rows": int(quality_checks["duplicate_rows"]),
                "row_count": data.shape[0],
                "column_count": data.shape[1]
            })
        return quality_checks

    def feature_engineering(self, **context):
        """Feature engineering"""
        data = pd.read_csv("/data/input/training_data.csv")
        # Create features
        data['feature_interaction'] = data['feature1'] * data['feature2']
        data['feature_ratio'] = data['feature1'] / (data['feature2'] + 1e-8)
        # Select features
        important_features = ['feature1', 'feature2', 'feature_interaction', 'feature_ratio']
        processed_data = data[important_features + ['target']]
        # Persist the processed data
        processed_data.to_csv("/data/processed/features.csv", index=False)
        return {"features_created": len(important_features)}

    def model_training(self, **context):
        """Train the model"""
        data = pd.read_csv("/data/processed/features.csv")
        X = data.drop('target', axis=1)
        y = data['target']
        with mlflow.start_run() as run:
            # Hyperparameters
            params = {
                "n_estimators": 100,
                "max_depth": 10,
                "random_state": 42
            }
            mlflow.log_params(params)
            # Train
            model = RandomForestClassifier(**params)
            model.fit(X, y)
            # Evaluate (on the training set; a held-out set is evaluated in model_validation)
            predictions = model.predict(X)
            metrics = {
                "accuracy": accuracy_score(y, predictions),
                "precision": precision_score(y, predictions, average='weighted'),
                "recall": recall_score(y, predictions, average='weighted')
            }
            mlflow.log_metrics(metrics)
            # Save the model
            mlflow.sklearn.log_model(model, "model")
            joblib.dump(model, "/models/trained_model.pkl")
            # Expose the run id via XCom so the deployment task can locate the logged model
            ti = context.get('task_instance')
            if ti is not None:
                ti.xcom_push(key='run_id', value=run.info.run_id)
            return metrics

    def model_validation(self, **context):
        """Validate the model"""
        model = joblib.load("/models/trained_model.pkl")
        validation_data = pd.read_csv("/data/validation/test_data.csv")
        X_val = validation_data.drop('target', axis=1)
        y_val = validation_data['target']
        # Evaluate model performance
        predictions = model.predict(X_val)
        validation_metrics = {
            "val_accuracy": accuracy_score(y_val, predictions),
            "val_precision": precision_score(y_val, predictions, average='weighted'),
            "val_recall": recall_score(y_val, predictions, average='weighted')
        }
        # Enforce the performance threshold
        if validation_metrics["val_accuracy"] < 0.8:
            raise ValueError("Model performance below threshold")
        with mlflow.start_run():
            mlflow.log_metrics(validation_metrics)
        return validation_metrics

    def model_deployment(self, **context):
        """Deploy the model"""
        # Register the model in the MLflow Model Registry; the run id was
        # pushed to XCom by the training task
        run_id = context['task_instance'].xcom_pull(task_ids='model_training', key='run_id')
        model_uri = f"runs:/{run_id}/model"
        model_version = mlflow.register_model(
            model_uri=model_uri,
            name=self.model_name
        )
        # Promote to Production
        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            name=self.model_name,
            version=model_version.version,
            stage="Production"
        )
        return {"model_version": model_version.version}

# Airflow DAG definition
default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'mlops_pipeline',
    default_args=default_args,
    description='MLOps training and deployment pipeline',
    schedule_interval='@daily',
    catchup=False
)

pipeline = MLOpsPipeline("production_model", "model_experiment")

# Task definitions
data_validation_task = PythonOperator(
    task_id='data_validation',
    python_callable=pipeline.data_validation,
    dag=dag
)

feature_engineering_task = PythonOperator(
    task_id='feature_engineering',
    python_callable=pipeline.feature_engineering,
    dag=dag
)

model_training_task = PythonOperator(
    task_id='model_training',
    python_callable=pipeline.model_training,
    dag=dag
)

model_validation_task = PythonOperator(
    task_id='model_validation',
    python_callable=pipeline.model_validation,
    dag=dag
)

model_deployment_task = PythonOperator(
    task_id='model_deployment',
    python_callable=pipeline.model_deployment,
    dag=dag
)

# Task dependencies
data_validation_task >> feature_engineering_task >> model_training_task >> model_validation_task >> model_deployment_task
3. Experiment Management with MLflow
3.1 Setting Up and Configuring MLflow
# mlflow_setup.py
import mlflow
import mlflow.tracking
from mlflow.tracking.client import MlflowClient
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix)
import os

class MLflowManager:
    def __init__(self, tracking_uri="http://localhost:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def setup_experiment(self, experiment_name, artifact_location=None):
        """Set up the experiment environment"""
        try:
            experiment_id = mlflow.create_experiment(
                name=experiment_name,
                artifact_location=artifact_location
            )
        except mlflow.exceptions.MlflowException:
            experiment = mlflow.get_experiment_by_name(experiment_name)
            experiment_id = experiment.experiment_id
        mlflow.set_experiment(experiment_name)
        return experiment_id

    def log_model_comparison(self, models_dict, test_data):
        """Compare the performance of several models"""
        X_test, y_test = test_data
        comparison_results = []
        for model_name, model in models_dict.items():
            with mlflow.start_run(run_name=f"{model_name}_comparison"):
                # Predict
                predictions = model.predict(X_test)
                # Compute metrics
                metrics = {
                    "accuracy": accuracy_score(y_test, predictions),
                    "precision": precision_score(y_test, predictions, average='weighted'),
                    "recall": recall_score(y_test, predictions, average='weighted'),
                    "f1_score": f1_score(y_test, predictions, average='weighted')
                }
                # Log to MLflow
                mlflow.log_params(model.get_params())
                mlflow.log_metrics(metrics)
                mlflow.sklearn.log_model(model, f"{model_name}_model")
                # Save the confusion matrix
                cm = confusion_matrix(y_test, predictions)
                self.save_confusion_matrix(cm, model_name)
                comparison_results.append({
                    "model": model_name,
                    "metrics": metrics,
                    "run_id": mlflow.active_run().info.run_id
                })
        return comparison_results

    def save_confusion_matrix(self, cm, model_name):
        """Save a confusion-matrix visualization"""
        import matplotlib.pyplot as plt
        import seaborn as sns
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'Confusion Matrix - {model_name}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        # Store it as an MLflow artifact
        plt.savefig(f"confusion_matrix_{model_name}.png")
        mlflow.log_artifact(f"confusion_matrix_{model_name}.png")
        plt.close()

    def auto_hyperparameter_tuning(self, model_class, param_grid, X_train, y_train, X_val, y_val):
        """Automated hyperparameter tuning"""
        from sklearn.model_selection import ParameterGrid
        best_score = 0
        best_params = None
        best_model = None
        for params in ParameterGrid(param_grid):
            with mlflow.start_run():
                # Train
                model = model_class(**params)
                model.fit(X_train, y_train)
                # Evaluate on the validation set
                val_predictions = model.predict(X_val)
                val_score = accuracy_score(y_val, val_predictions)
                # Log to MLflow
                mlflow.log_params(params)
                mlflow.log_metric("validation_accuracy", val_score)
                mlflow.sklearn.log_model(model, "model")
                # Track the best-performing model
                if val_score > best_score:
                    best_score = val_score
                    best_params = params
                    best_model = model
        return best_model, best_params, best_score

# MLflow usage example
mlflow_manager = MLflowManager()
experiment_id = mlflow_manager.setup_experiment("production_experiments")

# Run a model comparison
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

models = {
    "RandomForest": RandomForestClassifier(n_estimators=100),
    "GradientBoosting": GradientBoostingClassifier(n_estimators=100),
    "LogisticRegression": LogisticRegression()
}
# comparison_results = mlflow_manager.log_model_comparison(models, (X_test, y_test))
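The comparison call is commented out above because `X_test`/`y_test` are not defined in this file. A self-contained way to exercise it, assuming the tracking server configured in `MLflowManager` is reachable, is to fit the models on synthetic data first:

```python
# Hypothetical smoke test for log_model_comparison with synthetic data.
import numpy as np
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
X = rng.normal(size=(1000, 4))
y = (X[:, 0] + X[:, 1] > 0).astype(int)  # simple separable target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

for model in models.values():
    model.fit(X_train, y_train)

comparison_results = mlflow_manager.log_model_comparison(models, (X_test, y_test))
print(comparison_results)
```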
3.2 Managing the Model Registry
# model_registry.py
import mlflow
from mlflow.tracking import MlflowClient

class ModelRegistry:
    def __init__(self):
        self.client = MlflowClient()

    def register_model(self, model_uri, model_name, description=""):
        """Register a model in the registry"""
        model_version = mlflow.register_model(
            model_uri=model_uri,
            name=model_name
        )
        # Attach a description to the model version
        self.client.update_model_version(
            name=model_name,
            version=model_version.version,
            description=description
        )
        return model_version

    def promote_model(self, model_name, version, stage):
        """Promote a model to a given stage"""
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage
        )

    def compare_model_versions(self, model_name, version1, version2):
        """Compare performance across model versions"""
        # Fetch metadata for each version
        mv1 = self.client.get_model_version(model_name, version1)
        mv2 = self.client.get_model_version(model_name, version2)
        # Fetch the underlying runs
        run1 = self.client.get_run(mv1.run_id)
        run2 = self.client.get_run(mv2.run_id)
        comparison = {
            "version1": {
                "version": version1,
                "metrics": run1.data.metrics,
                "params": run1.data.params
            },
            "version2": {
                "version": version2,
                "metrics": run2.data.metrics,
                "params": run2.data.params
            }
        }
        return comparison

    def get_best_model(self, model_name, metric_name="accuracy"):
        """Find the best-performing model version"""
        versions = self.client.search_model_versions(f"name='{model_name}'")
        best_version = None
        best_score = 0
        for version in versions:
            run = self.client.get_run(version.run_id)
            if metric_name in run.data.metrics:
                score = run.data.metrics[metric_name]
                if score > best_score:
                    best_score = score
                    best_version = version
        return best_version
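A typical consumer of this registry promotes the best version and then loads whatever currently sits in the Production stage via MLflow's `models:/` URI scheme. A short sketch, assuming a model named "production_model" already has registered versions:

```python
# Hypothetical usage: promote the best version, then load it for serving.
registry = ModelRegistry()

best = registry.get_best_model("production_model", metric_name="accuracy")
if best is not None:
    registry.promote_model("production_model", best.version, "Production")

# models:/<name>/<stage> resolves to the latest version in that stage
model = mlflow.pyfunc.load_model("models:/production_model/Production")
# predictions = model.predict(input_dataframe)
```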
4. Model Serving on Kubernetes
4.1 Deploying Models with KServe
# model-serving.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-model-serving
  namespace: production
spec:
  predictor:
    sklearn:
      storageUri: s3://mlops-models/sklearn-model/
      resources:
        requests:
          cpu: "0.1"
          memory: "512Mi"
        limits:
          cpu: "1"
          memory: "1Gi"
  transformer:
    # v1beta1 expresses custom transformers as a plain container list
    containers:
      - name: kserve-container
        image: "myregistry.com/feature-transformer:latest"
        env:
          - name: STORAGE_URI
            value: "s3://mlops-models/transformers/"
---
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: pytorch-model-serving
  namespace: production
spec:
  predictor:
    pytorch:
      storageUri: s3://mlops-models/pytorch-model/
      resources:
        requests:
          cpu: "0.5"
          memory: "1Gi"
          nvidia.com/gpu: "1"
        limits:
          cpu: "2"
          memory: "4Gi"
          nvidia.com/gpu: "1"
      env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0"
4.2 Implementing a Custom Prediction Server
# custom_predictor.py
import joblib
import numpy as np
import pandas as pd
from typing import Dict, List
import json
import logging

class CustomPredictor:
    def __init__(self, model_path: str):
        self.model = joblib.load(model_path)
        self.feature_names = None

    def preprocess(self, inputs: Dict) -> np.ndarray:
        """Preprocess the input data"""
        if isinstance(inputs, dict):
            df = pd.DataFrame([inputs])
        else:
            df = pd.DataFrame(inputs)
        # Feature engineering (must mirror the training pipeline)
        df['feature_interaction'] = df['feature1'] * df['feature2']
        df['feature_ratio'] = df['feature1'] / (df['feature2'] + 1e-8)
        # Keep only the required columns
        required_features = ['feature1', 'feature2', 'feature_interaction', 'feature_ratio']
        processed_df = df[required_features]
        return processed_df.values

    def predict(self, inputs: Dict) -> Dict:
        """Run model inference"""
        try:
            # Preprocess
            processed_inputs = self.preprocess(inputs)
            # Predict
            predictions = self.model.predict(processed_inputs)
            probabilities = self.model.predict_proba(processed_inputs)
            # Format the response
            results = {
                "predictions": predictions.tolist(),
                "probabilities": probabilities.tolist(),
                "model_version": "1.0.0",
                "status": "success"
            }
            return results
        except Exception as e:
            logging.error(f"Prediction error: {str(e)}")
            return {
                "error": str(e),
                "status": "error"
            }

    def explain(self, inputs: Dict) -> Dict:
        """Explain a model prediction"""
        try:
            # SHAP-based explanation
            import shap
            processed_inputs = self.preprocess(inputs)
            explainer = shap.TreeExplainer(self.model)
            shap_values = explainer.shap_values(processed_inputs)
            # For classifiers shap_values is a list with one array per class;
            # use the positive class for the explanation
            if isinstance(shap_values, list):
                shap_values = shap_values[1]
            explanation = {
                "shap_values": shap_values.tolist(),
                "feature_importance": dict(zip(
                    ['feature1', 'feature2', 'feature_interaction', 'feature_ratio'],
                    shap_values[0].tolist()
                )),
                "base_value": np.ravel(explainer.expected_value).tolist()
            }
            return explanation
        except Exception as e:
            return {"error": f"Explanation error: {str(e)}"}

# Flask API server
from flask import Flask, request, jsonify
import time

app = Flask(__name__)
predictor = CustomPredictor("/models/trained_model.pkl")

# Serving metrics
class ModelMetrics:
    def __init__(self):
        self.request_count = 0
        self.error_count = 0
        self.response_times = []

    def record_request(self, response_time: float, error: bool = False):
        self.request_count += 1
        self.response_times.append(response_time)
        if error:
            self.error_count += 1

    def get_metrics(self):
        return {
            "total_requests": self.request_count,
            "error_rate": self.error_count / max(self.request_count, 1),
            "avg_response_time": np.mean(self.response_times) if self.response_times else 0,
            "p95_response_time": np.percentile(self.response_times, 95) if self.response_times else 0
        }

metrics = ModelMetrics()

@app.route('/predict', methods=['POST'])
def predict():
    start_time = time.time()
    try:
        data = request.get_json()
        result = predictor.predict(data)
        response_time = time.time() - start_time
        metrics.record_request(response_time, "error" in result)
        return jsonify(result)
    except Exception as e:
        response_time = time.time() - start_time
        metrics.record_request(response_time, True)
        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500

@app.route('/explain', methods=['POST'])
def explain():
    try:
        data = request.get_json()
        explanation = predictor.explain(data)
        return jsonify(explanation)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/health', methods=['GET'])
def health():
    return jsonify({
        "status": "healthy",
        "model_loaded": predictor.model is not None,
        "timestamp": time.time()
    })

@app.route('/metrics', methods=['GET'])
def get_metrics():
    return jsonify(metrics.get_metrics())

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
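With the server running, the endpoints can be exercised with any HTTP client. A quick local smoke test, assuming the server is listening on port 8080 as configured above:

```python
# Hypothetical client for the Flask serving API above.
import requests

BASE = "http://localhost:8080"

resp = requests.post(f"{BASE}/predict", json={"feature1": 0.5, "feature2": -0.3}, timeout=5)
print(resp.json())  # {"predictions": [...], "probabilities": [...], "status": "success", ...}

print(requests.get(f"{BASE}/health", timeout=5).json())
print(requests.get(f"{BASE}/metrics", timeout=5).json())
```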
5. Model Monitoring System
5.1 Detecting Data Drift
# drift_detection.py
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import accuracy_score
import warnings
from typing import Dict, List, Tuple
import logging

class DriftDetector:
    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.feature_stats = self._compute_reference_stats()

    def _compute_reference_stats(self) -> Dict:
        """Compute summary statistics for the reference data"""
        summary = {}
        for column in self.reference_data.columns:
            if self.reference_data[column].dtype in ['int64', 'float64']:
                summary[column] = {
                    'mean': self.reference_data[column].mean(),
                    'std': self.reference_data[column].std(),
                    'min': self.reference_data[column].min(),
                    'max': self.reference_data[column].max(),
                    'quantiles': self.reference_data[column].quantile([0.25, 0.5, 0.75]).to_dict()
                }
            else:
                summary[column] = {
                    'value_counts': self.reference_data[column].value_counts().to_dict()
                }
        return summary

    def detect_covariate_shift(self, new_data: pd.DataFrame) -> Dict:
        """Detect covariate shift"""
        drift_results = {}
        for column in self.reference_data.columns:
            if column in new_data.columns:
                if self.reference_data[column].dtype in ['int64', 'float64']:
                    # Numeric variables: two-sample KS test
                    ks_stat, p_value = stats.ks_2samp(
                        self.reference_data[column].dropna(),
                        new_data[column].dropna()
                    )
                    drift_results[column] = {
                        'test': 'kolmogorov_smirnov',
                        'statistic': ks_stat,
                        'p_value': p_value,
                        'drift_detected': p_value < self.threshold,
                        'severity': self._calculate_severity(p_value)
                    }
                else:
                    # Categorical variables: chi-square test
                    ref_counts = self.reference_data[column].value_counts()
                    new_counts = new_data[column].value_counts()
                    # Consider only categories present in both samples
                    common_categories = set(ref_counts.index) & set(new_counts.index)
                    if len(common_categories) > 1:
                        ref_aligned = [ref_counts.get(cat, 0) for cat in common_categories]
                        new_aligned = [new_counts.get(cat, 0) for cat in common_categories]
                        # Scale the reference counts so the expected frequencies
                        # sum to the observed total, as chisquare requires
                        scale = sum(new_aligned) / sum(ref_aligned)
                        expected = [count * scale for count in ref_aligned]
                        chi2_stat, p_value = stats.chisquare(new_aligned, expected)
                        drift_results[column] = {
                            'test': 'chi_square',
                            'statistic': chi2_stat,
                            'p_value': p_value,
                            'drift_detected': p_value < self.threshold,
                            'severity': self._calculate_severity(p_value)
                        }
        return drift_results

    def detect_concept_drift(self, new_data: pd.DataFrame, new_labels: pd.Series,
                             model) -> Dict:
        """Detect concept drift"""
        # Compare model performance on reference vs. new data
        ref_predictions = model.predict(self.reference_data.drop('target', axis=1, errors='ignore'))
        new_predictions = model.predict(new_data.drop('target', axis=1, errors='ignore'))
        ref_accuracy = accuracy_score(
            self.reference_data.get('target', ref_predictions),
            ref_predictions
        )
        new_accuracy = accuracy_score(new_labels, new_predictions)
        performance_drop = ref_accuracy - new_accuracy
        return {
            'reference_accuracy': ref_accuracy,
            'current_accuracy': new_accuracy,
            'performance_drop': performance_drop,
            'concept_drift_detected': performance_drop > 0.05,  # 5% performance-drop threshold
            'severity': 'high' if performance_drop > 0.1 else 'medium' if performance_drop > 0.05 else 'low'
        }

    def _calculate_severity(self, p_value: float) -> str:
        """Grade drift severity"""
        if p_value < 0.001:
            return 'high'
        elif p_value < 0.01:
            return 'medium'
        elif p_value < self.threshold:
            return 'low'
        else:
            return 'none'

    def generate_drift_report(self, new_data: pd.DataFrame,
                              new_labels: pd.Series = None, model=None) -> Dict:
        """Generate a combined drift report"""
        report = {
            'timestamp': pd.Timestamp.now().isoformat(),
            'data_size': len(new_data),
            'covariate_shift': self.detect_covariate_shift(new_data)
        }
        if new_labels is not None and model is not None:
            report['concept_drift'] = self.detect_concept_drift(new_data, new_labels, model)
        # Overall drift score: fraction of features flagged as drifting
        drift_scores = [
            result['drift_detected'] for result in report['covariate_shift'].values()
        ]
        report['overall_drift_score'] = sum(drift_scores) / len(drift_scores) if drift_scores else 0
        return report

# Real-time monitoring system
class RealTimeMonitor:
    def __init__(self, model, reference_data: pd.DataFrame):
        self.model = model
        self.drift_detector = DriftDetector(reference_data)
        self.prediction_buffer = []
        self.alert_thresholds = {
            'drift_score': 0.3,
            'performance_drop': 0.05,
            'error_rate': 0.1
        }

    def log_prediction(self, input_data: Dict, prediction: float,
                       actual: float = None, response_time: float = None):
        """Record a prediction log entry"""
        log_entry = {
            'timestamp': pd.Timestamp.now(),
            'input_data': input_data,
            'prediction': prediction,
            'actual': actual,
            'response_time': response_time
        }
        self.prediction_buffer.append(log_entry)
        # Cap the buffer size
        if len(self.prediction_buffer) > 10000:
            self.prediction_buffer = self.prediction_buffer[-5000:]
        # Periodic monitoring
        if len(self.prediction_buffer) % 100 == 0:
            self._check_model_health()

    def _check_model_health(self):
        """Model health check"""
        recent_data = pd.DataFrame([
            entry['input_data'] for entry in self.prediction_buffer[-100:]
        ])
        # Drift detection
        drift_report = self.drift_detector.generate_drift_report(recent_data)
        # Raise an alert if needed
        if drift_report['overall_drift_score'] > self.alert_thresholds['drift_score']:
            self._send_alert('drift_detected', drift_report)
        # Check for performance degradation
        recent_predictions = [entry for entry in self.prediction_buffer[-100:]
                              if entry['actual'] is not None]
        if len(recent_predictions) > 10:
            actuals = [entry['actual'] for entry in recent_predictions]
            predictions = [entry['prediction'] for entry in recent_predictions]
            current_accuracy = accuracy_score(actuals, predictions)
            if current_accuracy < 0.8:  # performance threshold
                self._send_alert('performance_degradation', {
                    'current_accuracy': current_accuracy,
                    'samples_evaluated': len(recent_predictions)
                })

    def _send_alert(self, alert_type: str, details: Dict):
        """Send an alert"""
        alert_message = {
            'alert_type': alert_type,
            'timestamp': pd.Timestamp.now().isoformat(),
            'details': details,
            'severity': 'high' if alert_type == 'performance_degradation' else 'medium'
        }
        logging.warning(f"MLOps Alert: {alert_message}")
        # In production, route this to Slack, PagerDuty, etc.
        # self.slack_client.send_message(f"🚨 MLOps Alert: {alert_type}")
        # self.pagerduty_client.create_incident(alert_message)
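To see the detector in action, here is a minimal sketch with synthetic data: one feature is shifted and one is left alone, so the KS test should flag exactly the shifted column (the import path is an assumption).

```python
# Hypothetical smoke test for DriftDetector with synthetic data.
import numpy as np
import pandas as pd
from drift_detection import DriftDetector  # assumes the module is on PYTHONPATH

rng = np.random.default_rng(0)
reference = pd.DataFrame({
    "feature1": rng.normal(0, 1, 1000),
    "feature2": rng.normal(0, 1, 1000),
})
shifted = pd.DataFrame({
    "feature1": rng.normal(1.5, 1, 500),  # mean shift -> should be flagged
    "feature2": rng.normal(0, 1, 500),    # unchanged
})

detector = DriftDetector(reference)
report = detector.generate_drift_report(shifted)
print(report["overall_drift_score"])          # typically 0.5: one of two features drifted
print(report["covariate_shift"]["feature1"])  # KS test details for the shifted feature
```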
5.2 A Model Performance Dashboard
# monitoring_dashboard.py
import streamlit as st
import plotly.graph_objects as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import mlflow
from mlflow.tracking import MlflowClient

class MLOpsDashboardApp:
    def __init__(self):
        self.client = MlflowClient()
        st.set_page_config(
            page_title="MLOps Monitoring Dashboard",
            page_icon="🚀",
            layout="wide"
        )

    def run(self):
        st.title("🚀 MLOps Monitoring Dashboard")
        # Sidebar settings
        st.sidebar.header("Settings")
        # Model selection
        models = [mv.name for mv in self.client.search_registered_models()]
        selected_model = st.sidebar.selectbox("Select model", models)
        # Time-range selection
        time_range = st.sidebar.selectbox(
            "Time range",
            ["Last 1 hour", "Last 24 hours", "Last 7 days", "Last 30 days"]
        )
        # Main dashboard
        col1, col2, col3 = st.columns(3)
        with col1:
            self._show_model_performance_metrics(selected_model, time_range)
        with col2:
            self._show_prediction_volume_chart(selected_model, time_range)
        with col3:
            self._show_error_rate_chart(selected_model, time_range)
        # Drift monitoring
        st.header("📊 Drift Monitoring")
        col4, col5 = st.columns(2)
        with col4:
            self._show_feature_drift_chart(selected_model, time_range)
        with col5:
            self._show_prediction_distribution(selected_model, time_range)
        # Model version comparison
        st.header("🔄 Model Version Comparison")
        self._show_model_version_comparison(selected_model)
        # Alerts and events
        st.header("🚨 Recent Alerts and Events")
        self._show_recent_alerts()

    def _show_model_performance_metrics(self, model_name: str, time_range: str):
        """Display model performance metrics"""
        st.subheader("Model Performance")
        # Dummy data (in practice, pulled from the monitoring system)
        metrics = {
            "Accuracy": 0.87,
            "Precision": 0.89,
            "Recall": 0.85,
            "F1 Score": 0.87
        }
        for metric, value in metrics.items():
            st.metric(
                label=metric,
                value=f"{value:.3f}",
                delta=f"{np.random.uniform(-0.02, 0.02):.3f}"
            )

    def _show_prediction_volume_chart(self, model_name: str, time_range: str):
        """Prediction request volume chart"""
        st.subheader("Prediction Request Volume")
        # Generate dummy time-series data
        dates = pd.date_range(end=datetime.now(), periods=24, freq='H')
        volumes = np.random.poisson(100, 24) + np.random.randint(50, 150, 24)
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=dates,
            y=volumes,
            mode='lines+markers',
            name='Prediction requests',
            line=dict(color='blue')
        ))
        fig.update_layout(
            title="Prediction Requests per Hour",
            xaxis_title="Time",
            yaxis_title="Requests",
            height=300
        )
        st.plotly_chart(fig, use_container_width=True)

    def _show_error_rate_chart(self, model_name: str, time_range: str):
        """Error rate chart"""
        st.subheader("Error Rate")
        # Dummy error-rate data
        dates = pd.date_range(end=datetime.now(), periods=24, freq='H')
        error_rates = np.random.uniform(0.01, 0.05, 24)
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=dates,
            y=error_rates * 100,
            mode='lines+markers',
            name='Error rate (%)',
            line=dict(color='red')
        ))
        # Threshold line
        fig.add_hline(y=3, line_dash="dash", line_color="orange",
                      annotation_text="Threshold (3%)")
        fig.update_layout(
            title="Error Rate per Hour",
            xaxis_title="Time",
            yaxis_title="Error rate (%)",
            height=300
        )
        st.plotly_chart(fig, use_container_width=True)

    def _show_feature_drift_chart(self, model_name: str, time_range: str):
        """Feature drift chart"""
        st.subheader("Feature Drift Scores")
        # Dummy drift data
        features = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5']
        drift_scores = np.random.uniform(0, 0.3, len(features))
        fig = go.Figure(data=[
            go.Bar(x=features, y=drift_scores,
                   marker_color=['red' if score > 0.2 else 'orange' if score > 0.1 else 'green'
                                 for score in drift_scores])
        ])
        fig.add_hline(y=0.1, line_dash="dash", line_color="orange",
                      annotation_text="Caution (0.1)")
        fig.add_hline(y=0.2, line_dash="dash", line_color="red",
                      annotation_text="Warning (0.2)")
        fig.update_layout(
            title="Drift Score per Feature",
            xaxis_title="Feature",
            yaxis_title="Drift score",
            height=300
        )
        st.plotly_chart(fig, use_container_width=True)

    def _show_prediction_distribution(self, model_name: str, time_range: str):
        """Prediction distribution chart"""
        st.subheader("Prediction Distribution")
        # Dummy prediction distributions
        reference_predictions = np.random.normal(0.5, 0.2, 1000)
        current_predictions = np.random.normal(0.55, 0.25, 1000)
        fig = go.Figure()
        fig.add_trace(go.Histogram(
            x=reference_predictions,
            name='Reference period',
            opacity=0.7,
            nbinsx=30
        ))
        fig.add_trace(go.Histogram(
            x=current_predictions,
            name='Current period',
            opacity=0.7,
            nbinsx=30
        ))
        fig.update_layout(
            title="Prediction Value Distributions",
            xaxis_title="Predicted value",
            yaxis_title="Frequency",
            barmode='overlay',
            height=300
        )
        st.plotly_chart(fig, use_container_width=True)

    def _show_model_version_comparison(self, model_name: str):
        """Model version comparison"""
        # Dummy version data
        versions_data = {
            'Version': ['v1.0', 'v1.1', 'v1.2', 'v2.0'],
            'Accuracy': [0.85, 0.87, 0.88, 0.89],
            'Precision': [0.83, 0.86, 0.87, 0.88],
            'Recall': [0.87, 0.88, 0.89, 0.90],
            'Training Date': ['2026-01-01', '2026-01-05', '2026-01-10', '2026-01-15']
        }
        df = pd.DataFrame(versions_data)
        col1, col2 = st.columns(2)
        with col1:
            # Metric comparison chart
            fig = go.Figure()
            for metric in ['Accuracy', 'Precision', 'Recall']:
                fig.add_trace(go.Scatter(
                    x=df['Version'],
                    y=df[metric],
                    mode='lines+markers',
                    name=metric
                ))
            fig.update_layout(
                title="Performance Metrics by Version",
                xaxis_title="Model version",
                yaxis_title="Score",
                height=300
            )
            st.plotly_chart(fig, use_container_width=True)
        with col2:
            # Version info table
            st.dataframe(df, use_container_width=True)

    def _show_recent_alerts(self):
        """Display recent alerts"""
        # Dummy alert data
        alerts_data = {
            'Time': ['2026-01-24 14:30', '2026-01-24 12:15', '2026-01-24 09:45'],
            'Type': ['Performance degradation', 'Data drift', 'High error rate'],
            'Severity': ['High', 'Medium', 'Low'],
            'Message': [
                'Model accuracy dropped below the threshold',
                'Significant drift detected in feature1',
                'Error rate is higher than usual'
            ]
        }
        alerts_df = pd.DataFrame(alerts_data)
        # Color rows by severity
        def highlight_severity(row):
            if row.Severity == 'High':
                return ['background-color: #ffcccc'] * len(row)
            elif row.Severity == 'Medium':
                return ['background-color: #fff2cc'] * len(row)
            else:
                return ['background-color: #ccffcc'] * len(row)
        st.dataframe(
            alerts_df.style.apply(highlight_severity, axis=1),
            use_container_width=True
        )

# Run the Streamlit app
if __name__ == "__main__":
    app = MLOpsDashboardApp()
    app.run()
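To launch the dashboard locally, run `streamlit run monitoring_dashboard.py`; Streamlit executes the script with `__name__ == "__main__"`, so the guard above still fires.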
6. Building the CI/CD Pipeline
6.1 An MLOps Pipeline with GitHub Actions
# .github/workflows/mlops-pipeline.yml
name: MLOps Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'data/**'
      - 'src/**'
  pull_request:
    branches: [main]
  schedule:
    # Retrain the model daily at 02:00
    - cron: '0 2 * * *'

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  KUBERNETES_NAMESPACE: production

jobs:
  data-validation:
    runs-on: ubuntu-latest
    outputs:
      data-valid: ${{ steps.validate.outputs.valid }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Validate data
        id: validate
        run: |
          # Emit an explicit true/false flag; writing "$?" would store the
          # numeric exit code, which the downstream 'true' check never matches
          if python scripts/validate_data.py; then
            echo "valid=true" >> "$GITHUB_OUTPUT"
          else
            echo "valid=false" >> "$GITHUB_OUTPUT"
          fi
      - name: Upload data validation report
        uses: actions/upload-artifact@v3
        with:
          name: data-validation-report
          path: reports/data_validation.json

  model-training:
    needs: data-validation
    if: needs.data-validation.outputs.data-valid == 'true'
    runs-on: ubuntu-latest
    outputs:
      model-id: ${{ steps.training.outputs.model-id }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Train model
        id: training
        run: |
          python scripts/train_model.py
          echo "model-id=$(cat model_id.txt)" >> "$GITHUB_OUTPUT"
      - name: Upload model artifacts
        uses: actions/upload-artifact@v3
        with:
          name: trained-model
          path: models/

  model-evaluation:
    needs: model-training
    runs-on: ubuntu-latest
    outputs:
      performance-score: ${{ steps.evaluate.outputs.score }}
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Download model
        uses: actions/download-artifact@v3
        with:
          name: trained-model
          path: models/
      - name: Evaluate model
        id: evaluate
        run: |
          python scripts/evaluate_model.py
          echo "score=$(cat evaluation_score.txt)" >> "$GITHUB_OUTPUT"
      - name: Generate evaluation report
        run: |
          python scripts/generate_report.py
      - name: Upload evaluation report
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-report
          path: reports/

  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run security scan
        uses: securecodewarrior/github-action-add-sarif@v1
        with:
          sarif-file: security-report.sarif
      - name: Scan dependencies
        run: |
          pip install safety
          safety check --json --output safety-report.json

  model-deployment:
    # model-training is listed so its outputs are visible in the needs context
    needs: [model-training, model-evaluation, security-scan]
    # fromJSON forces a numeric comparison; a bare string compare would be lexical
    if: fromJSON(needs.model-evaluation.outputs.performance-score) > 0.85
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-west-2
      - name: Configure kubectl
        run: |
          aws eks update-kubeconfig --name production-cluster
      - name: Download model
        uses: actions/download-artifact@v3
        with:
          name: trained-model
          path: models/
      - name: Build and push Docker image
        run: |
          docker build -t ${{ secrets.ECR_REGISTRY }}/ml-model:${{ github.sha }} .
          docker push ${{ secrets.ECR_REGISTRY }}/ml-model:${{ github.sha }}
      - name: Deploy to Kubernetes
        run: |
          envsubst < k8s/deployment.yaml | kubectl apply -f -
        env:
          IMAGE_TAG: ${{ github.sha }}
          MODEL_ID: ${{ needs.model-training.outputs.model-id }}
      - name: Wait for deployment
        run: |
          kubectl rollout status deployment/ml-model-service -n $KUBERNETES_NAMESPACE
      - name: Run smoke tests
        run: |
          python scripts/smoke_tests.py --endpoint https://api.production.com/predict

  notification:
    needs: [model-training, model-evaluation, model-deployment]
    if: always()
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          status: ${{ job.status }}
          text: |
            MLOps Pipeline ${{ job.status }}!
            Model ID: ${{ needs.model-training.outputs.model-id }}
            Performance: ${{ needs.model-evaluation.outputs.performance-score }}
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
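The data-validation job assumes a `scripts/validate_data.py` that exits non-zero on failure and writes the JSON report picked up by the artifact step. A hedged sketch of what that contract might look like (the data path and thresholds are assumptions):

```python
# scripts/validate_data.py - hypothetical sketch matching the workflow contract:
# exit 0/1 for pass/fail and write reports/data_validation.json.
import json
import sys
from pathlib import Path

import pandas as pd

DATA_PATH = "data/input/training_data.csv"  # assumption
REPORT_PATH = Path("reports/data_validation.json")

def main() -> int:
    data = pd.read_csv(DATA_PATH)
    checks = {
        "rows": int(len(data)),
        "missing_values": int(data.isnull().sum().sum()),
        "duplicate_rows": int(data.duplicated().sum()),
    }
    checks["passed"] = checks["rows"] > 0 and checks["missing_values"] <= 1000
    REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)
    REPORT_PATH.write_text(json.dumps(checks, indent=2))
    return 0 if checks["passed"] else 1

if __name__ == "__main__":
    sys.exit(main())
```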
6.2 Test Automation
# tests/test_model_pipeline.py
import pytest
import pandas as pd
import numpy as np
import joblib
from unittest.mock import Mock, patch
import tempfile
import os

from src.mlops_pipeline import MLOpsPipeline
from src.drift_detection import DriftDetector

class TestMLOpsPipeline:
    @pytest.fixture
    def sample_data(self):
        """Generate sample data for the tests"""
        np.random.seed(42)
        data = {
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'target': np.random.randint(0, 2, 1000)
        }
        return pd.DataFrame(data)

    @pytest.fixture
    def pipeline(self):
        """MLOps pipeline instance"""
        return MLOpsPipeline("test_model", "test_experiment")

    def test_data_validation_success(self, pipeline, sample_data):
        """Data validation happy path"""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
            sample_data.to_csv(f.name, index=False)
        with patch('pandas.read_csv', return_value=sample_data), \
             patch('mlflow.start_run'), patch('mlflow.log_metrics'):
            result = pipeline.data_validation()
            assert result['missing_values'] == 0
            assert result['duplicate_rows'] == 0
            assert result['data_shape'] == (1000, 3)
        os.unlink(f.name)

    def test_data_validation_failure(self, pipeline):
        """Data validation failure path"""
        # Bad data with too many missing values
        bad_data = pd.DataFrame({
            'feature1': [np.nan] * 1500,  # exceeds the threshold
            'feature2': [1, 2, 3] * 500,
            'target': [0, 1] * 750
        })
        with patch('pandas.read_csv', return_value=bad_data):
            with pytest.raises(ValueError, match="Too many missing values"):
                pipeline.data_validation()

    def test_feature_engineering(self, pipeline, sample_data):
        """Feature engineering test"""
        with patch('pandas.read_csv', return_value=sample_data), \
             patch.object(pd.DataFrame, 'to_csv'):
            result = pipeline.feature_engineering()
            assert result['features_created'] == 4
            # Expected columns in the processed file:
            # ['feature1', 'feature2', 'feature_interaction', 'feature_ratio', 'target']

    def test_model_training_performance(self, pipeline, sample_data):
        """Model training performance test"""
        with patch('pandas.read_csv', return_value=sample_data):
            with patch('mlflow.start_run'):
                with patch('mlflow.log_params'):
                    with patch('mlflow.log_metrics'):
                        with patch('mlflow.sklearn.log_model'), patch('joblib.dump'):
                            result = pipeline.model_training()
                            # Check the performance metrics
                            assert 'accuracy' in result
                            assert 'precision' in result
                            assert 'recall' in result
                            assert 0 <= result['accuracy'] <= 1

    def test_model_validation_threshold(self, pipeline):
        """Model validation threshold test"""
        # Simulate a poorly performing model
        mock_model = Mock()
        mock_model.predict.return_value = np.array([0] * 100)  # predict 0 for everything
        validation_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 100),
            'feature2': np.random.normal(0, 1, 100),
            'target': np.array([1] * 100)  # true labels are all 1
        })
        with patch('joblib.load', return_value=mock_model):
            with patch('pandas.read_csv', return_value=validation_data):
                with pytest.raises(ValueError, match="Model performance below threshold"):
                    pipeline.model_validation()

class TestDriftDetection:
    @pytest.fixture
    def reference_data(self):
        """Reference data"""
        np.random.seed(42)
        return pd.DataFrame({
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'category': np.random.choice(['A', 'B', 'C'], 1000)
        })

    @pytest.fixture
    def drift_detector(self, reference_data):
        """Drift detector"""
        return DriftDetector(reference_data)

    def test_no_drift_detection(self, drift_detector, reference_data):
        """No-drift case"""
        # New data drawn from the same distribution as the reference
        new_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 500),
            'feature2': np.random.normal(0, 1, 500),
            'category': np.random.choice(['A', 'B', 'C'], 500)
        })
        drift_results = drift_detector.detect_covariate_shift(new_data)
        # No drift should be detected
        for feature, result in drift_results.items():
            assert not result['drift_detected']

    def test_drift_detection(self, drift_detector):
        """Drift case"""
        # New data with a different distribution
        new_data = pd.DataFrame({
            'feature1': np.random.normal(2, 1, 500),   # different mean
            'feature2': np.random.normal(0, 3, 500),   # different variance
            'category': np.random.choice(['A', 'B', 'D'], 500)  # new category
        })
        drift_results = drift_detector.detect_covariate_shift(new_data)
        # Drift should be detected in the numeric features
        assert drift_results['feature1']['drift_detected']
        assert drift_results['feature2']['drift_detected']

    def test_concept_drift_detection(self, drift_detector, reference_data):
        """Concept drift detection test"""
        # Simulate a model whose performance degrades on new data
        mock_model = Mock()
        mock_model.predict.side_effect = [
            np.array([1] * len(reference_data)),  # good performance on the reference data
            np.array([0] * 500)                   # poor performance on the new data
        ]
        new_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 500),
            'feature2': np.random.normal(0, 1, 500)
        })
        new_labels = pd.Series([1] * 500)  # true labels are all 1
        concept_drift = drift_detector.detect_concept_drift(new_data, new_labels, mock_model)
        assert concept_drift['concept_drift_detected']
        assert concept_drift['performance_drop'] > 0.05

# Integration tests
class TestMLOpsIntegration:
    def test_end_to_end_pipeline(self):
        """End-to-end pipeline integration test"""
        # Generate test data
        np.random.seed(42)
        train_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'target': np.random.randint(0, 2, 1000)
        })
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write a temporary data file
            train_path = os.path.join(tmpdir, 'train.csv')
            train_data.to_csv(train_path, index=False)
            # Run the pipeline
            pipeline = MLOpsPipeline("integration_test", "integration_experiment")
            with patch('pandas.read_csv', return_value=train_data):
                with patch('mlflow.start_run'):
                    with patch('mlflow.log_params'):
                        with patch('mlflow.log_metrics'):
                            with patch('mlflow.sklearn.log_model'), patch('joblib.dump'), \
                                 patch.object(pd.DataFrame, 'to_csv'):
                                # Run each stage in order
                                validation_result = pipeline.data_validation()
                                feature_result = pipeline.feature_engineering()
                                training_result = pipeline.model_training()
                                # Verify the results
                                assert validation_result is not None
                                assert feature_result['features_created'] > 0
                                assert training_result['accuracy'] > 0

    @pytest.mark.performance
    def test_model_inference_latency(self):
        """Model inference latency test"""
        import time
        from src.custom_predictor import CustomPredictor
        # Create a dummy model
        mock_model = Mock()
        mock_model.predict.return_value = np.array([1])
        mock_model.predict_proba.return_value = np.array([[0.3, 0.7]])
        with patch('joblib.load', return_value=mock_model):
            predictor = CustomPredictor("/fake/model/path")
            # Measure latency
            test_input = {'feature1': 0.5, 'feature2': -0.3}
            start_time = time.time()
            result = predictor.predict(test_input)
            end_time = time.time()
            latency = end_time - start_time
            # Latency must stay under 100 ms
            assert latency < 0.1
            assert result['status'] == 'success'

# pytest entry point
if __name__ == "__main__":
    pytest.main([
        "-v",
        "--tb=short",
        "--cov=src",
        "--cov-report=html",
        "--cov-report=term-missing"
    ])
7. Model Governance and Regulatory Compliance
7.1 A Model Governance Framework
# model_governance.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
import numpy as np
import pandas as pd
from enum import Enum

class ModelRisk(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ModelStage(Enum):
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"

@dataclass
class ModelMetadata:
    """Model metadata"""
    model_id: str
    model_name: str
    version: str
    owner: str
    business_unit: str
    use_case: str
    risk_level: ModelRisk
    stage: ModelStage
    created_date: datetime
    last_updated: datetime
    # Technical information
    algorithm_type: str
    framework: str
    features: List[str]
    target_variable: str
    # Performance information
    training_accuracy: float
    validation_accuracy: float
    test_accuracy: float
    # Data information
    training_data_source: str
    training_data_size: int
    data_lineage: Dict[str, Any]
    # Regulatory information
    regulatory_requirements: List[str]
    ethical_considerations: List[str]
    bias_assessment: Dict[str, Any]
    # Approval information
    approvals: List[Dict[str, Any]]

    def to_dict(self):
        """Convert to a dictionary"""
        return {
            "model_id": self.model_id,
            "model_name": self.model_name,
            "version": self.version,
            "owner": self.owner,
            "business_unit": self.business_unit,
            "use_case": self.use_case,
            "risk_level": self.risk_level.value,
            "stage": self.stage.value,
            "created_date": self.created_date.isoformat(),
            "last_updated": self.last_updated.isoformat(),
            "algorithm_type": self.algorithm_type,
            "framework": self.framework,
            "features": self.features,
            "target_variable": self.target_variable,
            "training_accuracy": self.training_accuracy,
            "validation_accuracy": self.validation_accuracy,
            "test_accuracy": self.test_accuracy,
            "training_data_source": self.training_data_source,
            "training_data_size": self.training_data_size,
            "data_lineage": self.data_lineage,
            "regulatory_requirements": self.regulatory_requirements,
            "ethical_considerations": self.ethical_considerations,
            "bias_assessment": self.bias_assessment,
            "approvals": self.approvals
        }

class ModelGovernanceFramework:
    def __init__(self):
        self.models_registry = {}
        self.governance_policies = self._load_governance_policies()
        self.risk_thresholds = self._load_risk_thresholds()

    def _load_governance_policies(self) -> Dict:
        """Load governance policies"""
        return {
            "approval_required_stages": [ModelStage.STAGING, ModelStage.PRODUCTION],
            "mandatory_reviews": {
                ModelRisk.HIGH: ["technical", "business", "legal"],
                ModelRisk.CRITICAL: ["technical", "business", "legal", "executive"]
            },
            "documentation_requirements": [
                "model_card",
                "risk_assessment",
                "bias_analysis",
                "performance_report"
            ],
            "monitoring_requirements": {
                "performance_checks": "daily",
                "drift_detection": "weekly",
                "bias_monitoring": "monthly"
            }
        }

    def _load_risk_thresholds(self) -> Dict:
        """Load risk thresholds"""
        return {
            "performance_degradation": {
                ModelRisk.LOW: 0.1,
                ModelRisk.MEDIUM: 0.05,
                ModelRisk.HIGH: 0.03,
                ModelRisk.CRITICAL: 0.01
            },
            "bias_threshold": {
                "demographic_parity": 0.05,
                "equalized_odds": 0.05,
                "statistical_parity": 0.05
            },
            "drift_threshold": {
                ModelRisk.LOW: 0.2,
                ModelRisk.MEDIUM: 0.15,
                ModelRisk.HIGH: 0.1,
                ModelRisk.CRITICAL: 0.05
            }
        }

    def register_model(self, metadata: ModelMetadata) -> str:
        """Register a model"""
        # Validate required fields
        self._validate_model_metadata(metadata)
        # Risk assessment
        risk_assessment = self._assess_model_risk(metadata)
        metadata.risk_level = risk_assessment["risk_level"]
        # Apply governance policy (surfaced to reviewers in a real approval workflow)
        required_approvals = self._get_required_approvals(metadata)
        # Register
        self.models_registry[metadata.model_id] = metadata
        return metadata.model_id

    def _validate_model_metadata(self, metadata: ModelMetadata):
        """Validate model metadata"""
        required_fields = [
            'model_name', 'owner', 'use_case', 'algorithm_type',
            'features', 'target_variable', 'training_data_source'
        ]
        for field in required_fields:
            if not getattr(metadata, field):
                raise ValueError(f"Required field '{field}' is missing")
        # Validate performance metrics
        if metadata.training_accuracy < 0 or metadata.training_accuracy > 1:
            raise ValueError("Training accuracy must be between 0 and 1")

    def _assess_model_risk(self, metadata: ModelMetadata) -> Dict:
        """Assess model risk"""
        risk_factors = []
        # Algorithm complexity
        complex_algorithms = ['neural_network', 'ensemble', 'deep_learning']
        if metadata.algorithm_type in complex_algorithms:
            risk_factors.append("complex_algorithm")
        # Use-case impact
        high_risk_use_cases = ['credit_scoring', 'hiring', 'medical_diagnosis']
        if any(use_case in metadata.use_case.lower() for use_case in high_risk_use_cases):
            risk_factors.append("high_impact_use_case")
        # Data size
        if metadata.training_data_size < 1000:
            risk_factors.append("small_dataset")
        # Performance
        if metadata.validation_accuracy < 0.8:
            risk_factors.append("low_performance")
        # Determine the risk level
        if len(risk_factors) >= 3:
            risk_level = ModelRisk.CRITICAL
        elif len(risk_factors) >= 2:
            risk_level = ModelRisk.HIGH
        elif len(risk_factors) >= 1:
            risk_level = ModelRisk.MEDIUM
        else:
            risk_level = ModelRisk.LOW
        return {
            "risk_level": risk_level,
            "risk_factors": risk_factors,
            "assessment_date": datetime.now()
        }

    def _get_required_approvals(self, metadata: ModelMetadata) -> List[str]:
        """List the required approvals"""
        required_approvals = []
        # Approvals required per risk level
        if metadata.risk_level in self.governance_policies["mandatory_reviews"]:
            required_approvals.extend(
                self.governance_policies["mandatory_reviews"][metadata.risk_level]
            )
        # Approvals required per stage
        if metadata.stage in self.governance_policies["approval_required_stages"]:
            if "technical" not in required_approvals:
                required_approvals.append("technical")
        return required_approvals

    def promote_model(self, model_id: str, target_stage: ModelStage,
                      approver: str, approval_notes: str = "") -> bool:
        """Promote a model to a new stage"""
        if model_id not in self.models_registry:
            raise ValueError(f"Model {model_id} not found")
        model = self.models_registry[model_id]
        # Pre-promotion validation
        validation_result = self._validate_promotion(model, target_stage)
        if not validation_result["valid"]:
            raise ValueError(f"Promotion validation failed: {validation_result['reasons']}")
        # Record the approval
        approval = {
            "stage": target_stage.value,
            "approver": approver,
            "approval_date": datetime.now().isoformat(),
            "notes": approval_notes
        }
        model.approvals.append(approval)
        # Update the stage
        model.stage = target_stage
        model.last_updated = datetime.now()
        return True

    def _validate_promotion(self, model: ModelMetadata, target_stage: ModelStage) -> Dict:
        """Validate a promotion"""
        reasons = []
        # Check the required approvals
        required_approvals = self._get_required_approvals(model)
        current_approvals = [approval["approver"] for approval in model.approvals]
        missing_approvals = set(required_approvals) - set(current_approvals)
        if missing_approvals:
            reasons.append(f"Missing approvals: {list(missing_approvals)}")
        # Check the performance threshold
        if target_stage == ModelStage.PRODUCTION:
            if model.validation_accuracy < 0.8:
                reasons.append("Performance below production threshold")
        # Check documentation requirements
        # (in practice, verified against the documentation system)
        return {
            "valid": len(reasons) == 0,
            "reasons": reasons
        }

    def generate_model_card(self, model_id: str) -> Dict:
        """Generate a model card"""
        if model_id not in self.models_registry:
            raise ValueError(f"Model {model_id} not found")
        model = self.models_registry[model_id]
        model_card = {
            "model_details": {
                "name": model.model_name,
                "version": model.version,
                "date": model.created_date.isoformat(),
                "type": model.algorithm_type,
                "paper_or_reference": "",
                "license": "",
                "feedback": f"mailto:{model.owner}"
            },
            "intended_use": {
                "primary_intended_uses": model.use_case,
                "primary_intended_users": model.business_unit,
                "out_of_scope_use_cases": []
            },
            "factors": {
                "relevant_factors": model.features,
                "evaluation_factors": []
            },
            "metrics": {
                "model_performance_measures": {
                    "training_accuracy": model.training_accuracy,
                    "validation_accuracy": model.validation_accuracy,
                    "test_accuracy": model.test_accuracy
                },
                "decision_thresholds": {},
                "variation_approaches": []
            },
            "evaluation_data": {
                "dataset": model.training_data_source,
                "motivation": "",
                "preprocessing": model.data_lineage
            },
            "training_data": {
                "dataset": model.training_data_source,
                "motivation": "",
                "preprocessing": model.data_lineage
            },
            "quantitative_analyses": {
                "unitary_results": {},
                "intersectional_results": {}
            },
            "ethical_considerations": model.ethical_considerations,
            "caveats_and_recommendations": model.regulatory_requirements
        }
        return model_card

class BiasAuditor:
    """Bias auditing class"""
    def __init__(self):
        self.bias_metrics = [
            "demographic_parity",
            "equalized_odds",
            "statistical_parity",
            "individual_fairness"
        ]

    def audit_bias(self, model, X_test: pd.DataFrame, y_test: pd.Series,
                   sensitive_attributes: List[str]) -> Dict:
        """Run a bias audit"""
        results = {}
        predictions = model.predict(X_test)
        for attribute in sensitive_attributes:
            if attribute in X_test.columns:
                attribute_results = {}
                # Per-group analysis
                groups = X_test[attribute].unique()
                for metric in self.bias_metrics:
                    if metric == "demographic_parity":
                        score = self._calculate_demographic_parity(
                            X_test, predictions, attribute, groups
                        )
                    elif metric == "equalized_odds":
                        score = self._calculate_equalized_odds(
                            X_test, y_test, predictions, attribute, groups
                        )
                    else:
                        # The remaining metrics are left unimplemented here
                        continue
                    attribute_results[metric] = score
                results[attribute] = attribute_results
        return results

    def _calculate_demographic_parity(self, X: pd.DataFrame, predictions: np.ndarray,
                                      attribute: str, groups: List) -> Dict:
        """Compute demographic parity"""
        positive_rates = {}
        for group in groups:
            group_mask = X[attribute] == group
            group_predictions = predictions[group_mask]
            positive_rate = np.mean(group_predictions == 1)
            positive_rates[group] = positive_rate
        # Maximum gap between groups
        rates = list(positive_rates.values())
        max_difference = max(rates) - min(rates)
        return {
            "positive_rates": positive_rates,
            "max_difference": max_difference,
            "passes_threshold": max_difference < 0.05
        }

    def _calculate_equalized_odds(self, X: pd.DataFrame, y_true: pd.Series,
                                  predictions: np.ndarray, attribute: str, groups: List) -> Dict:
        """Compute equalized odds"""
        group_metrics = {}
        for group in groups:
            group_mask = X[attribute] == group
            group_y_true = y_true[group_mask]
            group_predictions = predictions[group_mask]
            # Guard the denominators against empty groups
            positives = max(np.sum(group_y_true == 1), 1)
            negatives = max(np.sum(group_y_true == 0), 1)
            # True positive rate
            tpr = np.sum((group_y_true == 1) & (group_predictions == 1)) / positives
            # False positive rate
            fpr = np.sum((group_y_true == 0) & (group_predictions == 1)) / negatives
            group_metrics[group] = {"tpr": tpr, "fpr": fpr}
        # Gaps between groups
        tpr_values = [metrics["tpr"] for metrics in group_metrics.values()]
        fpr_values = [metrics["fpr"] for metrics in group_metrics.values()]
        tpr_difference = max(tpr_values) - min(tpr_values)
        fpr_difference = max(fpr_values) - min(fpr_values)
        return {
            "group_metrics": group_metrics,
            "tpr_difference": tpr_difference,
            "fpr_difference": fpr_difference,
            "passes_threshold": max(tpr_difference, fpr_difference) < 0.05
        }

# Usage example
if __name__ == "__main__":
    # Initialize the governance framework
    governance = ModelGovernanceFramework()

    # Build the model metadata
    model_metadata = ModelMetadata(
        model_id="credit_model_v1",
        model_name="Credit Risk Assessment Model",
        version="1.0",
        owner="data-science-team@company.com",
        business_unit="Consumer Banking",
        use_case="Credit scoring for personal loans",
        risk_level=ModelRisk.HIGH,  # initial value; updated after the risk assessment
        stage=ModelStage.DEVELOPMENT,
        created_date=datetime.now(),
        last_updated=datetime.now(),
        algorithm_type="random_forest",
        framework="scikit-learn",
        features=["income", "credit_history", "employment_status", "loan_amount"],
        target_variable="default_risk",
        training_accuracy=0.85,
        validation_accuracy=0.83,
        test_accuracy=0.82,
        training_data_source="customer_database_2024",
        training_data_size=50000,
        data_lineage={"source": "CRM", "transformations": ["normalization", "feature_engineering"]},
        regulatory_requirements=["Fair Credit Reporting Act", "Equal Credit Opportunity Act"],
        ethical_considerations=["Avoid discrimination based on protected attributes"],
        bias_assessment={},
        approvals=[]
    )

    # Register the model
    model_id = governance.register_model(model_metadata)
    print(f"Model registered with ID: {model_id}")

    # Generate the model card
    model_card = governance.generate_model_card(model_id)
    print("Model card generated")

    # Bias audit (example)
    auditor = BiasAuditor()
    # bias_results = auditor.audit_bias(model, X_test, y_test, ["gender", "age_group"])
8. A Practical Adoption Guide
8.1 MLOps Adoption Strategies by Organization Type
# mlops_adoption_strategy.py
import copy
import json
import re
from enum import Enum
from typing import Dict, List
from dataclasses import dataclass

class OrganizationType(Enum):
    STARTUP = "startup"
    SCALE_UP = "scale_up"
    ENTERPRISE = "enterprise"
    CONSULTING = "consulting"

class MLOpsMaturityLevel(Enum):
    LEVEL_0 = "manual"
    LEVEL_1 = "devops"
    LEVEL_2 = "automated_training"
    LEVEL_3 = "automated_deployment"
    LEVEL_4 = "full_mlops"

@dataclass
class OrganizationProfile:
    org_type: OrganizationType
    team_size: int
    ml_projects_count: int
    current_maturity: MLOpsMaturityLevel
    budget_range: str
    technical_expertise: str
    regulatory_requirements: List[str]

class MLOpsAdoptionStrategy:
    def __init__(self):
        self.strategy_templates = self._load_strategy_templates()

    def _load_strategy_templates(self) -> Dict:
        return {
            OrganizationType.STARTUP: {
                "priorities": ["fast_iteration", "cost_efficiency", "scalability"],
                "recommended_tools": {
                    "experiment_tracking": "MLflow Community",
                    "model_serving": "FastAPI + Docker",
                    "monitoring": "Prometheus + Grafana",
                    "ci_cd": "GitHub Actions",
                    "infrastructure": "Cloud managed services"
                },
                "implementation_phases": [
                    {
                        "phase": 1,
                        "name": "Basic Tracking",
                        "duration": "2-4 weeks",
                        "goals": ["Implement experiment tracking", "Version control for models"],
                        "deliverables": ["MLflow setup", "Git workflows"]
                    },
                    {
                        "phase": 2,
                        "name": "Automated Deployment",
                        "duration": "4-6 weeks",
                        "goals": ["CI/CD pipeline", "Model serving"],
                        "deliverables": ["Docker containers", "API endpoints"]
                    },
                    {
                        "phase": 3,
                        "name": "Monitoring",
                        "duration": "3-4 weeks",
                        "goals": ["Model monitoring", "Performance tracking"],
                        "deliverables": ["Monitoring dashboard", "Alert system"]
                    }
                ]
            },
            OrganizationType.ENTERPRISE: {
                "priorities": ["governance", "compliance", "security", "scalability"],
                "recommended_tools": {
                    "experiment_tracking": "MLflow Enterprise",
                    "model_serving": "Kubernetes + KServe",
                    "monitoring": "Enterprise monitoring solution",
                    "ci_cd": "Jenkins/GitLab Enterprise",
                    "infrastructure": "Hybrid cloud + on-premise"
                },
                "implementation_phases": [
                    {
                        "phase": 1,
                        "name": "Governance Framework",
                        "duration": "8-12 weeks",
                        "goals": ["Establish governance", "Security policies"],
                        "deliverables": ["Governance framework", "Security guidelines"]
                    },
                    {
                        "phase": 2,
                        "name": "Platform Setup",
                        "duration": "12-16 weeks",
                        "goals": ["MLOps platform", "Integration with existing systems"],
                        "deliverables": ["MLOps platform", "Integration APIs"]
                    },
                    {
                        "phase": 3,
                        "name": "Pilot Projects",
                        "duration": "6-8 weeks",
                        "goals": ["Pilot implementation", "Team training"],
                        "deliverables": ["Pilot projects", "Training materials"]
                    },
                    {
                        "phase": 4,
                        "name": "Enterprise Rollout",
                        "duration": "16-24 weeks",
                        "goals": ["Organization-wide adoption"],
                        "deliverables": ["Enterprise deployment", "Documentation"]
                    }
                ]
            }
        }

    def generate_adoption_plan(self, profile: OrganizationProfile) -> Dict:
        """Generate an adoption plan from the organization profile"""
        template = self.strategy_templates.get(profile.org_type, self.strategy_templates[OrganizationType.STARTUP])
        # Customize the plan for the organization
        customized_plan = self._customize_plan(template, profile)
        return {
            "organization_profile": profile.__dict__,
            "adoption_strategy": customized_plan,
            "estimated_timeline": self._calculate_timeline(customized_plan),
            "resource_requirements": self._estimate_resources(profile, customized_plan),
            "success_metrics": self._define_success_metrics(profile),
            "risk_mitigation": self._identify_risks_and_mitigations(profile)
        }

    def _customize_plan(self, template: Dict, profile: OrganizationProfile) -> Dict:
        """Customize the plan for the profile"""
        # Deep-copy so repeated calls don't mutate the shared template
        customized = copy.deepcopy(template)
        # Adjust for team size
        if profile.team_size < 5:
            customized["recommended_tools"]["infrastructure"] = "Serverless/managed services"
        elif profile.team_size > 50:
            customized["recommended_tools"]["infrastructure"] = "Self-managed Kubernetes"
        # Adjust for regulatory requirements
        if "GDPR" in profile.regulatory_requirements:
            customized["additional_requirements"] = [
                "Data privacy controls",
                "Audit logging",
                "Right to explanation"
            ]
        if "SOX" in profile.regulatory_requirements:
            customized["additional_requirements"] = customized.get("additional_requirements", []) + [
                "Change management controls",
                "Segregation of duties",
                "Financial data protection"
            ]
        return customized

    def _calculate_timeline(self, plan: Dict) -> Dict:
        """Compute the timeline"""
        phases = plan.get("implementation_phases", [])
        total_weeks = sum(self._parse_duration(phase["duration"]) for phase in phases)
        return {
            "total_duration_weeks": total_weeks,
            "estimated_completion": f"{total_weeks} weeks",
            "phase_breakdown": [
                {
                    "phase": phase["phase"],
                    "name": phase["name"],
                    "duration": phase["duration"]
                }
                for phase in phases
            ]
        }

    def _parse_duration(self, duration_str: str) -> int:
        """Parse a duration string into weeks"""
        # "2-4 weeks" -> 3 (the midpoint)
        match = re.search(r"(\d+)-(\d+)", duration_str)
        if match:
            return (int(match.group(1)) + int(match.group(2))) // 2
        else:
            single_match = re.search(r"(\d+)", duration_str)
            return int(single_match.group(1)) if single_match else 4

    def _estimate_resources(self, profile: OrganizationProfile, plan: Dict) -> Dict:
        """Estimate resource requirements"""
        base_resources = {
            "human_resources": {
                "ml_engineers": max(1, profile.team_size // 3),
                "devops_engineers": max(1, profile.team_size // 5),
                "data_engineers": max(1, profile.team_size // 4),
                "project_manager": 1 if profile.team_size > 10 else 0.5
            },
            "infrastructure_costs": {
                "monthly_estimate": self._estimate_infrastructure_cost(profile),
                "setup_cost": self._estimate_setup_cost(profile)
            },
            "training_requirements": {
                "mlops_fundamentals": f"{profile.team_size} people",
                "tool_specific_training": "Key team members",
                "estimated_training_hours": profile.team_size * 40
            }
        }
        return base_resources

    def _estimate_infrastructure_cost(self, profile: OrganizationProfile) -> str:
        """Estimate infrastructure costs"""
        if profile.org_type == OrganizationType.STARTUP:
            return "$500-2000/month"
        elif profile.org_type == OrganizationType.SCALE_UP:
            return "$2000-10000/month"
        else:  # ENTERPRISE
            return "$10000-50000/month"

    def _estimate_setup_cost(self, profile: OrganizationProfile) -> str:
        """Estimate initial setup costs"""
        if profile.org_type == OrganizationType.STARTUP:
            return "$5000-20000"
        elif profile.org_type == OrganizationType.SCALE_UP:
            return "$20000-100000"
        else:  # ENTERPRISE
            return "$100000-500000"

    def _define_success_metrics(self, profile: OrganizationProfile) -> List[Dict]:
        """Define success metrics"""
        base_metrics = [
            {
                "metric": "Model deployment frequency",
                "target": "Weekly deployments",
                "measurement": "Number of model deployments per week"
            },
            {
                "metric": "Time to production",
                "target": "< 2 weeks from model training to production",
                "measurement": "Average time from model completion to production deployment"
            },
            {
                "metric": "Model performance monitoring",
                "target": "100% of production models monitored",
                "measurement": "Percentage of models with active monitoring"
            }
        ]
        if profile.org_type == OrganizationType.ENTERPRISE:
            base_metrics.extend([
                {
                    "metric": "Compliance adherence",
                    "target": "100% compliance with regulatory requirements",
                    "measurement": "Audit score"
                },
                {
                    "metric": "Model governance coverage",
                    "target": "100% of models with proper documentation",
                    "measurement": "Percentage of models with complete model cards"
                }
            ])
        return base_metrics

    def _identify_risks_and_mitigations(self, profile: OrganizationProfile) -> List[Dict]:
        """Identify risks and mitigations"""
        risks = [
            {
                "risk": "Team skill gaps",
                "impact": "High",
                "probability": "Medium",
                "mitigation": "Comprehensive training program and external consulting"
            },
            {
                "risk": "Tool integration complexity",
                "impact": "Medium",
                "probability": "High",
                "mitigation": "Phased implementation and proof of concepts"
            },
            {
                "risk": "Resistance to change",
                "impact": "Medium",
                "probability": "Medium",
                "mitigation": "Change management program and success showcases"
            }
        ]
        if profile.org_type == OrganizationType.ENTERPRISE:
            risks.extend([
                {
                    "risk": "Security and compliance gaps",
                    "impact": "Critical",
                    "probability": "Medium",
                    "mitigation": "Security-first design and compliance review gates"
                },
                {
                    "risk": "Legacy system integration",
                    "impact": "High",
                    "probability": "High",
                    "mitigation": "API-first approach and gradual migration"
                }
            ])
        return risks

# ROI calculator
class MLOpsROICalculator:
    def __init__(self):
        self.benefit_factors = {
            "faster_deployment": {"time_saved_hours": 40, "hourly_rate": 100},
            "improved_quality": {"error_reduction": 0.3, "error_cost": 10000},
            "automation_savings": {"manual_hours_saved": 20, "hourly_rate": 80},
            "compliance_benefits": {"audit_cost_reduction": 50000}
        }

    def calculate_roi(self, profile: OrganizationProfile, implementation_cost: float,
                      timeframe_months: int = 12) -> Dict:
        """Compute the ROI"""
        # Costs
        total_costs = implementation_cost
        # Benefits
        monthly_benefits = 0
        # Faster deployments
        deployments_per_month = profile.ml_projects_count * 2  # assumption
        time_saved = deployments_per_month * self.benefit_factors["faster_deployment"]["time_saved_hours"]
        monthly_benefits += time_saved * self.benefit_factors["faster_deployment"]["hourly_rate"]
        # Quality improvements
        error_reduction_value = (
            profile.ml_projects_count *
            self.benefit_factors["improved_quality"]["error_reduction"] *
            self.benefit_factors["improved_quality"]["error_cost"]
        )
        monthly_benefits += error_reduction_value / 12  # convert the annual benefit to monthly
        # Automation savings
        automation_savings = (
            profile.team_size *
            self.benefit_factors["automation_savings"]["manual_hours_saved"] *
            self.benefit_factors["automation_savings"]["hourly_rate"]
        )
        monthly_benefits += automation_savings
        # Total benefits
        total_benefits = monthly_benefits * timeframe_months
        # ROI
        roi_percentage = ((total_benefits - total_costs) / total_costs) * 100
        payback_period = total_costs / monthly_benefits if monthly_benefits > 0 else float('inf')
        return {
            "total_costs": total_costs,
            "total_benefits": total_benefits,
            "net_benefit": total_benefits - total_costs,
            "roi_percentage": roi_percentage,
            "payback_period_months": payback_period,
            "monthly_benefits": monthly_benefits,
            "benefit_breakdown": {
                "faster_deployment": time_saved * self.benefit_factors["faster_deployment"]["hourly_rate"],
                "improved_quality": error_reduction_value / 12,
                "automation_savings": automation_savings
            }
        }

# Usage example
if __name__ == "__main__":
    # Build organization profiles
    startup_profile = OrganizationProfile(
        org_type=OrganizationType.STARTUP,
        team_size=8,
        ml_projects_count=3,
        current_maturity=MLOpsMaturityLevel.LEVEL_0,
        budget_range="$50k-100k",
        technical_expertise="medium",
        regulatory_requirements=[]
    )
    enterprise_profile = OrganizationProfile(
        org_type=OrganizationType.ENTERPRISE,
        team_size=50,
        ml_projects_count=15,
        current_maturity=MLOpsMaturityLevel.LEVEL_1,
        budget_range="$500k-1M",
        technical_expertise="high",
        regulatory_requirements=["SOX", "GDPR", "HIPAA"]
    )

    # Generate adoption strategies
    strategy_generator = MLOpsAdoptionStrategy()
    startup_plan = strategy_generator.generate_adoption_plan(startup_profile)
    print("Startup MLOps Adoption Plan:")
    print(json.dumps(startup_plan, indent=2, default=str))

    # ROI analysis
    roi_calculator = MLOpsROICalculator()
    startup_roi = roi_calculator.calculate_roi(startup_profile, 75000, 12)
    print("\nStartup ROI Analysis:")
    print(json.dumps(startup_roi, indent=2))
8.2 Troubleshooting Guide
# MLOps Troubleshooting Guide
## Common Problems and Solutions
### 1. Model performance degradation
**Symptom**: Production model performance degrades over time
**Cause**: Data drift, concept drift, environmental changes
**Solutions**:
- Build a real-time monitoring system
- Set up an automated retraining pipeline
- Roll out gradually with A/B testing
### 2. Deployment pipeline failures
**Symptom**: Model deployment fails in the CI/CD pipeline
**Cause**: Environment mismatches, dependency conflicts, insufficient resources
**Solutions**:
- Use Docker containers to keep environments consistent
- Pin dependency versions
- Monitor resources and enable autoscaling
### 3. Data pipeline errors
**Symptom**: Errors in the training/inference data pipelines
**Cause**: Data schema changes, source-system changes, data quality issues
**Solutions**:
- Strengthen data validation logic (see the sketch below)
- Add a mechanism for handling schema evolution
- Monitor data quality
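The "strengthen data validation" item above can start as small as a hard schema gate at the head of the pipeline. A minimal sketch in plain pandas, where the expected columns and dtypes are illustrative:

```python
# Hypothetical schema guard: fail fast when the incoming file no longer
# matches the expected columns and dtypes.
import pandas as pd

EXPECTED_SCHEMA = {  # illustrative column -> dtype mapping
    "feature1": "float64",
    "feature2": "float64",
    "target": "int64",
}

def check_schema(df: pd.DataFrame) -> list:
    """Return a list of human-readable schema violations (empty if valid)."""
    problems = []
    for column, dtype in EXPECTED_SCHEMA.items():
        if column not in df.columns:
            problems.append(f"missing column: {column}")
        elif str(df[column].dtype) != dtype:
            problems.append(f"{column}: expected {dtype}, got {df[column].dtype}")
    extra = set(df.columns) - set(EXPECTED_SCHEMA)
    if extra:
        problems.append(f"unexpected columns: {sorted(extra)}")
    return problems

df = pd.read_csv("/data/input/training_data.csv")
violations = check_schema(df)
if violations:
    raise ValueError("Schema validation failed: " + "; ".join(violations))
```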
Closing Thoughts
MLOps demands organization-wide cultural change, not just the adoption of new tools. As of 2026, with AI models at the core of many businesses, MLOps has only grown in importance.
Core principles for a successful MLOps rollout:
- Adopt incrementally: proceed in stages rather than changing everything at once
- Automate first: automate repetitive work as much as possible
- Monitor everything: monitoring and logging at every stage
- Foster collaboration: close cooperation between data science and engineering teams
- Improve continuously: keep refining the process based on feedback
MLOps creates real value only when technology, process, and people work in harmony. Define an MLOps strategy that fits your organization's situation and goals, and execute it step by step.