DevSecOps 실전 가이드: 개발 파이프라인에 보안을 내장하는 방법

DevSecOps보안CI/CDSASTDAST컨테이너보안인프라보안취약점스캔

DevSecOps 실전 가이드: 개발 파이프라인에 보안을 내장하는 방법

2026년 현재, 사이버 보안 위협이 날로 정교해지면서 기존의 "개발 후 보안 검토" 방식으로는 더 이상 안전한 소프트웨어를 보장할 수 없게 되었습니다. DevSecOps(Development, Security, Operations)는 이런 문제를 해결하기 위해 보안을 개발 프로세스의 모든 단계에 통합하는 방법론입니다.

Gartner의 2025 보고서에 따르면, DevSecOps를 도입한 기업들이 보안 취약점으로 인한 배포 지연을 70% 줄였으며, 프로덕션 환경의 보안 인시던트를 60% 감소시켰습니다. 이제 "Shift Left Security"는 선택이 아닌 필수가 되었습니다.

DevSecOps의 핵심 개념과 원칙

전통적 보안 접근법의 한계

# 기존 방식: 개발 완료 후 보안 검토
traditional_workflow:
  development: "6주"
  security_review: "2주"  # 보안 팀의 수동 검토
  fixes_required: "3주"   # 발견된 취약점 수정
  re_review: "1주"
  total_time: "12주"

# 문제점들:
problems:
  - late_discovery: "개발 완료 후 취약점 발견"
  - high_fix_cost: "수정 비용 10-100배 증가"
  - deployment_delays: "배포 지연"
  - security_silos: "보안팀과 개발팀의 단절"
  - limited_coverage: "제한적인 보안 검토 범위"

DevSecOps의 "Shift Left" 접근법

# DevSecOps 방식: 모든 단계에 보안 통합
devSecOps_workflow:
  planning:
    - threat_modeling: "위협 모델링"
    - security_requirements: "보안 요구사항 정의"
    - risk_assessment: "리스크 평가"

  development:
    - secure_coding: "보안 코딩 가이드라인"
    - ide_security_plugins: "IDE 보안 플러그인"
    - peer_review: "보안 중심 코드 리뷰"

  build:
    - sast: "정적 애플리케이션 보안 테스트"
    - dependency_scan: "종속성 취약점 스캔"
    - secret_detection: "하드코딩된 시크릿 탐지"

  test:
    - dast: "동적 애플리케이션 보안 테스트"
    - iast: "대화형 애플리케이션 보안 테스트"
    - security_unit_tests: "보안 단위 테스트"

  deploy:
    - container_scan: "컨테이너 이미지 스캔"
    - infrastructure_scan: "인프라 설정 검증"
    - runtime_protection: "런타임 보안 모니터링"

  operate:
    - continuous_monitoring: "지속적 보안 모니터링"
    - incident_response: "보안 인시던트 대응"
    - compliance_audit: "규정 준수 감사"

개발 단계별 보안 통합

1. 계획 단계: 위협 모델링과 보안 요구사항

# threat_modeling.py - 위협 모델링 자동화
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum

