MLOps 실전 구축 가이드 - 2026년 엔터프라이즈 AI 운영 전략

MLOpsMachine LearningAIDevOpsMLflowKubeflow데이터 엔지니어링모델 배포AI 운영

MLOps 실전 구축 가이드 - 2026년 엔터프라이즈 AI 운영 전략

1. MLOps 개념과 필요성

1.1 MLOps란 무엇인가?

MLOps(Machine Learning Operations)는 머신러닝 모델의 개발, 배포, 운영을 자동화하고 표준화하는 방법론입니다.

# MLOps 성숙도 단계
class MLOpsMaturity:
    def __init__(self):
        self.stages = {
            "Level 0": "Manual Process",
            "Level 1": "ML Pipeline Automation",
            "Level 2": "CI/CD Pipeline Automation",
            "Level 3": "Advanced MLOps with Monitoring"
        }

    def assess_maturity(self, organization):
        # 조직의 MLOps 성숙도 평가
        criteria = {
            "automation": self.check_automation_level(organization),
            "monitoring": self.check_monitoring_capabilities(organization),
            "governance": self.check_ml_governance(organization),
            "collaboration": self.check_team_collaboration(organization)
        }
        return self.calculate_maturity_score(criteria)

1.2 전통적인 ML과 MLOps의 차이점

전통적인 MLMLOps
일회성 모델 개발지속적인 모델 개선
수동 배포자동화된 배포 파이프라인
모델 성능 추적 부재실시간 모니터링
실험 재현 어려움실험 추적 및 버전 관리

2. MLOps 아키텍처 설계

2.1 핵심 컴포넌트 구조

# mlops-architecture.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mlops-architecture
data:
  components: |
    # 데이터 파이프라인
    data_pipeline:
      ingestion: Apache Kafka, Apache Airflow
      preprocessing: Apache Spark, Dask
      storage: MinIO, AWS S3, Delta Lake

    # 모델 개발 환경
    development:
      experimentation: Jupyter, MLflow
      training: Kubernetes, Ray
      versioning: DVC, Git LFS

    # 모델 서빙
    serving:
      inference: Seldon Core, KServe
      api_gateway: Istio, Ambassador
      monitoring: Prometheus, Grafana

    # 모델 관리
    management:
      registry: MLflow Model Registry
      metadata: ML Metadata
      governance: Apache Atlas

2.2 MLOps 파이프라인 구성

# mlops_pipeline.py
import mlflow
import mlflow.sklearn
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score

class MLOpsPipeline:
    def __init__(self, model_name, experiment_name):
        self.model_name = model_name
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)

    def data_validation(self, **context):
        """데이터 품질 검증"""
        data = pd.read_csv("/data/input/training_data.csv")

        # 데이터 품질 체크
        quality_checks = {
            "missing_values": data.isnull().sum().sum(),
            "duplicate_rows": data.duplicated().sum(),
            "data_shape": data.shape,
            "data_types": data.dtypes.to_dict()
        }

        # 품질 임계값 검증
        if quality_checks["missing_values"] > 1000:
            raise ValueError("Too many missing values")

        # MLflow에 데이터 메트릭 로깅
        with mlflow.start_run():
            mlflow.log_metrics(quality_checks)

        return quality_checks

    def feature_engineering(self, **context):
        """피처 엔지니어링"""
        data = pd.read_csv("/data/input/training_data.csv")

        # 피처 생성
        data['feature_interaction'] = data['feature1'] * data['feature2']
        data['feature_ratio'] = data['feature1'] / (data['feature2'] + 1e-8)

        # 피처 선택
        important_features = ['feature1', 'feature2', 'feature_interaction', 'feature_ratio']
        processed_data = data[important_features + ['target']]

        # 처리된 데이터 저장
        processed_data.to_csv("/data/processed/features.csv", index=False)

        return {"features_created": len(important_features)}

    def model_training(self, **context):
        """모델 훈련"""
        data = pd.read_csv("/data/processed/features.csv")

        X = data.drop('target', axis=1)
        y = data['target']

        with mlflow.start_run():
            # 하이퍼파라미터
            params = {
                "n_estimators": 100,
                "max_depth": 10,
                "random_state": 42
            }
            mlflow.log_params(params)

            # 모델 훈련
            model = RandomForestClassifier(**params)
            model.fit(X, y)

            # 모델 평가
            predictions = model.predict(X)
            metrics = {
                "accuracy": accuracy_score(y, predictions),
                "precision": precision_score(y, predictions, average='weighted'),
                "recall": recall_score(y, predictions, average='weighted')
            }
            mlflow.log_metrics(metrics)

            # 모델 저장
            mlflow.sklearn.log_model(model, "model")
            joblib.dump(model, "/models/trained_model.pkl")

            return metrics

    def model_validation(self, **context):
        """모델 검증"""
        model = joblib.load("/models/trained_model.pkl")
        validation_data = pd.read_csv("/data/validation/test_data.csv")

        X_val = validation_data.drop('target', axis=1)
        y_val = validation_data['target']

        # 모델 성능 평가
        predictions = model.predict(X_val)
        validation_metrics = {
            "val_accuracy": accuracy_score(y_val, predictions),
            "val_precision": precision_score(y_val, predictions, average='weighted'),
            "val_recall": recall_score(y_val, predictions, average='weighted')
        }

        # 성능 임계값 검증
        if validation_metrics["val_accuracy"] < 0.8:
            raise ValueError("Model performance below threshold")

        with mlflow.start_run():
            mlflow.log_metrics(validation_metrics)

        return validation_metrics

    def model_deployment(self, **context):
        """모델 배포"""
        # MLflow Model Registry에 모델 등록
        model_uri = f"runs:/{context['task_instance'].xcom_pull(task_ids='model_training')}/model"

        model_version = mlflow.register_model(
            model_uri=model_uri,
            name=self.model_name
        )

        # 프로덕션으로 승격
        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            name=self.model_name,
            version=model_version.version,
            stage="Production"
        )

        return {"model_version": model_version.version}

# Airflow DAG 정의
default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'mlops_pipeline',
    default_args=default_args,
    description='MLOps training and deployment pipeline',
    schedule_interval='@daily',
    catchup=False
)

pipeline = MLOpsPipeline("production_model", "model_experiment")

# 태스크 정의
data_validation_task = PythonOperator(
    task_id='data_validation',
    python_callable=pipeline.data_validation,
    dag=dag
)

feature_engineering_task = PythonOperator(
    task_id='feature_engineering',
    python_callable=pipeline.feature_engineering,
    dag=dag
)

model_training_task = PythonOperator(
    task_id='model_training',
    python_callable=pipeline.model_training,
    dag=dag
)

model_validation_task = PythonOperator(
    task_id='model_validation',
    python_callable=pipeline.model_validation,
    dag=dag
)

model_deployment_task = PythonOperator(
    task_id='model_deployment',
    python_callable=pipeline.model_deployment,
    dag=dag
)

# 태스크 의존성 설정
data_validation_task >> feature_engineering_task >> model_training_task >> model_validation_task >> model_deployment_task

3. MLflow를 활용한 실험 관리

3.1 MLflow 설정 및 구성

# mlflow_setup.py
import mlflow
import mlflow.tracking
from mlflow.tracking.client import MlflowClient
import os

class MLflowManager:
    def __init__(self, tracking_uri="http://localhost:5000"):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def setup_experiment(self, experiment_name, artifact_location=None):
        """실험 환경 설정"""
        try:
            experiment_id = mlflow.create_experiment(
                name=experiment_name,
                artifact_location=artifact_location
            )
        except mlflow.exceptions.MlflowException:
            experiment = mlflow.get_experiment_by_name(experiment_name)
            experiment_id = experiment.experiment_id

        mlflow.set_experiment(experiment_name)
        return experiment_id

    def log_model_comparison(self, models_dict, test_data):
        """여러 모델 성능 비교"""
        X_test, y_test = test_data

        comparison_results = []

        for model_name, model in models_dict.items():
            with mlflow.start_run(run_name=f"{model_name}_comparison"):
                # 모델 예측
                predictions = model.predict(X_test)

                # 메트릭 계산
                metrics = {
                    "accuracy": accuracy_score(y_test, predictions),
                    "precision": precision_score(y_test, predictions, average='weighted'),
                    "recall": recall_score(y_test, predictions, average='weighted'),
                    "f1_score": f1_score(y_test, predictions, average='weighted')
                }

                # MLflow 로깅
                mlflow.log_params(model.get_params())
                mlflow.log_metrics(metrics)
                mlflow.sklearn.log_model(model, f"{model_name}_model")

                # 혼동 행렬 저장
                cm = confusion_matrix(y_test, predictions)
                self.save_confusion_matrix(cm, model_name)

                comparison_results.append({
                    "model": model_name,
                    "metrics": metrics,
                    "run_id": mlflow.active_run().info.run_id
                })

        return comparison_results

    def save_confusion_matrix(self, cm, model_name):
        """혼동 행렬 시각화 저장"""
        import matplotlib.pyplot as plt
        import seaborn as sns

        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'Confusion Matrix - {model_name}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')

        # MLflow에 아티팩트로 저장
        plt.savefig(f"confusion_matrix_{model_name}.png")
        mlflow.log_artifact(f"confusion_matrix_{model_name}.png")
        plt.close()

    def auto_hyperparameter_tuning(self, model_class, param_grid, X_train, y_train, X_val, y_val):
        """자동 하이퍼파라미터 튜닝"""
        from sklearn.model_selection import ParameterGrid

        best_score = 0
        best_params = None
        best_model = None

        for params in ParameterGrid(param_grid):
            with mlflow.start_run():
                # 모델 훈련
                model = model_class(**params)
                model.fit(X_train, y_train)

                # 검증 성능 평가
                val_predictions = model.predict(X_val)
                val_score = accuracy_score(y_val, val_predictions)

                # MLflow 로깅
                mlflow.log_params(params)
                mlflow.log_metric("validation_accuracy", val_score)
                mlflow.sklearn.log_model(model, "model")

                # 최고 성능 모델 추적
                if val_score > best_score:
                    best_score = val_score
                    best_params = params
                    best_model = model

        return best_model, best_params, best_score

# MLflow 사용 예제
mlflow_manager = MLflowManager()
experiment_id = mlflow_manager.setup_experiment("production_experiments")

# 모델 비교 실행
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

models = {
    "RandomForest": RandomForestClassifier(n_estimators=100),
    "GradientBoosting": GradientBoostingClassifier(n_estimators=100),
    "LogisticRegression": LogisticRegression()
}

# comparison_results = mlflow_manager.log_model_comparison(models, (X_test, y_test))

3.2 모델 레지스트리 관리

# model_registry.py
class ModelRegistry:
    def __init__(self):
        self.client = MlflowClient()

    def register_model(self, model_uri, model_name, description=""):
        """모델을 레지스트리에 등록"""
        model_version = mlflow.register_model(
            model_uri=model_uri,
            name=model_name
        )

        # 모델 설명 추가
        self.client.update_model_version(
            name=model_name,
            version=model_version.version,
            description=description
        )

        return model_version

    def promote_model(self, model_name, version, stage):
        """모델을 특정 스테이지로 승격"""
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage
        )

    def compare_model_versions(self, model_name, version1, version2):
        """모델 버전 간 성능 비교"""
        # 버전별 메트릭 조회
        mv1 = self.client.get_model_version(model_name, version1)
        mv2 = self.client.get_model_version(model_name, version2)

        # 런 정보 조회
        run1 = self.client.get_run(mv1.run_id)
        run2 = self.client.get_run(mv2.run_id)

        comparison = {
            "version1": {
                "version": version1,
                "metrics": run1.data.metrics,
                "params": run1.data.params
            },
            "version2": {
                "version": version2,
                "metrics": run2.data.metrics,
                "params": run2.data.params
            }
        }

        return comparison

    def get_best_model(self, model_name, metric_name="accuracy"):
        """최고 성능 모델 버전 조회"""
        versions = self.client.search_model_versions(f"name='{model_name}'")

        best_version = None
        best_score = 0

        for version in versions:
            run = self.client.get_run(version.run_id)
            if metric_name in run.data.metrics:
                score = run.data.metrics[metric_name]
                if score > best_score:
                    best_score = score
                    best_version = version

        return best_version

4. Kubernetes 기반 모델 서빙

4.1 KServe를 활용한 모델 배포

# model-serving.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-model-serving
  namespace: production
spec:
  predictor:
    sklearn:
      storageUri: s3://mlops-models/sklearn-model/
      resources:
        requests:
          cpu: "0.1"
          memory: "512Mi"
        limits:
          cpu: "1"
          memory: "1Gi"
  transformer:
    custom:
      image: "myregistry.com/feature-transformer:latest"
      env:
        - name: STORAGE_URI
          value: "s3://mlops-models/transformers/"
---
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: pytorch-model-serving
  namespace: production
spec:
  predictor:
    pytorch:
      storageUri: s3://mlops-models/pytorch-model/
      resources:
        requests:
          cpu: "0.5"
          memory: "1Gi"
          nvidia.com/gpu: "1"
        limits:
          cpu: "2"
          memory: "4Gi"
          nvidia.com/gpu: "1"
      env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0"

4.2 커스텀 예측 서버 구현

# custom_predictor.py
from kserve import KServeClient, V1beta1InferenceService
import joblib
import numpy as np
import pandas as pd
from typing import Dict, List
import json
import logging

class CustomPredictor:
    def __init__(self, model_path: str):
        self.model = joblib.load(model_path)
        self.feature_names = None

    def preprocess(self, inputs: Dict) -> np.ndarray:
        """입력 데이터 전처리"""
        if isinstance(inputs, dict):
            df = pd.DataFrame([inputs])
        else:
            df = pd.DataFrame(inputs)

        # 피처 엔지니어링
        df['feature_interaction'] = df['feature1'] * df['feature2']
        df['feature_ratio'] = df['feature1'] / (df['feature2'] + 1e-8)

        # 필요한 컬럼만 선택
        required_features = ['feature1', 'feature2', 'feature_interaction', 'feature_ratio']
        processed_df = df[required_features]

        return processed_df.values

    def predict(self, inputs: Dict) -> Dict:
        """모델 예측"""
        try:
            # 전처리
            processed_inputs = self.preprocess(inputs)

            # 예측 수행
            predictions = self.model.predict(processed_inputs)
            probabilities = self.model.predict_proba(processed_inputs)

            # 결과 포맷팅
            results = {
                "predictions": predictions.tolist(),
                "probabilities": probabilities.tolist(),
                "model_version": "1.0.0",
                "status": "success"
            }

            return results

        except Exception as e:
            logging.error(f"Prediction error: {str(e)}")
            return {
                "error": str(e),
                "status": "error"
            }

    def explain(self, inputs: Dict) -> Dict:
        """모델 예측 설명"""
        try:
            # SHAP을 사용한 설명
            import shap

            processed_inputs = self.preprocess(inputs)
            explainer = shap.TreeExplainer(self.model)
            shap_values = explainer.shap_values(processed_inputs)

            explanation = {
                "shap_values": shap_values.tolist(),
                "feature_importance": dict(zip(
                    ['feature1', 'feature2', 'feature_interaction', 'feature_ratio'],
                    shap_values[0]
                )),
                "base_value": explainer.expected_value
            }

            return explanation

        except Exception as e:
            return {"error": f"Explanation error: {str(e)}"}

# Flask API 서버
from flask import Flask, request, jsonify
import time

app = Flask(__name__)
predictor = CustomPredictor("/models/trained_model.pkl")

# 모델 서빙 메트릭
class ModelMetrics:
    def __init__(self):
        self.request_count = 0
        self.error_count = 0
        self.response_times = []

    def record_request(self, response_time: float, error: bool = False):
        self.request_count += 1
        self.response_times.append(response_time)
        if error:
            self.error_count += 1

    def get_metrics(self):
        return {
            "total_requests": self.request_count,
            "error_rate": self.error_count / max(self.request_count, 1),
            "avg_response_time": np.mean(self.response_times) if self.response_times else 0,
            "p95_response_time": np.percentile(self.response_times, 95) if self.response_times else 0
        }

metrics = ModelMetrics()

@app.route('/predict', methods=['POST'])
def predict():
    start_time = time.time()

    try:
        data = request.get_json()
        result = predictor.predict(data)

        response_time = time.time() - start_time
        metrics.record_request(response_time, "error" in result)

        return jsonify(result)

    except Exception as e:
        response_time = time.time() - start_time
        metrics.record_request(response_time, True)

        return jsonify({
            "error": str(e),
            "status": "error"
        }), 500

@app.route('/explain', methods=['POST'])
def explain():
    try:
        data = request.get_json()
        explanation = predictor.explain(data)
        return jsonify(explanation)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/health', methods=['GET'])
def health():
    return jsonify({
        "status": "healthy",
        "model_loaded": predictor.model is not None,
        "timestamp": time.time()
    })

@app.route('/metrics', methods=['GET'])
def get_metrics():
    return jsonify(metrics.get_metrics())

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

5. 모델 모니터링 시스템

5.1 데이터 드리프트 감지

# drift_detection.py
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.metrics import accuracy_score
import warnings
from typing import Dict, List, Tuple
import logging