class ThreatLevel(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class AttackVector(Enum):
    NETWORK = "Network"
    ADJACENT = "Adjacent Network"
    LOCAL = "Local"
    PHYSICAL = "Physical"

@dataclass
class Asset:
    name: str
    description: str
    classification: str  # public, internal, confidential, restricted
    business_impact: ThreatLevel

@dataclass
class Threat:
    id: str
    name: str
    description: str
    attack_vector: AttackVector
    likelihood: ThreatLevel
    impact: ThreatLevel
    affected_assets: List[str]
    mitigations: List[str]

    @property
    def risk_score(self) -> float:
        """위험도 점수 계산 (CVSS 기반)"""
        return (self.likelihood.value * self.impact.value) / 16 * 10

class ThreatModel:
    def __init__(self, application_name: str):
        self.application_name = application_name
        self.assets: List[Asset] = []
        self.threats: List[Threat] = []
        self.mitigations: Dict[str, List[str]] = {}

    def add_asset(self, asset: Asset):
        """자산 추가"""
        self.assets.append(asset)

    def add_threat(self, threat: Threat):
        """위협 추가"""
        self.threats.append(threat)

    def generate_security_requirements(self) -> List[str]:
        """위협 분석을 바탕으로 보안 요구사항 생성"""
        requirements = []

        high_risk_threats = [t for t in self.threats if t.risk_score >= 7.0]

        for threat in high_risk_threats:
            for mitigation in threat.mitigations:
                if mitigation not in requirements:
                    requirements.append(mitigation)

        return requirements

    def export_to_security_policy(self) -> Dict[str, Any]:
        """보안 정책으로 내보내기"""
        return {
            "application": self.application_name,
            "assets": [
                {
                    "name": asset.name,
                    "classification": asset.classification,
                    "protection_level": asset.business_impact.name
                }
                for asset in self.assets
            ],
            "threats": [
                {
                    "id": threat.id,
                    "name": threat.name,
                    "risk_score": threat.risk_score,
                    "required_mitigations": threat.mitigations
                }
                for threat in self.threats if threat.risk_score >= 4.0
            ],
            "security_requirements": self.generate_security_requirements()
        }

# 사용 예시
def create_web_app_threat_model():
    # 웹 애플리케이션 위협 모델 생성
    threat_model = ThreatModel("E-commerce Web Application")

    # 자산 정의
    threat_model.add_asset(Asset(
        name="User Database",
        description="Customer personal and payment information",
        classification="confidential",
        business_impact=ThreatLevel.CRITICAL
    ))

    threat_model.add_asset(Asset(
        name="Product Catalog",
        description="Product information and inventory",
        classification="internal",
        business_impact=ThreatLevel.MEDIUM
    ))

    # 위협 정의
    threat_model.add_threat(Threat(
        id="T001",
        name="SQL Injection",
        description="Attacker injects malicious SQL code",
        attack_vector=AttackVector.NETWORK,
        likelihood=ThreatLevel.HIGH,
        impact=ThreatLevel.CRITICAL,
        affected_assets=["User Database"],
        mitigations=[
            "Use parameterized queries",
            "Input validation",
            "Least privilege database access",
            "Web Application Firewall"
        ]
    ))

    threat_model.add_threat(Threat(
        id="T002",
        name="Cross-Site Scripting (XSS)",
        description="Injection of malicious scripts in web pages",
        attack_vector=AttackVector.NETWORK,
        likelihood=ThreatLevel.MEDIUM,
        impact=ThreatLevel.MEDIUM,
        affected_assets=["User Database"],
        mitigations=[
            "Output encoding",
            "Content Security Policy",
            "Input sanitization",
            "HttpOnly cookies"
        ]
    ))

    return threat_model.export_to_security_policy()

# 보안 요구사항 추적
class SecurityRequirement:
    def __init__(self, id: str, description: str, control_type: str):
        self.id = id
        self.description = description
        self.control_type = control_type  # preventive, detective, corrective
        self.implementation_status = "planned"
        self.test_cases = []
        self.implementation_notes = []

    def add_test_case(self, test_case: str):
        self.test_cases.append(test_case)

    def update_status(self, status: str, notes: str = ""):
        """구현 상태 업데이트"""
        valid_statuses = ["planned", "in_progress", "implemented", "tested", "verified"]
        if status in valid_statuses:
            self.implementation_status = status
            if notes:
                self.implementation_notes.append(f"{status}: {notes}")

# 보안 요구사항 추적 시스템
class SecurityRequirementTracker:
    def __init__(self):
        self.requirements: Dict[str, SecurityRequirement] = {}

    def import_from_threat_model(self, threat_model_output: Dict[str, Any]):
        """위협 모델에서 보안 요구사항 가져오기"""
        for i, req in enumerate(threat_model_output["security_requirements"]):
            req_id = f"SEC-{i+1:03d}"
            self.requirements[req_id] = SecurityRequirement(
                id=req_id,
                description=req,
                control_type="preventive"
            )

    def generate_test_plan(self) -> List[Dict[str, Any]]:
        """보안 테스트 계획 생성"""
        test_plan = []
        for req in self.requirements.values():
            if req.test_cases:
                test_plan.append({
                    "requirement_id": req.id,
                    "description": req.description,
                    "test_cases": req.test_cases,
                    "status": req.implementation_status
                })
        return test_plan

2. 개발 단계: 보안 코딩과 IDE 통합

# secure_coding_analyzer.py - 보안 코딩 가이드라인 검증
import ast
import re
from typing import List, Dict, Tuple

class SecurityViolation:
    def __init__(self, file: str, line: int, rule: str, severity: str, message: str):
        self.file = file
        self.line = line
        self.rule = rule
        self.severity = severity  # low, medium, high, critical
        self.message = message

    def __str__(self):
        return f"[{self.severity.upper()}] {self.file}:{self.line} - {self.rule}: {self.message}"

class SecureCodingAnalyzer:
    def __init__(self):
        self.violations: List[SecurityViolation] = []

        # 보안 위험 패턴들
        self.dangerous_functions = {
            'eval': 'Code injection vulnerability',
            'exec': 'Code injection vulnerability',
            'compile': 'Code injection vulnerability',
            'input': 'Potential injection if not validated',
            '__import__': 'Dynamic import security risk'
        }

        self.crypto_bad_practices = [
            'md5', 'sha1', 'DES', 'RC4'  # 약한 암호화 알고리즘
        ]

        self.hardcoded_secrets_patterns = [
            r'password\s*=\s*["\'][^"\']+["\']',
            r'api_key\s*=\s*["\'][^"\']+["\']',
            r'secret\s*=\s*["\'][^"\']+["\']',
            r'token\s*=\s*["\'][^"\']+["\']'
        ]

    def analyze_file(self, file_path: str) -> List[SecurityViolation]:
        """파일의 보안 취약점 분석"""
        violations = []

        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
                lines = content.split('\n')

            # AST를 사용한 정적 분석
            tree = ast.parse(content)
            violations.extend(self._analyze_ast(tree, file_path))

            # 정규식을 사용한 패턴 매칭
            violations.extend(self._analyze_patterns(lines, file_path))

        except Exception as e:
            violations.append(SecurityViolation(
                file=file_path,
                line=0,
                rule="PARSE_ERROR",
                severity="medium",
                message=f"Failed to parse file: {e}"
            ))

        return violations

    def _analyze_ast(self, tree: ast.AST, file_path: str) -> List[SecurityViolation]:
        """AST 기반 보안 분석"""
        violations = []

        class SecurityVisitor(ast.NodeVisitor):
            def __init__(self, analyzer):
                self.analyzer = analyzer
                self.violations = []

            def visit_Call(self, node):
                # 위험한 함수 호출 검사
                if isinstance(node.func, ast.Name):
                    func_name = node.func.id
                    if func_name in self.analyzer.dangerous_functions:
                        self.violations.append(SecurityViolation(
                            file=file_path,
                            line=node.lineno,
                            rule=f"DANGEROUS_FUNCTION_{func_name.upper()}",
                            severity="high",
                            message=f"Use of dangerous function '{func_name}': {self.analyzer.dangerous_functions[func_name]}"
                        ))

                # SQL 문자열 직접 결합 검사
                if isinstance(node.func, ast.Attribute) and node.func.attr in ['execute', 'executemany']:
                    for arg in node.args:
                        if isinstance(arg, ast.BinOp) and isinstance(arg.op, (ast.Add, ast.Mod)):
                            self.violations.append(SecurityViolation(
                                file=file_path,
                                line=node.lineno,
                                rule="SQL_INJECTION_RISK",
                                severity="critical",
                                message="Possible SQL injection: use parameterized queries instead of string concatenation"
                            ))

                self.generic_visit(node)

            def visit_Import(self, node):
                # 위험한 모듈 import 검사
                for alias in node.names:
                    if any(crypto in alias.name for crypto in self.analyzer.crypto_bad_practices):
                        self.violations.append(SecurityViolation(
                            file=file_path,
                            line=node.lineno,
                            rule="WEAK_CRYPTO",
                            severity="medium",
                            message=f"Weak cryptographic algorithm imported: {alias.name}"
                        ))

                self.generic_visit(node)

        visitor = SecurityVisitor(self)
        visitor.visit(tree)
        return visitor.violations

    def _analyze_patterns(self, lines: List[str], file_path: str) -> List[SecurityViolation]:
        """패턴 매칭을 통한 보안 분석"""
        violations = []

        for line_num, line in enumerate(lines, 1):
            # 하드코딩된 시크릿 검사
            for pattern in self.hardcoded_secrets_patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    violations.append(SecurityViolation(
                        file=file_path,
                        line=line_num,
                        rule="HARDCODED_SECRET",
                        severity="critical",
                        message="Potential hardcoded secret detected"
                    ))

            # 인증/인가 바이패스 패턴
            if re.search(r'auth\s*=\s*False', line, re.IGNORECASE):
                violations.append(SecurityViolation(
                    file=file_path,
                    line=line_num,
                    rule="AUTH_BYPASS",
                    severity="high",
                    message="Authentication bypass detected"
                ))

            # 디버그 모드 활성화
            if re.search(r'debug\s*=\s*True', line, re.IGNORECASE):
                violations.append(SecurityViolation(
                    file=file_path,
                    line=line_num,
                    rule="DEBUG_MODE",
                    severity="medium",
                    message="Debug mode enabled - should be disabled in production"
                ))

        return violations

# Pre-commit 훅 통합
def create_precommit_security_hook():
    """Pre-commit 보안 훅 생성"""
    hook_script = '''#!/bin/bash
    # .git/hooks/pre-commit - 보안 검사 훅

    echo "Running security checks..."

    # Python 파일 보안 검사
    python_files=$(git diff --cached --name-only --diff-filter=ACM | grep "\\.py$")

    if [ -n "$python_files" ]; then
        for file in $python_files; do
            echo "Scanning $file for security vulnerabilities..."
            python secure_coding_analyzer.py "$file"

            if [ $? -ne 0 ]; then
                echo "Security vulnerabilities found in $file"
                echo "Commit blocked. Please fix security issues before committing."
                exit 1
            fi
        done
    fi

    # 시크릿 검사
    echo "Checking for hardcoded secrets..."
    if git diff --cached --name-only | xargs grep -l -E "(password|api_key|secret|token)\\s*=\\s*[\"'][^\"']+[\"']"; then
        echo "Potential hardcoded secrets detected!"
        echo "Please remove secrets and use environment variables or secret management tools."
        exit 1
    fi

    # 의존성 취약점 검사
    if [ -f "requirements.txt" ] && git diff --cached --name-only | grep -q "requirements.txt"; then
        echo "Checking dependencies for known vulnerabilities..."
        safety check -r requirements.txt

        if [ $? -ne 0 ]; then
            echo "Vulnerable dependencies detected!"
            echo "Please update dependencies before committing."
            exit 1
        fi
    fi

    echo "Security checks passed!"
    '''

    return hook_script

3. CI/CD 파이프라인 보안 통합

# .github/workflows/devsecops-pipeline.yml
name: DevSecOps Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  security-scan:
    name: Security Scanning
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write

    steps:
    - name: Checkout code
      uses: actions/checkout@v4
      with:
        fetch-depth: 0

    # 1. Secret Scanning
    - name: Run TruffleHog OSS
      uses: trufflesecurity/trufflehog@main
      with:
        path: ./
        base: main
        head: HEAD
        extra_args: --debug --only-verified

    # 2. Static Application Security Testing (SAST)
    - name: Run Semgrep
      uses: returntocorp/semgrep-action@v1
      with:
        config: >-
          p/security-audit
          p/secrets
          p/owasp-top-ten
        generateSarif: "1"

    - name: Upload Semgrep results to GitHub
      uses: github/codeql-action/upload-sarif@v2
      if: always()
      with:
        sarif_file: semgrep.sarif

    # 3. Dependency Vulnerability Scanning
    - name: Run Snyk to check for vulnerabilities
      uses: snyk/actions/python@master
      env:
        SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
      with:
        args: --severity-threshold=medium
        command: test

    # 4. License Compliance Check
    - name: FOSSA Scan
      uses: fossas/fossa-action@main
      with:
        api-key: ${{ secrets.FOSSA_API_KEY }}

    # 5. Code Quality & Security
    - name: SonarCloud Scan
      uses: SonarSource/sonarcloud-github-action@master
      env:
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

  build-and-scan:
    name: Build and Container Security Scan
    needs: security-scan
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write
      security-events: write

    steps:
    - name: Checkout repository
      uses: actions/checkout@v4

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3

    - name: Log in to Container Registry
      uses: docker/login-action@v3
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}

    - name: Extract metadata
      id: meta
      uses: docker/metadata-action@v5
      with:
        images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

    - name: Build Docker image
      id: build
      uses: docker/build-push-action@v5
      with:
        context: .
        push: false
        tags: ${{ steps.meta.outputs.tags }}
        labels: ${{ steps.meta.outputs.labels }}
        load: true

    # 6. Container Image Scanning
    - name: Run Trivy vulnerability scanner
      uses: aquasecurity/trivy-action@master
      with:
        image-ref: ${{ steps.meta.outputs.tags }}
        format: 'sarif'
        output: 'trivy-results.sarif'

    - name: Upload Trivy scan results
      uses: github/codeql-action/upload-sarif@v2
      if: always()
      with:
        sarif_file: 'trivy-results.sarif'

    # 7. Docker Bench Security
    - name: Run Docker Bench Security
      run: |
        docker run --rm --net host --pid host --userns host --cap-add audit_control \
          -e DOCKER_CONTENT_TRUST=$DOCKER_CONTENT_TRUST \
          -v /var/lib:/var/lib:ro \
          -v /var/run/docker.sock:/var/run/docker.sock:ro \
          -v /usr/lib/systemd:/usr/lib/systemd:ro \
          -v /etc:/etc:ro \
          --label docker_bench_security \
          docker/docker-bench-security

    # 8. Container Image Signing (Cosign)
    - name: Install Cosign
      uses: sigstore/cosign-installer@v3

    - name: Sign container image
      if: github.ref == 'refs/heads/main'
      run: |
        cosign sign --yes ${{ steps.meta.outputs.tags }}@${{ steps.build.outputs.digest }}
      env:
        COSIGN_EXPERIMENTAL: 1

    - name: Push Docker image
      if: github.ref == 'refs/heads/main'
      uses: docker/build-push-action@v5
      with:
        context: .
        push: true
        tags: ${{ steps.meta.outputs.tags }}

  dynamic-security-testing:
    name: Dynamic Application Security Testing (DAST)
    needs: build-and-scan
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest

    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Deploy to staging environment
      run: |
        # 스테이징 환경에 애플리케이션 배포
        echo "Deploying to staging environment..."
        # kubectl apply -f k8s/staging/ 등의 배포 명령

    - name: Wait for deployment
      run: |
        sleep 120  # 애플리케이션이 준비될 때까지 대기

    # 9. OWASP ZAP Dynamic Scan
    - name: Run OWASP ZAP Full Scan
      uses: zaproxy/action-full-scan@v0.10.0
      with:
        target: 'https://staging.myapp.com'
        rules_file_name: '.zap/rules.tsv'
        cmd_options: '-a'

    # 10. Nuclei Security Scanner
    - name: Run Nuclei Scanner
      uses: projectdiscovery/nuclei-action@main
      with:
        target: 'https://staging.myapp.com'
        github-report: true
        github-token: ${{ secrets.GITHUB_TOKEN }}

  infrastructure-security:
    name: Infrastructure Security Scanning
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    # 11. Infrastructure as Code Security
    - name: Run Checkov
      id: checkov
      uses: bridgecrewio/checkov-action@master
      with:
        directory: .
        framework: terraform,kubernetes,dockerfile
        output_format: sarif
        output_file_path: checkov-report.sarif

    - name: Upload Checkov scan results
      uses: github/codeql-action/upload-sarif@v2
      if: always()
      with:
        sarif_file: checkov-report.sarif

    # 12. Kubernetes Security Scanning
    - name: Run Kubesec
      run: |
        find k8s/ -name "*.yaml" | xargs -I {} bash -c 'echo "Scanning {}" && curl -sSX POST --data-binary @{} https://v2.kubesec.io/scan'

    # 13. Terraform Security
    - name: Run tfsec
      uses: aquasecurity/tfsec-pr-commenter-action@v1.2.0
      with:
        github_token: ${{ github.token }}

  compliance-check:
    name: Compliance and Policy Check
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    # 14. Open Policy Agent (OPA) Policy Check
    - name: Run OPA Conftest
      run: |
        # OPA 정책 검증
        docker run --rm -v $(pwd):/project openpolicyagent/conftest test --policy /project/.opa-policies /project/k8s/

    # 15. Compliance Scanning
    - name: Run compliance checks
      run: |
        # PCI DSS, GDPR, SOX 등 규정 준수 검사
        echo "Running compliance checks..."

  security-reporting:
    name: Security Report Generation
    needs: [security-scan, build-and-scan, dynamic-security-testing, infrastructure-security]
    if: always()
    runs-on: ubuntu-latest
    steps:
    - name: Generate Security Dashboard
      run: |
        # 보안 스캔 결과를 통합한 대시보드 생성
        echo "Generating comprehensive security report..."

    - name: Notify security team
      if: failure()
      uses: 8398a7/action-slack@v3
      with:
        status: failure
        channel: '#security-alerts'
        fields: repo,message,commit,author,action,eventName,ref,workflow
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