class DriftDetector:
    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.feature_stats = self._compute_reference_stats()

    def _compute_reference_stats(self) -> Dict:
        """참조 데이터의 통계량 계산"""
        stats = {}
        for column in self.reference_data.columns:
            if self.reference_data[column].dtype in ['int64', 'float64']:
                stats[column] = {
                    'mean': self.reference_data[column].mean(),
                    'std': self.reference_data[column].std(),
                    'min': self.reference_data[column].min(),
                    'max': self.reference_data[column].max(),
                    'quantiles': self.reference_data[column].quantile([0.25, 0.5, 0.75]).to_dict()
                }
            else:
                stats[column] = {
                    'value_counts': self.reference_data[column].value_counts().to_dict()
                }
        return stats

    def detect_covariate_shift(self, new_data: pd.DataFrame) -> Dict:
        """공변량 시프트 감지"""
        drift_results = {}

        for column in self.reference_data.columns:
            if column in new_data.columns:
                if self.reference_data[column].dtype in ['int64', 'float64']:
                    # 수치형 변수: KS 테스트
                    ks_stat, p_value = stats.ks_2samp(
                        self.reference_data[column].dropna(),
                        new_data[column].dropna()
                    )

                    drift_results[column] = {
                        'test': 'kolmogorov_smirnov',
                        'statistic': ks_stat,
                        'p_value': p_value,
                        'drift_detected': p_value < self.threshold,
                        'severity': self._calculate_severity(p_value)
                    }
                else:
                    # 범주형 변수: Chi-square 테스트
                    ref_counts = self.reference_data[column].value_counts()
                    new_counts = new_data[column].value_counts()

                    # 공통 카테고리만 고려
                    common_categories = set(ref_counts.index) & set(new_counts.index)
                    if len(common_categories) > 1:
                        ref_aligned = [ref_counts.get(cat, 0) for cat in common_categories]
                        new_aligned = [new_counts.get(cat, 0) for cat in common_categories]

                        chi2_stat, p_value = stats.chisquare(new_aligned, ref_aligned)

                        drift_results[column] = {
                            'test': 'chi_square',
                            'statistic': chi2_stat,
                            'p_value': p_value,
                            'drift_detected': p_value < self.threshold,
                            'severity': self._calculate_severity(p_value)
                        }

        return drift_results

    def detect_concept_drift(self, new_data: pd.DataFrame, new_labels: pd.Series,
                           model) -> Dict:
        """개념 드리프트 감지"""
        # 모델 성능 비교
        ref_predictions = model.predict(self.reference_data.drop('target', axis=1, errors='ignore'))
        new_predictions = model.predict(new_data.drop('target', axis=1, errors='ignore'))

        ref_accuracy = accuracy_score(
            self.reference_data.get('target', ref_predictions),
            ref_predictions
        )
        new_accuracy = accuracy_score(new_labels, new_predictions)

        performance_drop = ref_accuracy - new_accuracy

        return {
            'reference_accuracy': ref_accuracy,
            'current_accuracy': new_accuracy,
            'performance_drop': performance_drop,
            'concept_drift_detected': performance_drop > 0.05,  # 5% 성능 저하 임계값
            'severity': 'high' if performance_drop > 0.1 else 'medium' if performance_drop > 0.05 else 'low'
        }

    def _calculate_severity(self, p_value: float) -> str:
        """드리프트 심각도 계산"""
        if p_value < 0.001:
            return 'high'
        elif p_value < 0.01:
            return 'medium'
        elif p_value < self.threshold:
            return 'low'
        else:
            return 'none'

    def generate_drift_report(self, new_data: pd.DataFrame,
                            new_labels: pd.Series = None, model=None) -> Dict:
        """종합 드리프트 리포트 생성"""
        report = {
            'timestamp': pd.Timestamp.now().isoformat(),
            'data_size': len(new_data),
            'covariate_shift': self.detect_covariate_shift(new_data)
        }

        if new_labels is not None and model is not None:
            report['concept_drift'] = self.detect_concept_drift(new_data, new_labels, model)

        # 전체 드리프트 점수 계산
        drift_scores = [
            result['drift_detected'] for result in report['covariate_shift'].values()
        ]
        report['overall_drift_score'] = sum(drift_scores) / len(drift_scores) if drift_scores else 0

        return report

# 실시간 모니터링 시스템
class RealTimeMonitor:
    def __init__(self, model, reference_data: pd.DataFrame):
        self.model = model
        self.drift_detector = DriftDetector(reference_data)
        self.prediction_buffer = []
        self.alert_thresholds = {
            'drift_score': 0.3,
            'performance_drop': 0.05,
            'error_rate': 0.1
        }

    def log_prediction(self, input_data: Dict, prediction: float,
                      actual: float = None, response_time: float = None):
        """예측 로그 기록"""
        log_entry = {
            'timestamp': pd.Timestamp.now(),
            'input_data': input_data,
            'prediction': prediction,
            'actual': actual,
            'response_time': response_time
        }

        self.prediction_buffer.append(log_entry)

        # 버퍼 크기 제한
        if len(self.prediction_buffer) > 10000:
            self.prediction_buffer = self.prediction_buffer[-5000:]

        # 주기적 모니터링
        if len(self.prediction_buffer) % 100 == 0:
            self._check_model_health()

    def _check_model_health(self):
        """모델 헬스 체크"""
        recent_data = pd.DataFrame([
            entry['input_data'] for entry in self.prediction_buffer[-100:]
        ])

        # 드리프트 감지
        drift_report = self.drift_detector.generate_drift_report(recent_data)

        # 알림 발생
        if drift_report['overall_drift_score'] > self.alert_thresholds['drift_score']:
            self._send_alert('drift_detected', drift_report)

        # 성능 저하 체크
        recent_predictions = [entry for entry in self.prediction_buffer[-100:]
                            if entry['actual'] is not None]

        if len(recent_predictions) > 10:
            actuals = [entry['actual'] for entry in recent_predictions]
            predictions = [entry['prediction'] for entry in recent_predictions]
            current_accuracy = accuracy_score(actuals, predictions)

            if current_accuracy < 0.8:  # 성능 임계값
                self._send_alert('performance_degradation', {
                    'current_accuracy': current_accuracy,
                    'samples_evaluated': len(recent_predictions)
                })

    def _send_alert(self, alert_type: str, details: Dict):
        """알림 발송"""
        alert_message = {
            'alert_type': alert_type,
            'timestamp': pd.Timestamp.now().isoformat(),
            'details': details,
            'severity': 'high' if alert_type == 'performance_degradation' else 'medium'
        }

        logging.warning(f"MLOps Alert: {alert_message}")

        # 실제 환경에서는 Slack, PagerDuty 등으로 알림 발송
        # self.slack_client.send_message(f"🚨 MLOps Alert: {alert_type}")
        # self.pagerduty_client.create_incident(alert_message)

5.2 모델 성능 대시보드

# monitoring_dashboard.py
import streamlit as st
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import mlflow
from mlflow.tracking import MlflowClient

class MLOpsDebboardApp:
    def __init__(self):
        self.client = MlflowClient()
        st.set_page_config(
            page_title="MLOps Monitoring Dashboard",
            page_icon="🚀",
            layout="wide"
        )

    def run(self):
        st.title("🚀 MLOps Monitoring Dashboard")

        # 사이드바 설정
        st.sidebar.header("설정")

        # 모델 선택
        models = [mv.name for mv in self.client.search_registered_models()]
        selected_model = st.sidebar.selectbox("모델 선택", models)

        # 시간 범위 선택
        time_range = st.sidebar.selectbox(
            "시간 범위",
            ["지난 1시간", "지난 24시간", "지난 7일", "지난 30일"]
        )

        # 메인 대시보드
        col1, col2, col3 = st.columns(3)

        with col1:
            self._show_model_performance_metrics(selected_model, time_range)

        with col2:
            self._show_prediction_volume_chart(selected_model, time_range)

        with col3:
            self._show_error_rate_chart(selected_model, time_range)

        # 드리프트 모니터링
        st.header("📊 드리프트 모니터링")
        col4, col5 = st.columns(2)

        with col4:
            self._show_feature_drift_chart(selected_model, time_range)

        with col5:
            self._show_prediction_distribution(selected_model, time_range)

        # 모델 버전 비교
        st.header("🔄 모델 버전 비교")
        self._show_model_version_comparison(selected_model)

        # 알림 및 이벤트
        st.header("🚨 최근 알림 및 이벤트")
        self._show_recent_alerts()

    def _show_model_performance_metrics(self, model_name: str, time_range: str):
        """모델 성능 메트릭 표시"""
        st.subheader("모델 성능")

        # 더미 데이터 (실제로는 모니터링 시스템에서 가져옴)
        metrics = {
            "정확도": 0.87,
            "정밀도": 0.89,
            "재현율": 0.85,
            "F1 스코어": 0.87
        }

        for metric, value in metrics.items():
            st.metric(
                label=metric,
                value=f"{value:.3f}",
                delta=f"{np.random.uniform(-0.02, 0.02):.3f}"
            )

    def _show_prediction_volume_chart(self, model_name: str, time_range: str):
        """예측 요청 볼륨 차트"""
        st.subheader("예측 요청 볼륨")

        # 더미 시계열 데이터 생성
        dates = pd.date_range(end=datetime.now(), periods=24, freq='H')
        volumes = np.random.poisson(100, 24) + np.random.randint(50, 150, 24)

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=dates,
            y=volumes,
            mode='lines+markers',
            name='예측 요청 수',
            line=dict(color='blue')
        ))

        fig.update_layout(
            title="시간별 예측 요청 수",
            xaxis_title="시간",
            yaxis_title="요청 수",
            height=300
        )

        st.plotly_chart(fig, use_container_width=True)

    def _show_error_rate_chart(self, model_name: str, time_range: str):
        """에러율 차트"""
        st.subheader("에러율")

        # 더미 에러율 데이터
        dates = pd.date_range(end=datetime.now(), periods=24, freq='H')
        error_rates = np.random.uniform(0.01, 0.05, 24)

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=dates,
            y=error_rates * 100,
            mode='lines+markers',
            name='에러율 (%)',
            line=dict(color='red')
        ))

        # 임계값 라인 추가
        fig.add_hline(y=3, line_dash="dash", line_color="orange",
                     annotation_text="임계값 (3%)")

        fig.update_layout(
            title="시간별 에러율",
            xaxis_title="시간",
            yaxis_title="에러율 (%)",
            height=300
        )

        st.plotly_chart(fig, use_container_width=True)

    def _show_feature_drift_chart(self, model_name: str, time_range: str):
        """피처 드리프트 차트"""
        st.subheader("피처 드리프트 점수")

        # 더미 드리프트 데이터
        features = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5']
        drift_scores = np.random.uniform(0, 0.3, len(features))

        fig = go.Figure(data=[
            go.Bar(x=features, y=drift_scores,
                  marker_color=['red' if score > 0.2 else 'orange' if score > 0.1 else 'green'
                               for score in drift_scores])
        ])

        fig.add_hline(y=0.1, line_dash="dash", line_color="orange",
                     annotation_text="주의 (0.1)")
        fig.add_hline(y=0.2, line_dash="dash", line_color="red",
                     annotation_text="경고 (0.2)")

        fig.update_layout(
            title="피처별 드리프트 점수",
            xaxis_title="피처",
            yaxis_title="드리프트 점수",
            height=300
        )

        st.plotly_chart(fig, use_container_width=True)

    def _show_prediction_distribution(self, model_name: str, time_range: str):
        """예측 분포 차트"""
        st.subheader("예측 분포")

        # 더미 예측 분포 데이터
        reference_predictions = np.random.normal(0.5, 0.2, 1000)
        current_predictions = np.random.normal(0.55, 0.25, 1000)

        fig = go.Figure()

        fig.add_trace(go.Histogram(
            x=reference_predictions,
            name='참조 기간',
            opacity=0.7,
            nbinsx=30
        ))

        fig.add_trace(go.Histogram(
            x=current_predictions,
            name='현재 기간',
            opacity=0.7,
            nbinsx=30
        ))

        fig.update_layout(
            title="예측값 분포 비교",
            xaxis_title="예측값",
            yaxis_title="빈도",
            barmode='overlay',
            height=300
        )

        st.plotly_chart(fig, use_container_width=True)

    def _show_model_version_comparison(self, model_name: str):
        """모델 버전 비교"""
        # 더미 모델 버전 데이터
        versions_data = {
            'Version': ['v1.0', 'v1.1', 'v1.2', 'v2.0'],
            'Accuracy': [0.85, 0.87, 0.88, 0.89],
            'Precision': [0.83, 0.86, 0.87, 0.88],
            'Recall': [0.87, 0.88, 0.89, 0.90],
            'Training Date': ['2026-01-01', '2026-01-05', '2026-01-10', '2026-01-15']
        }

        df = pd.DataFrame(versions_data)

        col1, col2 = st.columns(2)

        with col1:
            # 메트릭 비교 차트
            fig = go.Figure()

            for metric in ['Accuracy', 'Precision', 'Recall']:
                fig.add_trace(go.Scatter(
                    x=df['Version'],
                    y=df[metric],
                    mode='lines+markers',
                    name=metric
                ))

            fig.update_layout(
                title="버전별 성능 메트릭 비교",
                xaxis_title="모델 버전",
                yaxis_title="점수",
                height=300
            )

            st.plotly_chart(fig, use_container_width=True)

        with col2:
            # 버전 정보 테이블
            st.dataframe(df, use_container_width=True)

    def _show_recent_alerts(self):
        """최근 알림 표시"""
        # 더미 알림 데이터
        alerts_data = {
            'Time': ['2026-01-24 14:30', '2026-01-24 12:15', '2026-01-24 09:45'],
            'Type': ['성능 저하', '데이터 드리프트', '높은 에러율'],
            'Severity': ['High', 'Medium', 'Low'],
            'Message': [
                '모델 정확도가 임계값 아래로 떨어졌습니다',
                'feature1에서 유의미한 드리프트가 감지되었습니다',
                '에러율이 평소보다 높습니다'
            ]
        }

        alerts_df = pd.DataFrame(alerts_data)

        # 심각도별 색상 적용
        def highlight_severity(row):
            if row.Severity == 'High':
                return ['background-color: #ffcccc'] * len(row)
            elif row.Severity == 'Medium':
                return ['background-color: #fff2cc'] * len(row)
            else:
                return ['background-color: #ccffcc'] * len(row)

        st.dataframe(
            alerts_df.style.apply(highlight_severity, axis=1),
            use_container_width=True
        )

# Streamlit 앱 실행
if __name__ == "__main__":
    app = MLOpsDebboardApp()
    app.run()

6. CI/CD 파이프라인 구성

6.1 GitHub Actions를 활용한 MLOps 파이프라인

# .github/workflows/mlops-pipeline.yml
name: MLOps Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'data/**'
      - 'src/**'
  pull_request:
    branches: [main]
  schedule:
    # 매일 새벽 2시에 모델 재훈련
    - cron: '0 2 * * *'

env:
  MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  KUBERNETES_NAMESPACE: production

jobs:
  data-validation:
    runs-on: ubuntu-latest
    outputs:
      data-valid: ${{ steps.validate.outputs.valid }}

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Validate data
      id: validate
      run: |
        python scripts/validate_data.py
        echo "valid=$?" >> $GITHUB_OUTPUT

    - name: Upload data validation report
      uses: actions/upload-artifact@v3
      with:
        name: data-validation-report
        path: reports/data_validation.json

  model-training:
    needs: data-validation
    if: needs.data-validation.outputs.data-valid == 'true'
    runs-on: ubuntu-latest
    outputs:
      model-id: ${{ steps.training.outputs.model-id }}

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Train model
      id: training
      run: |
        python scripts/train_model.py
        echo "model-id=$(cat model_id.txt)" >> $GITHUB_OUTPUT

    - name: Upload model artifacts
      uses: actions/upload-artifact@v3
      with:
        name: trained-model
        path: models/

  model-evaluation:
    needs: model-training
    runs-on: ubuntu-latest
    outputs:
      performance-score: ${{ steps.evaluate.outputs.score }}

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Download model
      uses: actions/download-artifact@v3
      with:
        name: trained-model
        path: models/

    - name: Evaluate model
      id: evaluate
      run: |
        python scripts/evaluate_model.py
        echo "score=$(cat evaluation_score.txt)" >> $GITHUB_OUTPUT

    - name: Generate evaluation report
      run: |
        python scripts/generate_report.py

    - name: Upload evaluation report
      uses: actions/upload-artifact@v3
      with:
        name: evaluation-report
        path: reports/

  security-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3

    - name: Run security scan
      uses: securecodewarrior/github-action-add-sarif@v1
      with:
        sarif-file: security-report.sarif

    - name: Scan dependencies
      run: |
        pip install safety
        safety check --json --output safety-report.json

  model-deployment:
    needs: [model-evaluation, security-scan]
    if: needs.model-evaluation.outputs.performance-score > '0.85'
    runs-on: ubuntu-latest
    environment: production

    steps:
    - uses: actions/checkout@v3

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-west-2

    - name: Configure kubectl
      run: |
        aws eks update-kubeconfig --name production-cluster

    - name: Download model
      uses: actions/download-artifact@v3
      with:
        name: trained-model
        path: models/

    - name: Build and push Docker image
      run: |
        docker build -t ${{ secrets.ECR_REGISTRY }}/ml-model:${{ github.sha }} .
        docker push ${{ secrets.ECR_REGISTRY }}/ml-model:${{ github.sha }}

    - name: Deploy to Kubernetes
      run: |
        envsubst < k8s/deployment.yaml | kubectl apply -f -
      env:
        IMAGE_TAG: ${{ github.sha }}
        MODEL_ID: ${{ needs.model-training.outputs.model-id }}

    - name: Wait for deployment
      run: |
        kubectl rollout status deployment/ml-model-service -n $KUBERNETES_NAMESPACE

    - name: Run smoke tests
      run: |
        python scripts/smoke_tests.py --endpoint https://api.production.com/predict

  notification:
    needs: [model-deployment]
    if: always()
    runs-on: ubuntu-latest

    steps:
    - name: Notify Slack
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        text: |
          MLOps Pipeline ${{ job.status }}!
          Model ID: ${{ needs.model-training.outputs.model-id }}
          Performance: ${{ needs.model-evaluation.outputs.performance-score }}
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

6.2 테스트 자동화

# tests/test_model_pipeline.py
import pytest
import pandas as pd
import numpy as np
import joblib
from unittest.mock import Mock, patch
import tempfile
import os
from src.mlops_pipeline import MLOpsPipeline
from src.drift_detection import DriftDetector