4. 컨테이너 보안 강화

# Dockerfile.secure - 보안 강화된 Dockerfile
# 1. 최소 권한 base 이미지 사용
FROM python:3.11-slim as builder

# 2. 보안 업데이트 설치
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
        build-essential \
        curl \
    && rm -rf /var/lib/apt/lists/*

# 3. 의존성 별도 설치 (레이어 캐싱 최적화)
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# 4. 프로덕션 스테이지
FROM python:3.11-slim as production

# 5. 보안 업데이트
RUN apt-get update && apt-get upgrade -y && \
    # 필수 패키지만 설치
    apt-get install -y --no-install-recommends \
        curl \
        ca-certificates \
    && rm -rf /var/lib/apt/lists/* \
    # 불필요한 패키지 정리
    && apt-get autoremove -y \
    && apt-get clean

# 6. 비루트 사용자 생성
RUN groupadd --gid 1000 appuser && \
    useradd --uid 1000 --gid 1000 --create-home appuser

# 7. 애플리케이션 디렉토리 설정
WORKDIR /app

# 8. 빌드된 의존성 복사
COPY --from=builder /root/.local/lib/python3.11/site-packages /home/appuser/.local/lib/python3.11/site-packages
COPY --from=builder /root/.local/bin /home/appuser/.local/bin

# 9. 애플리케이션 코드 복사
COPY --chown=appuser:appuser . .

# 10. 파일 권한 설정
RUN chmod -R 755 /app && \
    # 실행 파일만 실행 권한
    find /app -type f -name "*.py" -exec chmod 644 {} \; && \
    # 설정 파일 보호
    find /app -name "*.conf" -o -name "*.ini" -exec chmod 600 {} \;

# 11. 사용자 전환
USER appuser

# 12. PATH 설정
ENV PATH="/home/appuser/.local/bin:$PATH"

# 13. 헬스체크 설정
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

# 14. 시그널 처리를 위한 init 설정
# tini 사용으로 좀비 프로세스 방지
RUN pip install --user tini
ENTRYPOINT ["python", "-m", "tini", "--"]

# 15. 보안 라벨 추가
LABEL security.scan.date="2026-01-24" \
      security.scan.tool="trivy" \
      security.compliance="pci-dss" \
      maintainer="security-team@company.com"

# 16. 애플리케이션 실행
EXPOSE 8080
CMD ["python", "app.py"]
# k8s/security/pod-security-policy.yaml - Kubernetes 보안 정책
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: restricted-psp
spec:
  # 권한 에스컬레이션 방지
  privileged: false
  allowPrivilegeEscalation: false

  # 루트 실행 금지
  runAsUser:
    rule: 'MustRunAsNonRoot'
  runAsGroup:
    rule: 'MustRunAs'
    ranges:
      - min: 1000
        max: 65535

  # 파일시스템 읽기 전용
  readOnlyRootFilesystem: true

  # 호스트 네임스페이스 격리
  hostNetwork: false
  hostIPC: false
  hostPID: false

  # 볼륨 타입 제한
  volumes:
    - 'configMap'
    - 'emptyDir'
    - 'projected'
    - 'secret'
    - 'downwardAPI'
    - 'persistentVolumeClaim'

  # 시큐리티 컨텍스트
  seLinux:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'RunAsAny'

  # 네트워크 정책
  defaultAllowPrivilegeEscalation: false
  allowedCapabilities: []
  requiredDropCapabilities:
    - ALL

---
# k8s/security/network-policy.yaml - 네트워크 보안 정책
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: myapp-netpol
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: myapp

  policyTypes:
  - Ingress
  - Egress

  ingress:
  # 인그레스 컨트롤러에서만 트래픽 허용
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8080

  # 모니터링 시스템에서의 접근 허용
  - from:
    - namespaceSelector:
        matchLabels:
          name: monitoring
    ports:
    - protocol: TCP
      port: 9090  # 메트릭 포트

  egress:
  # 데이터베이스 접근 허용
  - to:
    - podSelector:
        matchLabels:
          app: postgresql
    ports:
    - protocol: TCP
      port: 5432

  # Redis 접근 허용
  - to:
    - podSelector:
        matchLabels:
          app: redis
    ports:
    - protocol: TCP
      port: 6379

  # DNS 쿼리 허용
  - to: []
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53

  # HTTPS 아웃바운드 허용 (API 호출용)
  - to: []
    ports:
    - protocol: TCP
      port: 443

---
# k8s/security/rbac.yaml - RBAC 설정
apiVersion: v1
kind: ServiceAccount
metadata:
  name: myapp-service-account
  namespace: production

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: production
  name: myapp-role
rules:
# ConfigMap 읽기 권한만
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list"]
# Secret 읽기 권한만 (필요한 경우)
- apiGroups: [""]
  resources: ["secrets"]
  verbs: ["get"]
  resourceNames: ["myapp-secrets"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: myapp-rolebinding
  namespace: production
subjects:
- kind: ServiceAccount
  name: myapp-service-account
  namespace: production
roleRef:
  kind: Role
  name: myapp-role
  apiGroup: rbac.authorization.k8s.io

런타임 보안 모니터링

Falco를 활용한 실시간 위협 탐지

# monitoring/falco-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: falco-config
  namespace: security
data:
  falco.yaml: |
    rules_file:
      - /etc/falco/falco_rules.yaml
      - /etc/falco/falco_rules.local.yaml
      - /etc/falco/k8s_audit_rules.yaml
      - /etc/falco/rules.d

    # 출력 설정
    json_output: true
    json_include_output_property: true

    # gRPC 설정
    grpc:
      enabled: true
      bind_address: "0.0.0.0:5060"
      threadiness: 0

    # 웹훅 설정
    http_output:
      enabled: true
      url: http://falco-sidekick:2801/

  custom_rules.yaml: |
    # 커스텀 보안 규칙
    - rule: Unexpected outbound connection
      desc: Detect unexpected outbound network connections
      condition: >
        (inbound or outbound) and not fd.typechar=4 and not fd.typechar=6 and
        not proc.name in (curl, wget, python, node, java) and
        fd.net != "127.0.0.1" and not fd.snet in (rfc_1918_addresses)
      output: >
        Unexpected outbound connection (user=%user.name command=%proc.cmdline
        connection=%fd.name container=%container.name image=%container.image)
      priority: WARNING
      tags: [network, suspicious]

    - rule: Sensitive file access
      desc: Detect access to sensitive files
      condition: >
        open_read and
        (fd.filename startswith /etc/passwd or
         fd.filename startswith /etc/shadow or
         fd.filename startswith /etc/sudoers or
         fd.filename startswith /root/.ssh or
         fd.filename startswith /home/*/.ssh)
      output: >
        Sensitive file access (user=%user.name command=%proc.cmdline
        file=%fd.name container=%container.name)
      priority: HIGH
      tags: [file, sensitive]

    - rule: Cryptocurrency mining detection
      desc: Detect potential cryptocurrency mining activity
      condition: >
        spawned_process and
        (proc.name in (xmrig, cpuminer, cgminer, bfgminer, sgminer) or
         proc.cmdline contains "stratum+tcp" or
         proc.cmdline contains "mining.pool" or
         proc.cmdline contains "cryptonight")
      output: >
        Potential cryptocurrency mining detected (user=%user.name
        command=%proc.cmdline container=%container.name)
      priority: CRITICAL
      tags: [malware, mining]

---
# monitoring/falco-deployment.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: falco
  namespace: security
spec:
  selector:
    matchLabels:
      app: falco
  template:
    metadata:
      labels:
        app: falco
    spec:
      serviceAccount: falco
      hostNetwork: true
      hostPID: true
      containers:
      - name: falco
        image: falcosecurity/falco:0.35.1
        args:
          - /usr/bin/falco
          - --cri
          - /run/containerd/containerd.sock
          - --k8s-api
          - --k8s-api-cert
          - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
          - --k8s-api-token
          - /var/run/secrets/kubernetes.io/serviceaccount/token
        securityContext:
          privileged: true
        volumeMounts:
        - mountPath: /host/var/run/docker.sock
          name: docker-socket
        - mountPath: /host/dev
          name: dev-fs
        - mountPath: /host/proc
          name: proc-fs
          readOnly: true
        - mountPath: /host/boot
          name: boot-fs
          readOnly: true
        - mountPath: /host/lib/modules
          name: lib-modules
          readOnly: true
        - mountPath: /host/usr
          name: usr-fs
          readOnly: true
        - mountPath: /etc/falco
          name: falco-config
        env:
        - name: FALCO_K8S_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
      volumes:
      - name: docker-socket
        hostPath:
          path: /var/run/docker.sock
      - name: dev-fs
        hostPath:
          path: /dev
      - name: proc-fs
        hostPath:
          path: /proc
      - name: boot-fs
        hostPath:
          path: /boot
      - name: lib-modules
        hostPath:
          path: /lib/modules
      - name: usr-fs
        hostPath:
          path: /usr
      - name: falco-config
        configMap:
          name: falco-config