class TestMLOpsPipeline:

    @pytest.fixture
    def sample_data(self):
        """테스트용 샘플 데이터 생성"""
        np.random.seed(42)
        data = {
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'target': np.random.randint(0, 2, 1000)
        }
        return pd.DataFrame(data)

    @pytest.fixture
    def pipeline(self):
        """MLOps 파이프라인 인스턴스"""
        return MLOpsPipeline("test_model", "test_experiment")

    def test_data_validation_success(self, pipeline, sample_data):
        """데이터 검증 성공 케이스"""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
            sample_data.to_csv(f.name, index=False)

            with patch('pandas.read_csv', return_value=sample_data):
                result = pipeline.data_validation()

            assert result['missing_values'] == 0
            assert result['duplicate_rows'] == 0
            assert result['data_shape'] == (1000, 3)

        os.unlink(f.name)

    def test_data_validation_failure(self, pipeline):
        """데이터 검증 실패 케이스"""
        # 결측값이 많은 잘못된 데이터
        bad_data = pd.DataFrame({
            'feature1': [np.nan] * 1500,  # 임계값 초과
            'feature2': [1, 2, 3] * 500,
            'target': [0, 1] * 750
        })

        with patch('pandas.read_csv', return_value=bad_data):
            with pytest.raises(ValueError, match="Too many missing values"):
                pipeline.data_validation()

    def test_feature_engineering(self, pipeline, sample_data):
        """피처 엔지니어링 테스트"""
        with patch('pandas.read_csv', return_value=sample_data):
            result = pipeline.feature_engineering()

        assert result['features_created'] == 4

        # 생성된 피처 파일 확인
        with patch('pandas.read_csv') as mock_read:
            mock_read.return_value = sample_data
            processed_data = pd.read_csv("/data/processed/features.csv")

            expected_columns = ['feature1', 'feature2', 'feature_interaction', 'feature_ratio', 'target']
            # assert list(processed_data.columns) == expected_columns

    def test_model_training_performance(self, pipeline, sample_data):
        """모델 훈련 성능 테스트"""
        with patch('pandas.read_csv', return_value=sample_data):
            with patch('mlflow.start_run'):
                with patch('mlflow.log_params'):
                    with patch('mlflow.log_metrics'):
                        with patch('mlflow.sklearn.log_model'):
                            result = pipeline.model_training()

        # 성능 메트릭 확인
        assert 'accuracy' in result
        assert 'precision' in result
        assert 'recall' in result
        assert 0 <= result['accuracy'] <= 1

    def test_model_validation_threshold(self, pipeline):
        """모델 검증 임계값 테스트"""
        # 성능이 낮은 모델 시뮬레이션
        mock_model = Mock()
        mock_model.predict.return_value = np.array([0] * 100)  # 모든 예측을 0으로

        validation_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 100),
            'feature2': np.random.normal(0, 1, 100),
            'target': np.array([1] * 100)  # 실제 값은 모두 1
        })

        with patch('joblib.load', return_value=mock_model):
            with patch('pandas.read_csv', return_value=validation_data):
                with pytest.raises(ValueError, match="Model performance below threshold"):
                    pipeline.model_validation()

class TestDriftDetection:

    @pytest.fixture
    def reference_data(self):
        """참조 데이터"""
        np.random.seed(42)
        return pd.DataFrame({
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'category': np.random.choice(['A', 'B', 'C'], 1000)
        })

    @pytest.fixture
    def drift_detector(self, reference_data):
        """드리프트 감지기"""
        return DriftDetector(reference_data)

    def test_no_drift_detection(self, drift_detector, reference_data):
        """드리프트 없는 경우 테스트"""
        # 참조 데이터와 동일한 분포의 새 데이터
        new_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 500),
            'feature2': np.random.normal(0, 1, 500),
            'category': np.random.choice(['A', 'B', 'C'], 500)
        })

        drift_results = drift_detector.detect_covariate_shift(new_data)

        # 드리프트가 감지되지 않아야 함
        for feature, result in drift_results.items():
            assert not result['drift_detected']

    def test_drift_detection(self, drift_detector):
        """드리프트 있는 경우 테스트"""
        # 분포가 다른 새 데이터
        new_data = pd.DataFrame({
            'feature1': np.random.normal(2, 1, 500),  # 평균이 다름
            'feature2': np.random.normal(0, 3, 500),  # 분산이 다름
            'category': np.random.choice(['A', 'B', 'D'], 500)  # 새로운 카테고리
        })

        drift_results = drift_detector.detect_covariate_shift(new_data)

        # 수치형 변수에서 드리프트가 감지되어야 함
        assert drift_results['feature1']['drift_detected']
        assert drift_results['feature2']['drift_detected']

    def test_concept_drift_detection(self, drift_detector, reference_data):
        """개념 드리프트 감지 테스트"""
        # 성능이 저하된 모델 시뮬레이션
        mock_model = Mock()
        mock_model.predict.side_effect = [
            np.array([1] * len(reference_data)),  # 참조 데이터에서는 좋은 성능
            np.array([0] * 500)  # 새 데이터에서는 나쁜 성능
        ]

        new_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 500),
            'feature2': np.random.normal(0, 1, 500)
        })
        new_labels = pd.Series([1] * 500)  # 실제 라벨은 모두 1

        concept_drift = drift_detector.detect_concept_drift(new_data, new_labels, mock_model)

        assert concept_drift['concept_drift_detected']
        assert concept_drift['performance_drop'] > 0.05

# 통합 테스트
class TestMLOpsIntegration:

    def test_end_to_end_pipeline(self):
        """전체 파이프라인 통합 테스트"""
        # 테스트 데이터 생성
        np.random.seed(42)
        train_data = pd.DataFrame({
            'feature1': np.random.normal(0, 1, 1000),
            'feature2': np.random.normal(0, 1, 1000),
            'target': np.random.randint(0, 2, 1000)
        })

        with tempfile.TemporaryDirectory() as tmpdir:
            # 임시 데이터 파일 생성
            train_path = os.path.join(tmpdir, 'train.csv')
            train_data.to_csv(train_path, index=False)

            # 파이프라인 실행
            pipeline = MLOpsPipeline("integration_test", "integration_experiment")

            with patch('pandas.read_csv', return_value=train_data):
                with patch('mlflow.start_run'):
                    with patch('mlflow.log_params'):
                        with patch('mlflow.log_metrics'):
                            with patch('mlflow.sklearn.log_model'):
                                # 각 단계 순차 실행
                                validation_result = pipeline.data_validation()
                                feature_result = pipeline.feature_engineering()
                                training_result = pipeline.model_training()

                                # 결과 검증
                                assert validation_result is not None
                                assert feature_result['features_created'] > 0
                                assert training_result['accuracy'] > 0

    @pytest.mark.performance
    def test_model_inference_latency(self):
        """모델 추론 지연 시간 테스트"""
        import time
        from src.custom_predictor import CustomPredictor

        # 더미 모델 생성
        mock_model = Mock()
        mock_model.predict.return_value = np.array([1])
        mock_model.predict_proba.return_value = np.array([[0.3, 0.7]])

        with patch('joblib.load', return_value=mock_model):
            predictor = CustomPredictor("/fake/model/path")

            # 지연 시간 측정
            test_input = {'feature1': 0.5, 'feature2': -0.3}

            start_time = time.time()
            result = predictor.predict(test_input)
            end_time = time.time()

            latency = end_time - start_time

            # 지연 시간이 100ms 이하여야 함
            assert latency < 0.1
            assert result['status'] == 'success'

# pytest 실행 설정
if __name__ == "__main__":
    pytest.main([
        "-v",
        "--tb=short",
        "--cov=src",
        "--cov-report=html",
        "--cov-report=term-missing"
    ])

7. 모델 거버넌스 및 규제 준수

7.1 모델 거버넌스 프레임워크

# model_governance.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
import pandas as pd
from enum import Enum