보안 인시던트 자동 대응

# security_incident_response.py - 자동 인시던트 대응 시스템
import json
import logging
from datetime import datetime
from typing import Dict, List, Any
from kubernetes import client, config
from slack_sdk import WebClient
import smtplib
from email.mime.text import MimeText

class SecurityIncidentResponse:
    def __init__(self):
        self.k8s_client = self._init_k8s_client()
        self.slack_client = WebClient(token=os.environ.get('SLACK_TOKEN'))
        self.severity_thresholds = {
            'CRITICAL': self._handle_critical_incident,
            'HIGH': self._handle_high_incident,
            'MEDIUM': self._handle_medium_incident,
            'LOW': self._handle_low_incident
        }

    def _init_k8s_client(self):
        """Kubernetes 클라이언트 초기화"""
        try:
            config.load_incluster_config()
        except:
            config.load_kube_config()
        return client.AppsV1Api()

    def process_falco_alert(self, alert_data: Dict[str, Any]):
        """Falco 알럿 처리"""
        try:
            alert = self._parse_falco_alert(alert_data)
            severity = self._calculate_severity(alert)

            # 알럿 로그 기록
            self._log_incident(alert, severity)

            # 심각도별 대응 수행
            response_handler = self.severity_thresholds.get(severity)
            if response_handler:
                response_handler(alert)

            # 보안팀 알림
            self._notify_security_team(alert, severity)

        except Exception as e:
            logging.error(f"Error processing Falco alert: {e}")

    def _parse_falco_alert(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Falco 알럿 파싱"""
        return {
            'timestamp': alert_data.get('time'),
            'rule': alert_data.get('rule'),
            'priority': alert_data.get('priority'),
            'output': alert_data.get('output'),
            'fields': alert_data.get('output_fields', {}),
            'hostname': alert_data.get('hostname'),
            'container_name': alert_data.get('output_fields', {}).get('container.name'),
            'pod_name': alert_data.get('output_fields', {}).get('k8s.pod.name'),
            'namespace': alert_data.get('output_fields', {}).get('k8s.ns.name'),
            'process_name': alert_data.get('output_fields', {}).get('proc.name'),
            'command_line': alert_data.get('output_fields', {}).get('proc.cmdline'),
            'user_name': alert_data.get('output_fields', {}).get('user.name')
        }

    def _calculate_severity(self, alert: Dict[str, Any]) -> str:
        """알럿 심각도 계산"""
        priority = alert.get('priority', '').upper()
        rule = alert.get('rule', '').lower()

        # 크리티컬 조건들
        critical_indicators = [
            'cryptocurrency mining',
            'privilege escalation',
            'container escape',
            'malware',
            'backdoor'
        ]

        if any(indicator in rule for indicator in critical_indicators):
            return 'CRITICAL'

        # Falco 우선순위 기반 매핑
        priority_mapping = {
            'EMERGENCY': 'CRITICAL',
            'ALERT': 'CRITICAL',
            'CRITICAL': 'CRITICAL',
            'ERROR': 'HIGH',
            'WARNING': 'MEDIUM',
            'NOTICE': 'LOW',
            'INFORMATIONAL': 'LOW',
            'DEBUG': 'LOW'
        }

        return priority_mapping.get(priority, 'MEDIUM')

    def _handle_critical_incident(self, alert: Dict[str, Any]):
        """크리티컬 인시던트 처리"""
        logging.critical(f"CRITICAL SECURITY INCIDENT: {alert['rule']}")

        # 1. 즉시 격리
        if alert.get('pod_name') and alert.get('namespace'):
            self._isolate_pod(alert['pod_name'], alert['namespace'])

        # 2. 네트워크 차단
        self._apply_network_isolation(alert['namespace'], alert['pod_name'])

        # 3. 포렌식 데이터 수집
        self._collect_forensic_data(alert)

        # 4. 즉시 에스컬레이션
        self._escalate_to_security_team(alert, 'CRITICAL')

    def _handle_high_incident(self, alert: Dict[str, Any]):
        """고위험 인시던트 처리"""
        logging.error(f"HIGH SECURITY INCIDENT: {alert['rule']}")

        # 1. 상세 모니터링 활성화
        self._enable_enhanced_monitoring(alert['namespace'], alert['pod_name'])

        # 2. 네트워크 트래픽 제한
        self._restrict_network_traffic(alert['namespace'], alert['pod_name'])

        # 3. 보안팀 알림
        self._notify_security_team(alert, 'HIGH')

    def _handle_medium_incident(self, alert: Dict[str, Any]):
        """중위험 인시던트 처리"""
        logging.warning(f"MEDIUM SECURITY INCIDENT: {alert['rule']}")

        # 1. 로그 수집 강화
        self._enhance_logging(alert['namespace'], alert['pod_name'])

        # 2. 알림 및 추적
        self._track_incident(alert)

    def _handle_low_incident(self, alert: Dict[str, Any]):
        """저위험 인시던트 처리"""
        logging.info(f"LOW SECURITY INCIDENT: {alert['rule']}")

        # 기본 로깅만 수행
        self._log_incident(alert, 'LOW')

    def _isolate_pod(self, pod_name: str, namespace: str):
        """팟 격리 (NetworkPolicy 적용)"""
        try:
            isolation_policy = {
                'apiVersion': 'networking.k8s.io/v1',
                'kind': 'NetworkPolicy',
                'metadata': {
                    'name': f'isolate-{pod_name}',
                    'namespace': namespace
                },
                'spec': {
                    'podSelector': {
                        'matchLabels': {
                            'security.incident': 'isolated'
                        }
                    },
                    'policyTypes': ['Ingress', 'Egress'],
                    'ingress': [],  # 모든 인그레스 차단
                    'egress': []    # 모든 이그레스 차단
                }
            }

            # 팟에 격리 라벨 추가
            patch_body = {
                'metadata': {
                    'labels': {
                        'security.incident': 'isolated'
                    }
                }
            }

            client.CoreV1Api().patch_namespaced_pod(
                name=pod_name,
                namespace=namespace,
                body=patch_body
            )

            # NetworkPolicy 적용
            client.NetworkingV1Api().create_namespaced_network_policy(
                namespace=namespace,
                body=isolation_policy
            )

            logging.info(f"Pod {pod_name} isolated in namespace {namespace}")

        except Exception as e:
            logging.error(f"Failed to isolate pod {pod_name}: {e}")

    def _collect_forensic_data(self, alert: Dict[str, Any]):
        """포렌식 데이터 수집"""
        try:
            forensic_data = {
                'timestamp': datetime.utcnow().isoformat(),
                'incident_id': f"INC-{datetime.now().strftime('%Y%m%d%H%M%S')}",
                'alert_details': alert,
                'pod_logs': self._get_pod_logs(alert.get('pod_name'), alert.get('namespace')),
                'container_info': self._get_container_info(alert.get('pod_name'), alert.get('namespace')),
                'network_connections': self._get_network_connections(alert.get('pod_name')),
                'process_tree': self._get_process_tree(alert.get('pod_name'))
            }

            # 포렌식 데이터를 안전한 저장소에 저장
            self._store_forensic_data(forensic_data)

        except Exception as e:
            logging.error(f"Failed to collect forensic data: {e}")

    def _notify_security_team(self, alert: Dict[str, Any], severity: str):
        """보안팀 알림"""
        try:
            # Slack 알림
            slack_message = self._format_slack_message(alert, severity)
            self.slack_client.chat_postMessage(
                channel='#security-incidents',
                text=slack_message['text'],
                attachments=slack_message['attachments']
            )

            # 이메일 알림 (크리티컬한 경우)
            if severity in ['CRITICAL', 'HIGH']:
                self._send_email_alert(alert, severity)

        except Exception as e:
            logging.error(f"Failed to notify security team: {e}")

    def _format_slack_message(self, alert: Dict[str, Any], severity: str):
        """Slack 메시지 포맷"""
        color_map = {
            'CRITICAL': '#FF0000',
            'HIGH': '#FF8C00',
            'MEDIUM': '#FFD700',
            'LOW': '#32CD32'
        }

        return {
            'text': f"🚨 Security Incident Detected - {severity}",
            'attachments': [{
                'color': color_map.get(severity, '#808080'),
                'title': f"Rule: {alert.get('rule', 'Unknown')}",
                'fields': [
                    {'title': 'Severity', 'value': severity, 'short': True},
                    {'title': 'Timestamp', 'value': alert.get('timestamp', 'Unknown'), 'short': True},
                    {'title': 'Pod', 'value': alert.get('pod_name', 'Unknown'), 'short': True},
                    {'title': 'Namespace', 'value': alert.get('namespace', 'Unknown'), 'short': True},
                    {'title': 'Process', 'value': alert.get('process_name', 'Unknown'), 'short': True},
                    {'title': 'User', 'value': alert.get('user_name', 'Unknown'), 'short': True},
                ],
                'text': f"```{alert.get('output', 'No details available')}```"
            }]
        }

# Flask 웹훅 엔드포인트
from flask import Flask, request, jsonify

app = Flask(__name__)
incident_response = SecurityIncidentResponse()

@app.route('/falco/webhook', methods=['POST'])
def handle_falco_webhook():
    """Falco 웹훅 엔드포인트"""
    try:
        alert_data = request.json
        incident_response.process_falco_alert(alert_data)
        return jsonify({'status': 'processed'}), 200
    except Exception as e:
        logging.error(f"Error handling Falco webhook: {e}")
        return jsonify({'error': str(e)}), 500

@app.route('/health', methods=['GET'])
def health_check():
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

DevSecOps 메트릭과 KPI

보안 메트릭 수집 및 대시보드

# monitoring/security-metrics.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: security-dashboard-config
  namespace: monitoring
data:
  dashboard.json: |
    {
      "dashboard": {
        "title": "DevSecOps Security Metrics",
        "panels": [
          {
            "title": "Security Scan Results",
            "type": "stat",
            "targets": [
              {
                "expr": "sum(security_scan_vulnerabilities_total) by (severity)",
                "legendFormat": "{{ severity }} vulnerabilities"
              }
            ]
          },
          {
            "title": "Security Incidents Trend",
            "type": "graph",
            "targets": [
              {
                "expr": "rate(security_incidents_total[5m]) * 300",
                "legendFormat": "Incidents per 5min"
              }
            ]
          },
          {
            "title": "Time to Fix Security Issues",
            "type": "histogram",
            "targets": [
              {
                "expr": "histogram_quantile(0.95, security_issue_resolution_time_bucket)",
                "legendFormat": "95th percentile fix time"
              }
            ]
          },
          {
            "title": "Security Policy Compliance",
            "type": "pie",
            "targets": [
              {
                "expr": "compliance_policy_status",
                "legendFormat": "{{ policy_name }}"
              }
            ]
          }
        ]
      }
    }

  prometheus-rules.yaml: |
    groups:
    - name: security.rules
      rules:
      - alert: HighSeverityVulnerability
        expr: security_scan_vulnerabilities_total{severity="critical"} > 0
        for: 0m
        labels:
          severity: critical
          team: security
        annotations:
          summary: "Critical vulnerability detected"
          description: "{{ $value }} critical vulnerabilities found"

      - alert: SecurityPolicyViolation
        expr: compliance_policy_violations_total > 0
        for: 1m
        labels:
          severity: warning
          team: security
        annotations:
          summary: "Security policy violation detected"
          description: "Security policy {{ $labels.policy }} has been violated"

      - alert: SecurityIncidentSpike
        expr: rate(security_incidents_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
          team: security
        annotations:
          summary: "Spike in security incidents"
          description: "Security incident rate is {{ $value }} incidents per second"
# metrics_collector.py - 보안 메트릭 수집기
import time
import json
from prometheus_client import CollectorRegistry, Gauge, Counter, Histogram, start_http_server
from datetime import datetime, timedelta
import requests

class SecurityMetricsCollector:
    def __init__(self):
        self.registry = CollectorRegistry()

        # 메트릭 정의
        self.vulnerability_gauge = Gauge(
            'security_scan_vulnerabilities_total',
            'Total number of vulnerabilities by severity',
            ['severity', 'component'],
            registry=self.registry
        )

        self.incident_counter = Counter(
            'security_incidents_total',
            'Total number of security incidents',
            ['severity', 'type'],
            registry=self.registry
        )

        self.fix_time_histogram = Histogram(
            'security_issue_resolution_time_seconds',
            'Time to resolve security issues',
            ['severity', 'component'],
            buckets=[300, 1800, 3600, 21600, 86400, 604800],  # 5m, 30m, 1h, 6h, 1d, 1w
            registry=self.registry
        )

        self.compliance_gauge = Gauge(
            'compliance_policy_status',
            'Compliance policy status (1=compliant, 0=non-compliant)',
            ['policy_name', 'environment'],
            registry=self.registry
        )

        # 데이터 소스 설정
        self.data_sources = {
            'snyk': self._collect_snyk_metrics,
            'sonar': self._collect_sonar_metrics,
            'falco': self._collect_falco_metrics,
            'policy': self._collect_policy_metrics
        }

    def start_collection(self, interval=60):
        """메트릭 수집 시작"""
        start_http_server(8000, registry=self.registry)

        while True:
            for source_name, collector_func in self.data_sources.items():
                try:
                    collector_func()
                except Exception as e:
                    print(f"Error collecting {source_name} metrics: {e}")

            time.sleep(interval)

    def _collect_snyk_metrics(self):
        """Snyk 취약점 스캔 메트릭 수집"""
        try:
            # Snyk API 호출 (예시)
            response = requests.get(
                'https://snyk.io/api/v1/org/{org-id}/projects',
                headers={'Authorization': f'token {SNYK_TOKEN}'}
            )

            if response.status_code == 200:
                projects = response.json()

                for project in projects['projects']:
                    # 각 프로젝트의 취약점 정보 수집
                    vuln_response = requests.get(
                        f'https://snyk.io/api/v1/org/{org_id}/project/{project["id"]}/aggregated-issues',
                        headers={'Authorization': f'token {SNYK_TOKEN}'}
                    )

                    if vuln_response.status_code == 200:
                        issues = vuln_response.json()

                        # 심각도별 취약점 수 업데이트
                        severity_counts = {
                            'critical': 0, 'high': 0, 'medium': 0, 'low': 0
                        }

                        for issue in issues.get('issues', []):
                            severity = issue.get('issueData', {}).get('severity', 'unknown').lower()
                            if severity in severity_counts:
                                severity_counts[severity] += 1

                        for severity, count in severity_counts.items():
                            self.vulnerability_gauge.labels(
                                severity=severity,
                                component=project['name']
                            ).set(count)

        except Exception as e:
            print(f"Error collecting Snyk metrics: {e}")

    def _collect_sonar_metrics(self):
        """SonarQube 보안 메트릭 수집"""
        try:
            # SonarQube API 호출
            response = requests.get(
                f'http://sonarqube:9000/api/issues/search?componentKeys=my-project&types=VULNERABILITY,SECURITY_HOTSPOT',
                auth=(SONAR_USER, SONAR_PASSWORD)
            )

            if response.status_code == 200:
                issues = response.json()

                severity_counts = {}
                for issue in issues.get('issues', []):
                    severity = issue.get('severity', 'UNKNOWN').lower()
                    component = issue.get('component', 'unknown')

                    key = (severity, component)
                    severity_counts[key] = severity_counts.get(key, 0) + 1

                for (severity, component), count in severity_counts.items():
                    self.vulnerability_gauge.labels(
                        severity=severity,
                        component=component
                    ).set(count)

        except Exception as e:
            print(f"Error collecting SonarQube metrics: {e}")

    def _collect_falco_metrics(self):
        """Falco 보안 인시던트 메트릭 수집"""
        try:
            # Falco 이벤트 로그에서 메트릭 추출
            # (실제로는 Falco sidekick이나 로그 집계 시스템에서 수집)

            # 예시: 최근 1시간의 인시던트 수집
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=1)

            # ElasticSearch나 Loki에서 Falco 로그 쿼리
            incidents = self._query_security_incidents(start_time, end_time)

            incident_counts = {}
            for incident in incidents:
                severity = incident.get('severity', 'unknown')
                incident_type = incident.get('rule_name', 'unknown')

                key = (severity, incident_type)
                incident_counts[key] = incident_counts.get(key, 0) + 1

            for (severity, incident_type), count in incident_counts.items():
                self.incident_counter.labels(
                    severity=severity,
                    type=incident_type
                ).inc(count)

        except Exception as e:
            print(f"Error collecting Falco metrics: {e}")

    def _collect_policy_metrics(self):
        """정책 준수 메트릭 수집"""
        try:
            # OPA Gatekeeper 정책 위반 체크
            policies = [
                'pod-security-policy',
                'network-policy-required',
                'resource-limits-required',
                'no-privileged-containers'
            ]

            for policy in policies:
                # Kubernetes API를 통해 정책 위반 확인
                violations = self._check_policy_violations(policy)

                # 준수 상태 설정 (0: 위반, 1: 준수)
                compliance_status = 1 if violations == 0 else 0

                self.compliance_gauge.labels(
                    policy_name=policy,
                    environment='production'
                ).set(compliance_status)

        except Exception as e:
            print(f"Error collecting policy metrics: {e}")

    def _query_security_incidents(self, start_time, end_time):
        """보안 인시던트 쿼리 (예시)"""
        # 실제로는 로그 집계 시스템(ELK, Loki 등)에서 쿼리
        return []

    def _check_policy_violations(self, policy_name):
        """정책 위반 체크 (예시)"""
        # 실제로는 OPA Gatekeeper API 호출
        return 0

# DevSecOps KPI 계산기
class DevSecOpsKPICalculator:
    def __init__(self, metrics_collector):
        self.metrics_collector = metrics_collector

    def calculate_security_debt(self):
        """보안 부채 계산"""
        # 미해결 취약점에 가중치 적용
        weights = {'critical': 10, 'high': 5, 'medium': 2, 'low': 1}

        total_debt = 0
        for severity, weight in weights.items():
            vuln_count = self._get_vulnerability_count(severity)
            total_debt += vuln_count * weight

        return total_debt

    def calculate_mttr_security(self):
        """보안 이슈 평균 해결 시간"""
        # Prometheus 히스토그램에서 평균값 계산
        return self._calculate_histogram_average('security_issue_resolution_time_seconds')

    def calculate_security_coverage(self):
        """보안 스캔 커버리지"""
        total_components = self._get_total_components()
        scanned_components = self._get_scanned_components()

        return (scanned_components / total_components) * 100 if total_components > 0 else 0

    def generate_security_report(self):
        """보안 리포트 생성"""
        return {
            'timestamp': datetime.utcnow().isoformat(),
            'security_debt': self.calculate_security_debt(),
            'mttr_security_hours': self.calculate_mttr_security() / 3600,
            'security_coverage_percent': self.calculate_security_coverage(),
            'critical_vulnerabilities': self._get_vulnerability_count('critical'),
            'policy_compliance_percent': self._get_policy_compliance_rate(),
            'incident_rate_per_day': self._get_incident_rate_per_day()
        }

if __name__ == '__main__':
    collector = SecurityMetricsCollector()
    collector.start_collection()

결론: 지속 가능한 보안 문화 구축

DevSecOps는 2026년 현재 단순한 도구 도입을 넘어 조직의 문화적 변화를 요구하는 방법론으로 자리잡았습니다. 보안을 개발 프로세스의 모든 단계에 내재화함으로써, 더 안전하고 신뢰할 수 있는 소프트웨어를 더 빠르게 배포할 수 있게 되었습니다.

DevSecOps 성공의 핵심 요소:

  1. 자동화: 수동 보안 검토를 자동화된 파이프라인으로 대체
  2. 통합: 개발 도구와 워크플로우에 보안 기능 내장
  3. 교육: 개발자와 운영팀의 보안 역량 강화
  4. 측정: 지속적인 모니터링과 개선을 위한 메트릭 수집

조직 차원의 변화:

  • 개발팀: 보안을 코딩의 일부로 인식
  • 보안팀: 게이트키퍼에서 조력자로 역할 전환
  • 운영팀: 보안 모니터링과 인시던트 대응 자동화
  • 경영진: 보안을 비즈니스 리스크 관리의 핵심으로 인식

DevSecOps의 궁극적인 목표는 보안이 개발 속도를 늦추는 장애물이 아니라, 더 나은 소프트웨어를 만드는 촉진제가 되는 것입니다. 이를 위해서는 기술적 구현만큼이나 조직 문화의 변화가 중요하며, 지속적인 학습과 개선을 통해 성숙한 보안 문화를 구축해야 합니다.

궁금한 점이 있으신가요?

문의사항이 있으시면 언제든지 연락주세요.