class ModelRisk(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ModelStage(Enum):
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"

@dataclass
class ModelMetadata:
    """모델 메타데이터 클래스"""
    model_id: str
    model_name: str
    version: str
    owner: str
    business_unit: str
    use_case: str
    risk_level: ModelRisk
    stage: ModelStage
    created_date: datetime
    last_updated: datetime

    # 기술적 정보
    algorithm_type: str
    framework: str
    features: List[str]
    target_variable: str

    # 성능 정보
    training_accuracy: float
    validation_accuracy: float
    test_accuracy: float

    # 데이터 정보
    training_data_source: str
    training_data_size: int
    data_lineage: Dict[str, Any]

    # 규제 정보
    regulatory_requirements: List[str]
    ethical_considerations: List[str]
    bias_assessment: Dict[str, Any]

    # 승인 정보
    approvals: List[Dict[str, Any]]

    def to_dict(self):
        """딕셔너리로 변환"""
        return {
            "model_id": self.model_id,
            "model_name": self.model_name,
            "version": self.version,
            "owner": self.owner,
            "business_unit": self.business_unit,
            "use_case": self.use_case,
            "risk_level": self.risk_level.value,
            "stage": self.stage.value,
            "created_date": self.created_date.isoformat(),
            "last_updated": self.last_updated.isoformat(),
            "algorithm_type": self.algorithm_type,
            "framework": self.framework,
            "features": self.features,
            "target_variable": self.target_variable,
            "training_accuracy": self.training_accuracy,
            "validation_accuracy": self.validation_accuracy,
            "test_accuracy": self.test_accuracy,
            "training_data_source": self.training_data_source,
            "training_data_size": self.training_data_size,
            "data_lineage": self.data_lineage,
            "regulatory_requirements": self.regulatory_requirements,
            "ethical_considerations": self.ethical_considerations,
            "bias_assessment": self.bias_assessment,
            "approvals": self.approvals
        }

class ModelGovernanceFramework:
    def __init__(self):
        self.models_registry = {}
        self.governance_policies = self._load_governance_policies()
        self.risk_thresholds = self._load_risk_thresholds()

    def _load_governance_policies(self) -> Dict:
        """거버넌스 정책 로드"""
        return {
            "approval_required_stages": [ModelStage.STAGING, ModelStage.PRODUCTION],
            "mandatory_reviews": {
                ModelRisk.HIGH: ["technical", "business", "legal"],
                ModelRisk.CRITICAL: ["technical", "business", "legal", "executive"]
            },
            "documentation_requirements": [
                "model_card",
                "risk_assessment",
                "bias_analysis",
                "performance_report"
            ],
            "monitoring_requirements": {
                "performance_checks": "daily",
                "drift_detection": "weekly",
                "bias_monitoring": "monthly"
            }
        }

    def _load_risk_thresholds(self) -> Dict:
        """리스크 임계값 로드"""
        return {
            "performance_degradation": {
                ModelRisk.LOW: 0.1,
                ModelRisk.MEDIUM: 0.05,
                ModelRisk.HIGH: 0.03,
                ModelRisk.CRITICAL: 0.01
            },
            "bias_threshold": {
                "demographic_parity": 0.05,
                "equalized_odds": 0.05,
                "statistical_parity": 0.05
            },
            "drift_threshold": {
                ModelRisk.LOW: 0.2,
                ModelRisk.MEDIUM: 0.15,
                ModelRisk.HIGH: 0.1,
                ModelRisk.CRITICAL: 0.05
            }
        }

    def register_model(self, metadata: ModelMetadata) -> str:
        """모델 등록"""
        # 필수 정보 검증
        self._validate_model_metadata(metadata)

        # 리스크 평가
        risk_assessment = self._assess_model_risk(metadata)
        metadata.risk_level = risk_assessment["risk_level"]

        # 거버넌스 정책 적용
        required_approvals = self._get_required_approvals(metadata)

        # 등록
        self.models_registry[metadata.model_id] = metadata

        return metadata.model_id

    def _validate_model_metadata(self, metadata: ModelMetadata):
        """모델 메타데이터 검증"""
        required_fields = [
            'model_name', 'owner', 'use_case', 'algorithm_type',
            'features', 'target_variable', 'training_data_source'
        ]

        for field in required_fields:
            if not getattr(metadata, field):
                raise ValueError(f"Required field '{field}' is missing")

        # 성능 메트릭 검증
        if metadata.training_accuracy < 0 or metadata.training_accuracy > 1:
            raise ValueError("Training accuracy must be between 0 and 1")

    def _assess_model_risk(self, metadata: ModelMetadata) -> Dict:
        """모델 리스크 평가"""
        risk_factors = []

        # 알고리즘 복잡성
        complex_algorithms = ['neural_network', 'ensemble', 'deep_learning']
        if metadata.algorithm_type in complex_algorithms:
            risk_factors.append("complex_algorithm")

        # 사용 사례 위험도
        high_risk_use_cases = ['credit_scoring', 'hiring', 'medical_diagnosis']
        if any(use_case in metadata.use_case.lower() for use_case in high_risk_use_cases):
            risk_factors.append("high_impact_use_case")

        # 데이터 크기
        if metadata.training_data_size < 1000:
            risk_factors.append("small_dataset")

        # 성능
        if metadata.validation_accuracy < 0.8:
            risk_factors.append("low_performance")

        # 리스크 레벨 결정
        if len(risk_factors) >= 3:
            risk_level = ModelRisk.CRITICAL
        elif len(risk_factors) >= 2:
            risk_level = ModelRisk.HIGH
        elif len(risk_factors) >= 1:
            risk_level = ModelRisk.MEDIUM
        else:
            risk_level = ModelRisk.LOW

        return {
            "risk_level": risk_level,
            "risk_factors": risk_factors,
            "assessment_date": datetime.now()
        }

    def _get_required_approvals(self, metadata: ModelMetadata) -> List[str]:
        """필요한 승인 목록 조회"""
        required_approvals = []

        # 리스크 레벨별 승인 요구사항
        if metadata.risk_level in self.governance_policies["mandatory_reviews"]:
            required_approvals.extend(
                self.governance_policies["mandatory_reviews"][metadata.risk_level]
            )

        # 스테이지별 승인 요구사항
        if metadata.stage in self.governance_policies["approval_required_stages"]:
            if "technical" not in required_approvals:
                required_approvals.append("technical")

        return required_approvals

    def promote_model(self, model_id: str, target_stage: ModelStage,
                     approver: str, approval_notes: str = "") -> bool:
        """모델 스테이지 승격"""
        if model_id not in self.models_registry:
            raise ValueError(f"Model {model_id} not found")

        model = self.models_registry[model_id]

        # 승격 전 검증
        validation_result = self._validate_promotion(model, target_stage)
        if not validation_result["valid"]:
            raise ValueError(f"Promotion validation failed: {validation_result['reasons']}")

        # 승인 기록
        approval = {
            "stage": target_stage.value,
            "approver": approver,
            "approval_date": datetime.now().isoformat(),
            "notes": approval_notes
        }
        model.approvals.append(approval)

        # 스테이지 업데이트
        model.stage = target_stage
        model.last_updated = datetime.now()

        return True

    def _validate_promotion(self, model: ModelMetadata, target_stage: ModelStage) -> Dict:
        """승격 검증"""
        reasons = []

        # 필요한 승인 확인
        required_approvals = self._get_required_approvals(model)
        current_approvals = [approval["approver"] for approval in model.approvals]

        missing_approvals = set(required_approvals) - set(current_approvals)
        if missing_approvals:
            reasons.append(f"Missing approvals: {list(missing_approvals)}")

        # 성능 임계값 확인
        if target_stage == ModelStage.PRODUCTION:
            if model.validation_accuracy < 0.8:
                reasons.append("Performance below production threshold")

        # 문서 요구사항 확인
        # (실제로는 문서 시스템과 연동하여 확인)

        return {
            "valid": len(reasons) == 0,
            "reasons": reasons
        }

    def generate_model_card(self, model_id: str) -> Dict:
        """모델 카드 생성"""
        if model_id not in self.models_registry:
            raise ValueError(f"Model {model_id} not found")

        model = self.models_registry[model_id]

        model_card = {
            "model_details": {
                "name": model.model_name,
                "version": model.version,
                "date": model.created_date.isoformat(),
                "type": model.algorithm_type,
                "paper_or_reference": "",
                "license": "",
                "feedback": f"mailto:{model.owner}"
            },
            "intended_use": {
                "primary_intended_uses": model.use_case,
                "primary_intended_users": model.business_unit,
                "out_of_scope_use_cases": []
            },
            "factors": {
                "relevant_factors": model.features,
                "evaluation_factors": []
            },
            "metrics": {
                "model_performance_measures": {
                    "training_accuracy": model.training_accuracy,
                    "validation_accuracy": model.validation_accuracy,
                    "test_accuracy": model.test_accuracy
                },
                "decision_thresholds": {},
                "variation_approaches": []
            },
            "evaluation_data": {
                "dataset": model.training_data_source,
                "motivation": "",
                "preprocessing": model.data_lineage
            },
            "training_data": {
                "dataset": model.training_data_source,
                "motivation": "",
                "preprocessing": model.data_lineage
            },
            "quantitative_analyses": {
                "unitary_results": {},
                "intersectional_results": {}
            },
            "ethical_considerations": model.ethical_considerations,
            "caveats_and_recommendations": model.regulatory_requirements
        }

        return model_card

class BiasAuditor:
    """편향성 감사 클래스"""

    def __init__(self):
        self.bias_metrics = [
            "demographic_parity",
            "equalized_odds",
            "statistical_parity",
            "individual_fairness"
        ]

    def audit_bias(self, model, X_test: pd.DataFrame, y_test: pd.Series,
                  sensitive_attributes: List[str]) -> Dict:
        """편향성 감사 수행"""
        results = {}
        predictions = model.predict(X_test)

        for attribute in sensitive_attributes:
            if attribute in X_test.columns:
                attribute_results = {}

                # 그룹별 분석
                groups = X_test[attribute].unique()

                for metric in self.bias_metrics:
                    if metric == "demographic_parity":
                        score = self._calculate_demographic_parity(
                            X_test, predictions, attribute, groups
                        )
                    elif metric == "equalized_odds":
                        score = self._calculate_equalized_odds(
                            X_test, y_test, predictions, attribute, groups
                        )
                    # 다른 메트릭들도 구현...

                    attribute_results[metric] = score

                results[attribute] = attribute_results

        return results

    def _calculate_demographic_parity(self, X: pd.DataFrame, predictions: np.ndarray,
                                    attribute: str, groups: List) -> Dict:
        """인구통계학적 동등성 계산"""
        positive_rates = {}

        for group in groups:
            group_mask = X[attribute] == group
            group_predictions = predictions[group_mask]
            positive_rate = np.mean(group_predictions == 1)
            positive_rates[group] = positive_rate

        # 최대 차이 계산
        rates = list(positive_rates.values())
        max_difference = max(rates) - min(rates)

        return {
            "positive_rates": positive_rates,
            "max_difference": max_difference,
            "passes_threshold": max_difference < 0.05
        }

    def _calculate_equalized_odds(self, X: pd.DataFrame, y_true: pd.Series,
                                predictions: np.ndarray, attribute: str, groups: List) -> Dict:
        """동등한 기회 계산"""
        group_metrics = {}

        for group in groups:
            group_mask = X[attribute] == group
            group_y_true = y_true[group_mask]
            group_predictions = predictions[group_mask]

            # True Positive Rate
            tpr = np.sum((group_y_true == 1) & (group_predictions == 1)) / np.sum(group_y_true == 1)
            # False Positive Rate
            fpr = np.sum((group_y_true == 0) & (group_predictions == 1)) / np.sum(group_y_true == 0)

            group_metrics[group] = {"tpr": tpr, "fpr": fpr}

        # 그룹 간 차이 계산
        tpr_values = [metrics["tpr"] for metrics in group_metrics.values()]
        fpr_values = [metrics["fpr"] for metrics in group_metrics.values()]

        tpr_difference = max(tpr_values) - min(tpr_values)
        fpr_difference = max(fpr_values) - min(fpr_values)

        return {
            "group_metrics": group_metrics,
            "tpr_difference": tpr_difference,
            "fpr_difference": fpr_difference,
            "passes_threshold": max(tpr_difference, fpr_difference) < 0.05
        }

# 사용 예제
if __name__ == "__main__":
    # 거버넌스 프레임워크 초기화
    governance = ModelGovernanceFramework()

    # 모델 메타데이터 생성
    model_metadata = ModelMetadata(
        model_id="credit_model_v1",
        model_name="Credit Risk Assessment Model",
        version="1.0",
        owner="data-science-team@company.com",
        business_unit="Consumer Banking",
        use_case="Credit scoring for personal loans",
        risk_level=ModelRisk.HIGH,  # 초기값, 평가 후 업데이트됨
        stage=ModelStage.DEVELOPMENT,
        created_date=datetime.now(),
        last_updated=datetime.now(),
        algorithm_type="random_forest",
        framework="scikit-learn",
        features=["income", "credit_history", "employment_status", "loan_amount"],
        target_variable="default_risk",
        training_accuracy=0.85,
        validation_accuracy=0.83,
        test_accuracy=0.82,
        training_data_source="customer_database_2024",
        training_data_size=50000,
        data_lineage={"source": "CRM", "transformations": ["normalization", "feature_engineering"]},
        regulatory_requirements=["Fair Credit Reporting Act", "Equal Credit Opportunity Act"],
        ethical_considerations=["Avoid discrimination based on protected attributes"],
        bias_assessment={},
        approvals=[]
    )

    # 모델 등록
    model_id = governance.register_model(model_metadata)
    print(f"Model registered with ID: {model_id}")

    # 모델 카드 생성
    model_card = governance.generate_model_card(model_id)
    print("Model card generated")

    # 편향성 감사 (예제)
    auditor = BiasAuditor()
    # bias_results = auditor.audit_bias(model, X_test, y_test, ["gender", "age_group"])

8. 실무 적용 가이드

8.1 조직별 MLOps 도입 전략

# mlops_adoption_strategy.py
from enum import Enum
from typing import Dict, List
from dataclasses import dataclass

class OrganizationType(Enum):
    STARTUP = "startup"
    SCALE_UP = "scale_up"
    ENTERPRISE = "enterprise"
    CONSULTING = "consulting"

class MLOpsMaturityLevel(Enum):
    LEVEL_0 = "manual"
    LEVEL_1 = "devops"
    LEVEL_2 = "automated_training"
    LEVEL_3 = "automated_deployment"
    LEVEL_4 = "full_mlops"

@dataclass
class OrganizationProfile:
    org_type: OrganizationType
    team_size: int
    ml_projects_count: int
    current_maturity: MLOpsMaturityLevel
    budget_range: str
    technical_expertise: str
    regulatory_requirements: List[str]

class MLOpsAdoptionStrategy:
    def __init__(self):
        self.strategy_templates = self._load_strategy_templates()

    def _load_strategy_templates(self) -> Dict:
        return {
            OrganizationType.STARTUP: {
                "priorities": ["fast_iteration", "cost_efficiency", "scalability"],
                "recommended_tools": {
                    "experiment_tracking": "MLflow Community",
                    "model_serving": "FastAPI + Docker",
                    "monitoring": "Prometheus + Grafana",
                    "ci_cd": "GitHub Actions",
                    "infrastructure": "Cloud managed services"
                },
                "implementation_phases": [
                    {
                        "phase": 1,
                        "name": "Basic Tracking",
                        "duration": "2-4 weeks",
                        "goals": ["Implement experiment tracking", "Version control for models"],
                        "deliverables": ["MLflow setup", "Git workflows"]
                    },
                    {
                        "phase": 2,
                        "name": "Automated Deployment",
                        "duration": "4-6 weeks",
                        "goals": ["CI/CD pipeline", "Model serving"],
                        "deliverables": ["Docker containers", "API endpoints"]
                    },
                    {
                        "phase": 3,
                        "name": "Monitoring",
                        "duration": "3-4 weeks",
                        "goals": ["Model monitoring", "Performance tracking"],
                        "deliverables": ["Monitoring dashboard", "Alert system"]
                    }
                ]
            },
            OrganizationType.ENTERPRISE: {
                "priorities": ["governance", "compliance", "security", "scalability"],
                "recommended_tools": {
                    "experiment_tracking": "MLflow Enterprise",
                    "model_serving": "Kubernetes + KServe",
                    "monitoring": "Enterprise monitoring solution",
                    "ci_cd": "Jenkins/GitLab Enterprise",
                    "infrastructure": "Hybrid cloud + on-premise"
                },
                "implementation_phases": [
                    {
                        "phase": 1,
                        "name": "Governance Framework",
                        "duration": "8-12 weeks",
                        "goals": ["Establish governance", "Security policies"],
                        "deliverables": ["Governance framework", "Security guidelines"]
                    },
                    {
                        "phase": 2,
                        "name": "Platform Setup",
                        "duration": "12-16 weeks",
                        "goals": ["MLOps platform", "Integration with existing systems"],
                        "deliverables": ["MLOps platform", "Integration APIs"]
                    },
                    {
                        "phase": 3,
                        "name": "Pilot Projects",
                        "duration": "6-8 weeks",
                        "goals": ["Pilot implementation", "Team training"],
                        "deliverables": ["Pilot projects", "Training materials"]
                    },
                    {
                        "phase": 4,
                        "name": "Enterprise Rollout",
                        "duration": "16-24 weeks",
                        "goals": ["Organization-wide adoption"],
                        "deliverables": ["Enterprise deployment", "Documentation"]
                    }
                ]
            }
        }

    def generate_adoption_plan(self, profile: OrganizationProfile) -> Dict:
        """조직 프로필 기반 도입 계획 생성"""
        template = self.strategy_templates.get(profile.org_type, self.strategy_templates[OrganizationType.STARTUP])

        # 조직 특성에 따른 계획 커스터마이징
        customized_plan = self._customize_plan(template, profile)

        return {
            "organization_profile": profile.__dict__,
            "adoption_strategy": customized_plan,
            "estimated_timeline": self._calculate_timeline(customized_plan),
            "resource_requirements": self._estimate_resources(profile, customized_plan),
            "success_metrics": self._define_success_metrics(profile),
            "risk_mitigation": self._identify_risks_and_mitigations(profile)
        }

    def _customize_plan(self, template: Dict, profile: OrganizationProfile) -> Dict:
        """프로필에 따른 계획 커스터마이징"""
        customized = template.copy()

        # 팀 크기에 따른 조정
        if profile.team_size < 5:
            customized["recommended_tools"]["infrastructure"] = "Serverless/managed services"
        elif profile.team_size > 50:
            customized["recommended_tools"]["infrastructure"] = "Self-managed Kubernetes"

        # 규제 요구사항에 따른 조정
        if "GDPR" in profile.regulatory_requirements:
            customized["additional_requirements"] = [
                "Data privacy controls",
                "Audit logging",
                "Right to explanation"
            ]

        if "SOX" in profile.regulatory_requirements:
            customized["additional_requirements"] = customized.get("additional_requirements", []) + [
                "Change management controls",
                "Segregation of duties",
                "Financial data protection"
            ]

        return customized

    def _calculate_timeline(self, plan: Dict) -> Dict:
        """타임라인 계산"""
        phases = plan.get("implementation_phases", [])
        total_weeks = sum(self._parse_duration(phase["duration"]) for phase in phases)

        return {
            "total_duration_weeks": total_weeks,
            "estimated_completion": f"{total_weeks} weeks",
            "phase_breakdown": [
                {
                    "phase": phase["phase"],
                    "name": phase["name"],
                    "duration": phase["duration"]
                }
                for phase in phases
            ]
        }

    def _parse_duration(self, duration_str: str) -> int:
        """기간 문자열을 주 단위로 파싱"""
        # "2-4 weeks" -> 3 (평균값)
        import re
        match = re.search(r"(\d+)-(\d+)", duration_str)
        if match:
            return (int(match.group(1)) + int(match.group(2))) // 2
        else:
            single_match = re.search(r"(\d+)", duration_str)
            return int(single_match.group(1)) if single_match else 4

    def _estimate_resources(self, profile: OrganizationProfile, plan: Dict) -> Dict:
        """리소스 요구사항 추정"""
        base_resources = {
            "human_resources": {
                "ml_engineers": max(1, profile.team_size // 3),
                "devops_engineers": max(1, profile.team_size // 5),
                "data_engineers": max(1, profile.team_size // 4),
                "project_manager": 1 if profile.team_size > 10 else 0.5
            },
            "infrastructure_costs": {
                "monthly_estimate": self._estimate_infrastructure_cost(profile),
                "setup_cost": self._estimate_setup_cost(profile)
            },
            "training_requirements": {
                "mlops_fundamentals": f"{profile.team_size} people",
                "tool_specific_training": "Key team members",
                "estimated_training_hours": profile.team_size * 40
            }
        }

        return base_resources

    def _estimate_infrastructure_cost(self, profile: OrganizationProfile) -> str:
        """인프라 비용 추정"""
        if profile.org_type == OrganizationType.STARTUP:
            return "$500-2000/month"
        elif profile.org_type == OrganizationType.SCALE_UP:
            return "$2000-10000/month"
        else:  # ENTERPRISE
            return "$10000-50000/month"

    def _estimate_setup_cost(self, profile: OrganizationProfile) -> str:
        """초기 설정 비용 추정"""
        if profile.org_type == OrganizationType.STARTUP:
            return "$5000-20000"
        elif profile.org_type == OrganizationType.SCALE_UP:
            return "$20000-100000"
        else:  # ENTERPRISE
            return "$100000-500000"

    def _define_success_metrics(self, profile: OrganizationProfile) -> List[Dict]:
        """성공 메트릭 정의"""
        base_metrics = [
            {
                "metric": "Model deployment frequency",
                "target": "Weekly deployments",
                "measurement": "Number of model deployments per week"
            },
            {
                "metric": "Time to production",
                "target": "< 2 weeks from model training to production",
                "measurement": "Average time from model completion to production deployment"
            },
            {
                "metric": "Model performance monitoring",
                "target": "100% of production models monitored",
                "measurement": "Percentage of models with active monitoring"
            }
        ]

        if profile.org_type == OrganizationType.ENTERPRISE:
            base_metrics.extend([
                {
                    "metric": "Compliance adherence",
                    "target": "100% compliance with regulatory requirements",
                    "measurement": "Audit score"
                },
                {
                    "metric": "Model governance coverage",
                    "target": "100% of models with proper documentation",
                    "measurement": "Percentage of models with complete model cards"
                }
            ])

        return base_metrics

    def _identify_risks_and_mitigations(self, profile: OrganizationProfile) -> List[Dict]:
        """리스크 및 완화 방안 식별"""
        risks = [
            {
                "risk": "Team skill gaps",
                "impact": "High",
                "probability": "Medium",
                "mitigation": "Comprehensive training program and external consulting"
            },
            {
                "risk": "Tool integration complexity",
                "impact": "Medium",
                "probability": "High",
                "mitigation": "Phased implementation and proof of concepts"
            },
            {
                "risk": "Resistance to change",
                "impact": "Medium",
                "probability": "Medium",
                "mitigation": "Change management program and success showcases"
            }
        ]

        if profile.org_type == OrganizationType.ENTERPRISE:
            risks.extend([
                {
                    "risk": "Security and compliance gaps",
                    "impact": "Critical",
                    "probability": "Medium",
                    "mitigation": "Security-first design and compliance review gates"
                },
                {
                    "risk": "Legacy system integration",
                    "impact": "High",
                    "probability": "High",
                    "mitigation": "API-first approach and gradual migration"
                }
            ])

        return risks

# ROI 계산기
class MLOpsROICalculator:
    def __init__(self):
        self.benefit_factors = {
            "faster_deployment": {"time_saved_hours": 40, "hourly_rate": 100},
            "improved_quality": {"error_reduction": 0.3, "error_cost": 10000},
            "automation_savings": {"manual_hours_saved": 20, "hourly_rate": 80},
            "compliance_benefits": {"audit_cost_reduction": 50000}
        }

    def calculate_roi(self, profile: OrganizationProfile, implementation_cost: float,
                     timeframe_months: int = 12) -> Dict:
        """ROI 계산"""

        # 비용 계산
        total_costs = implementation_cost

        # 혜택 계산
        monthly_benefits = 0

        # 배포 속도 향상
        deployments_per_month = profile.ml_projects_count * 2  # 가정
        time_saved = deployments_per_month * self.benefit_factors["faster_deployment"]["time_saved_hours"]
        monthly_benefits += time_saved * self.benefit_factors["faster_deployment"]["hourly_rate"]

        # 품질 향상
        error_reduction_value = (
            profile.ml_projects_count *
            self.benefit_factors["improved_quality"]["error_reduction"] *
            self.benefit_factors["improved_quality"]["error_cost"]
        )
        monthly_benefits += error_reduction_value / 12  # 연간 혜택을 월간으로 변환

        # 자동화 절약
        automation_savings = (
            profile.team_size *
            self.benefit_factors["automation_savings"]["manual_hours_saved"] *
            self.benefit_factors["automation_savings"]["hourly_rate"]
        )
        monthly_benefits += automation_savings

        # 총 혜택
        total_benefits = monthly_benefits * timeframe_months

        # ROI 계산
        roi_percentage = ((total_benefits - total_costs) / total_costs) * 100
        payback_period = total_costs / monthly_benefits if monthly_benefits > 0 else float('inf')

        return {
            "total_costs": total_costs,
            "total_benefits": total_benefits,
            "net_benefit": total_benefits - total_costs,
            "roi_percentage": roi_percentage,
            "payback_period_months": payback_period,
            "monthly_benefits": monthly_benefits,
            "benefit_breakdown": {
                "faster_deployment": time_saved * self.benefit_factors["faster_deployment"]["hourly_rate"],
                "improved_quality": error_reduction_value / 12,
                "automation_savings": automation_savings
            }
        }

# 사용 예제
if __name__ == "__main__":
    # 조직 프로필 생성
    startup_profile = OrganizationProfile(
        org_type=OrganizationType.STARTUP,
        team_size=8,
        ml_projects_count=3,
        current_maturity=MLOpsMaturityLevel.LEVEL_0,
        budget_range="$50k-100k",
        technical_expertise="medium",
        regulatory_requirements=[]
    )

    enterprise_profile = OrganizationProfile(
        org_type=OrganizationType.ENTERPRISE,
        team_size=50,
        ml_projects_count=15,
        current_maturity=MLOpsMaturityLevel.LEVEL_1,
        budget_range="$500k-1M",
        technical_expertise="high",
        regulatory_requirements=["SOX", "GDPR", "HIPAA"]
    )

    # 도입 전략 생성
    strategy_generator = MLOpsAdoptionStrategy()

    startup_plan = strategy_generator.generate_adoption_plan(startup_profile)
    print("Startup MLOps Adoption Plan:")
    print(json.dumps(startup_plan, indent=2, default=str))

    # ROI 계산
    roi_calculator = MLOpsROICalculator()
    startup_roi = roi_calculator.calculate_roi(startup_profile, 75000, 12)
    print("\nStartup ROI Analysis:")
    print(json.dumps(startup_roi, indent=2))

8.2 문제 해결 가이드

# MLOps 문제 해결 가이드

## 일반적인 문제와 해결책

### 1. 모델 성능 저하
**증상**: 프로덕션 모델의 성능이 시간이 지나면서 저하
**원인**: 데이터 드리프트, 개념 드리프트, 환경 변화
**해결책**:
- 실시간 모니터링 시스템 구축
- 자동 재훈련 파이프라인 설정
- A/B 테스트를 통한 점진적 배포

### 2. 배포 파이프라인 실패
**증상**: CI/CD 파이프라인에서 모델 배포 실패
**원인**: 환경 불일치, 의존성 충돌, 리소스 부족
**해결책**:
- Docker 컨테이너 사용으로 환경 일관성 확보
- 의존성 버전 고정
- 리소스 모니터링 및 오토스케일링

### 3. 데이터 파이프라인 오류
**증상**: 훈련/추론용 데이터 파이프라인에서 오류 발생
**원인**: 데이터 스키마 변경, 소스 시스템 변경, 데이터 품질 문제
**해결책**:
- 데이터 검증 로직 강화
- 스키마 진화 처리 메커니즘
- 데이터 품질 모니터링

마무리

MLOps는 단순한 기술 도입이 아닌 조직 전체의 문화적 변화를 요구합니다. 2026년 현재, AI 모델이 비즈니스의 핵심이 되면서 MLOps의 중요성은 더욱 커지고 있습니다.

성공적인 MLOps 구축을 위한 핵심 원칙:

  1. 점진적 도입: 한 번에 모든 것을 바꾸지 말고 단계적으로 진행
  2. 자동화 우선: 반복 작업은 최대한 자동화
  3. 모니터링 필수: 모든 단계에서 모니터링과 로깅
  4. 협업 문화: 데이터 과학팀과 엔지니어링 팀의 긴밀한 협력
  5. 지속적 개선: 피드백을 바탕으로 한 지속적인 프로세스 개선

MLOps는 기술과 프로세스, 그리고 사람이 조화롭게 어우러져야 진정한 가치를 창출할 수 있습니다. 조직의 상황과 목표에 맞는 MLOps 전략을 수립하고, 단계적으로 실행해 나가시기 바랍니다.

궁금한 점이 있으신가요?

문의사항이 있으시면 언제든지 연락주세요.