OpenSearch 인덱싱 파이프라인

LTR을 위한 검색 인덱스 구축

Searchdoc | luga-indexing-v2

왜 인덱싱이 필요한가?

LTR은 다양한 Feature로 검색 결과를 랭킹

→ Feature를 추출하려면 먼저 문서를 인덱싱해야 함

문서 → Chunk → Feature 추출 → OpenSearch 인덱싱 → LTR 학습/검색

전체 파이프라인 개요

ENTR-2 ENTR-3 ENTR-4 ENTR-5

단계 이름 역할
ENTR-2 Workspace Loader S3에서 대상 문서 목록 작성
ENTR-3 Source Processing PDF 파싱 + source.json + TOC 생성
ENTR-4 Chunking LLM 기반 Context Chunking
ENTR-5 Feature & Indexing Feature 추출 + OpenSearch 인덱싱

AWS 아키텍처

Step Functions 가 전체 워크플로우 오케스트레이션

각 ENTR 단계 = Lambda 함수

Compute

  • AWS Lambda (Python 3.11)
  • SageMaker Endpoint (DP)

AI/ML

  • Bedrock (Claude, Titan)
  • Upstage Document Parse

인프라 구성

Storage

  • S3 - 문서/중간 결과 저장
  • Secrets Manager - 인증 정보

Search

  • OpenSearch - 검색 엔진
  • LTR Plugin 활성화

VPC 내부 배포 | Lambda → OpenSearch 직접 통신

Step Functions 워크플로우

Map과 Iterator의 역할

Map State: 배열의 각 요소를 병렬 처리하는 루프

Iterator: Map 내부에서 각 요소에 대해 실행할 State들

비유: Map = for문, Iterator = for문 안의 코드 블록

Step Functions - Map 상세

"ProcessDocumentTypes": {
  "Type": "Map",                              // Map State 선언
  "ItemsPath": "$.fileIndexes.body.collections",  // 순회할 배열 지정
  "MaxConcurrency": 2,                        // 동시 실행 수 제한
  "Parameters": {                             // 각 반복에 전달할 파라미터
    "workspace.$": "$.workspace",             // 원본 데이터에서 복사
    "file_type.$": "$$.Map.Item.Value.file_type"  // 현재 배열 요소의 값
  },
  "Iterator": {                               // 각 요소에 대해 실행할 State들
    "StartAt": "ProcessDocumentBatch",
    "States": { ... }
  }
}

$$.Map.Item.Value = 현재 순회 중인 배열 요소

Step Functions - 중첩 Map

1단계 Map ProcessDocumentTypes
└ collections 배열 순회 (file_type별)

    2단계 Map ProcessDocumentBatch
    └ files 배열 순회 (개별 문서)

        Iterator
        └ ENTR-3 → ENTR-4 → ENTR-5 순차 실행

MaxConcurrency: 18 → 최대 18개 문서 동시 처리

Step Functions - 전체 흐름 코드

{
  "StartAt": "Entr2",
  "States": {
    "Entr2": {
      "Type": "Task",
      "ResultPath": "$.fileIndexes",   // 결과를 fileIndexes에 저장
      "Next": "ProcessDocumentTypes"
    },
    "ProcessDocumentTypes": {
      "Type": "Map",
      "ItemsPath": "$.fileIndexes.body.collections",  // [{file_type, files}]
      "Iterator": {
        "StartAt": "ProcessDocumentBatch",
        "States": {
          "ProcessDocumentBatch": {
            "Type": "Map",
            "ItemsPath": "$.files",  // [{index: 0}, {index: 1}, ...]
            "MaxConcurrency": 18,
            "Iterator": {
              "StartAt": "Entr3Task",
              "States": {
                "Entr3Task": { "Next": "Entr4Task" },
                "Entr4Task": { "Next": "Entr5Task" },
                "Entr5Task": { "End": true }
              }
            }
          }
        }
      }
    }
  }
}

ENTR-2: Workspace Loader

S3 버킷에서 처리할 문서 목록 생성

Input: workspace, source_prefix
Output: 파일 타입별 문서 목록 (collections)

ENTR-2 처리 흐름

# 1. S3 스토리지 초기화
s3_source_storage = S3Storage(bucket_name=BUCKET_NAME, prefix=source_prefix)

# 2. 워크스페이스 로더 생성
loader = WorkspaceLoader(
    s3_storage=s3_source_storage,
    workspace_repository=workspace_repo,
    source_repository=source_repo
)

# 3. 옵션 설정
options = WorkspaceLoadOptions(
    supported_extensions={'.pdf'},  # PDF만 처리
    scan_subdirectories=True,       # 하위 디렉토리 포함
    calculate_stats=True            # 통계 계산
)

# 4. 워크스페이스 로딩
workspace = loader.load_workspace(workspace_name, options)

ENTR-2 Output 구조

{
  "workspace_name": "project-a",
  "total_files": 50,
  "total_size": 1024000,
  "collections": [
    {
      "file_type": "pdf",
      "file_count": 50,
      "total_size": 1024000,
      "files": [
        { "index": 0 },
        { "index": 1 },
        { "index": 2 }
      ]
    }
  ]
}

collections → Step Functions Map State의 ItemsPath로 사용

ENTR-3: Source Processing

PDF 파싱source.json + TOC 생성

Input: workspace, source_index
Output: source_id, pages, toc_tree

ENTR-3 전체 흐름

1 PDF 파일 로드 (S3 → Lambda /tmp)

2 Upstage DP PDF 파싱 → 텍스트 추출

3 Claude TOC 추출 → toc_list 생성

4 Claude TOC 분석 → toc_tree 생성

5 source.json 저장 (S3)

Step 1-2: Upstage Document Parse

PDF → 구조화된 텍스트 추출

def request_dp(self, file_path: Path, endpoint_name: str = None) -> dict:
    # SageMaker Endpoint 호출 (Upstage Document Parse)
    m = MultipartEncoder(fields={
        'document': (file_path.name, open(file_path, 'rb')),
        'model': 'document-parse',
        'ocr': 'auto',              # OCR 자동 적용
        'coordinates': 'true',       # 좌표 정보 포함
        'output_formats': '["text"]'
    })

    response = self.runtime_sm_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=m.content_type,
        Body=m.to_string()
    )
    return json.loads(response["Body"].read())

출력: 페이지별 텍스트 + heading1/paragraph/table 구분

Step 3: TOC List 생성

heading1 요소에서 목차 후보 추출

def analyze_pages(self, file_path, params):
    toc_list = []
    current_heading = None

    for page in extracted_pages:
        for element in page["elements"]:
            text = element["content"]["text"]
            category = element["category"]  # heading1, paragraph, table...

            # heading1이고 TOC 패턴에 맞으면 새 섹션 시작
            if category == "heading1" and self.check_toc_header(text):
                if current_heading:
                    toc_list.append({
                        "title": current_heading["title"],
                        "page_no": current_heading["page_no"],
                        "body": current_body  # 해당 섹션의 본문
                    })
                current_heading = {"title": text, "page_no": page_no}
                current_body = ""
            else:
                current_body += text + "\n"  # 본문에 추가

Step 4: TOC Tree 생성 (Bedrock LLM)

Claude Sonnet로 계층 구조 분석

class TocExtractor(FeatureExtractor):
    def __init__(self, model_id="anthropic.claude-3-5-sonnet-20240620-v1:0"):
        self.client = boto3.client('bedrock-runtime')

    def _extract(self, text: str) -> str:
        prompt = f"""
# MISSION
- Act as a Table Of Contents(TOC) extractor
- Create a TOC Tree from the input

# RULES
- Use '#' to represent depth (max 6)
- Identifier format: 'id.id.id.' always ending with '.'
- Output markdown formatted TOC

# INPUT
{text}

# OUTPUT FORMAT

# 1. Section Title
## 1.1. Subsection
### 1.1.1. Sub-subsection

"""
        response = self.client.invoke_model(
            modelId=self.model_id,
            body=json.dumps({"messages": [{"role": "user", "content": prompt}]})
        )

source.json 구조

{
  "source_id": "3f30285548c6668704245fc8d974da21...",
  "name": "Technical_Specification.pdf",
  "category": "Volume 4 - Technical Reference",
  "total_pages": 156,
  "pages": {
    "1": { "page_no": 1, "text": "..." },
    "2": { "page_no": 2, "text": "..." }
  },
  "toc_list": [
    { "title": "1. Introduction", "page_no": 1, "body": "..." },
    { "title": "2. Scope", "page_no": 5, "body": "..." }
  ],
  "toc_tree": {
    "identifier": "root",
    "children": [
      {
        "identifier": "1.",
        "title": "Introduction",
        "body": "...",
        "children": [...]
      }
    ]
  },
  "processing_status": "COMPLETED"
}

ENTR-4: Chunking

LLM 기반 Context Chunking - 의미 단위 분할

Input: source_id (ENTR-3 결과)
Output: chunk_id 목록, 총 청크 수

Contextualized Chunking Strategy

Fixed-size가 아닌 LLM 기반 문맥 청킹

class ContextualizedChunkingStrategy(ChunkingStrategy):
    def __init__(self):
        # Claude Sonnet으로 문단 구분
        self.paragraph_extractor = ParagraphExtractor(
            model_id="anthropic.claude-3-5-sonnet-20240620-v1:0"
        )

    def chunk_text(self, text: str) -> List[str]:
        """텍스트를 의미 단위로 분할"""
        if not text:
            return []
        # LLM이 문맥을 파악하여 적절한 단위로 분할
        return self.paragraph_extractor.extract(text)

TOC Tree의 각 노드(섹션) 단위로 청킹 수행

ParagraphExtractor (LLM Prompt)

Claude Sonnet에게 문단 분할 요청

# 입력: 라인별로 분리된 텍스트
context = """
01.1 General Requirements
1The contractor shall...
2All materials must...
...
"""

prompt = f"""
# MISSION
- Act as a Document Analyzer
- Group lines into contextually related paragraphs

# RULES
- Each paragraph must contain enough content to understand a single context
- Related content MUST be kept together in one paragraph
- Include introductory text, structured content, and conclusions together

# OUTPUT FORMAT

    
        0
        1
    
    
        2
        ...
    

"""

왜 큰 Chunk를 만드는가?

일반적인 청킹

  • 고정 크기 (512 tokens)
  • 문맥 파편화
  • 검색 시 context 부족

Context Chunking

  • 의미 단위로 분할
  • 큰 덩어리 유지
  • 완전한 context 보존

핵심: LTR에서 Feature 추출 시 충분한 문맥 필요
→ "1.1 General Requirements" 전체가 하나의 Chunk
→ 검색 결과로 완전한 섹션 반환 가능

ENTR-4 처리 흐름

def create_chunk_iterator(self, source: Source, checkpoint, checkpoint_manager):
    """TOC Tree를 순회하며 각 노드를 청킹"""
    toc_tree = source.toc_tree

    # 루트 노드 처리
    if toc_tree.get("body"):
        yield from self._process_toc_node_iterator(toc_tree, ...)

    # 자식 노드들 재귀 처리
    if "children" in toc_tree:
        for node in toc_tree["children"]:
            yield from self._process_node_and_children(node, ...)

def _process_toc_node_iterator(self, toc_node, ...):
    """단일 TOC 노드(섹션)를 청킹"""
    body_lines = toc_node.get("body", "").split('\n')

    # LLM 기반 청킹 (ContextualizedChunkingStrategy)
    chunks_buffer = list(self.chunking_strategy.chunk_text(body_lines))

    for chunk_text in chunks_buffer:
        chunk = Chunk.create(
            source_id=source.source_id,
            chunk_no=self._current_chunk_no,
            text=chunk_text,
            metadata=metadata  # 섹션 정보 포함
        )
        yield chunk

Checkpoint Manager

Lambda 타임아웃 대응 - 노드 단위 체크포인트

def _process_node_and_children(self, node, checkpoint, checkpoint_manager, ...):
    node_identifier = self._get_node_id(node)

    # 이미 처리된 노드인지 확인
    if checkpoint.is_node_processed(node_identifier):
        if self._verify_node_processing(source_id, node_identifier):
            return  # 이미 처리됨, 스킵

    # 노드 처리 후 체크포인트 저장
    self._save_node_results(source_id, node_identifier, results)
    checkpoint.processed_nodes[node_identifier] = datetime.now()
    checkpoint_manager.save_checkpoint(checkpoint)

Lambda 15분 제한 → 타임아웃 시 마지막 처리된 노드부터 재개

ENTR-5: Feature & Indexing

Feature 추출 + OpenSearch 인덱싱

Input: source_id, chunks (ENTR-4 결과)
Output: OpenSearch 문서 인덱싱 완료

Feature Registry

각 Chunk에서 추출할 Feature 목록 정의

def create_feature_registry() -> FeatureRegistry:
    registry = FeatureRegistry()

    # Text Features (Claude로 생성)
    registry.register('english_translation', ChunkTextEnExtractor(...))
    registry.register('keywords', KeywordExtractor(...))
    registry.register('chunk_summary', ChunkSummaryExtractor(...))
    registry.register('qr', QRFeatureExtractor(...))

    # Embedding Features (Titan으로 생성)
    registry.register('chunk_embedding', ChunkTextEmbeddingExtractor(...))
    registry.register('qr_1_embedding', QR1EmbeddingFeatureExtractor(...))

    return registry

Feature 예시 1: Keyword Extractor

Claude Haiku로 키워드 추출

class KeywordExtractor(FeatureExtractor):
    def _extract(self, document: Document, feature_repository) -> List[Feature]:
        chunk_text = feature_repository.get_feature(doc_id, "chunk_text_eng")

        prompt = f"""
# MISSION: Extract key information from ITB construction documents

# TARGET ITEMS
* Project Name, Location, Scope of Works, Time for Completion,
* Contract price, Bid Bond, Performance Bond, Technical Items...

# INPUT TEXT
{chunk_text}

# OUTPUT FORMAT

value1, value2, value3, ...

"""
        response = self.client.invoke_model(modelId="claude-3-haiku", ...)
        keywords = self._parse_response(response)  # ["Taiba IPP", "power generation", ...]

        return [Feature.create(doc_id=document.doc_id, name="keywords", value=keywords)]

Feature 예시 2: Chunk Embedding

Titan v2로 벡터 임베딩

class ChunkTextEmbeddingExtractor(FeatureExtractor):
    def __init__(self, model_id="amazon.titan-embed-text-v2:0"):
        self.client = boto3.client('bedrock-runtime')

    def _extract(self, document: Document, feature_repository) -> List[Feature]:
        # 영어 번역된 텍스트로 임베딩 생성
        chunk_text_eng = feature_repository.get_feature(
            doc_id=document.doc_id,
            feature_name="chunk_text_eng"
        )

        # Titan Embedding 호출
        embedding = self.get_embedding(chunk_text_eng.value)  # 1024 dimension

        return [Feature.create(
            doc_id=document.doc_id,
            name="chunk_embedding",
            value=embedding  # [0.123, -0.456, ...] (1024 floats)
        )]

이 패턴으로 qr_embedding, summary_embedding 등 확장

LTR Feature와의 연결

인덱싱된 필드 → LTR Featureset에서 사용

인덱싱 필드 LTR Feature
chunk_text txt_en_1 (BM25)
absolute_title txt_en_3 (BM25)
qr txt_en_4 (BM25)
keywords txt_en_5 (BM25)
chunk_embedding emb_en_1 (KNN)
qr_1_embedding emb_en_2 (KNN)

OpenSearch 인덱싱

Feature가 추출된 Document를 OpenSearch에 저장

# OpenSearch 클라이언트 초기화
opensearch_client = OpenSearchClient(
    endpoint=opensearch_secret.get('endpoint'),
    username=opensearch_secret.get('username'),
    password=opensearch_secret.get('password')
)

opensearch_repository = OpenSearchRepository(
    client=opensearch_client,
    config=OpenSearchConfig(
        index_name=index_name,
        dimension=1024,  # Titan embedding dimension
        synonyms_path='analyzers/synonyms.txt',
        compound_nouns_path='analyzers/compound_nouns.txt'
    )
)

한국어 동의어/복합명사 Analyzer 설정 포함

전체 데이터 흐름

ENTR-2: S3 스캔 → 문서 목록

ENTR-3: PDF 파싱 + LLM TOC 분석 → source.json (toc_tree)

ENTR-4: LLM 기반 Context Chunking → chunk_id 목록

ENTR-5: Feature 추출 + 인덱싱 → OpenSearch 문서

Step Functions 실행 예시

// Step Functions 입력
{
  "workspace": "project-a",
  "index_name": "itb-project-a-v1",
  "source_prefix": "collection/project-a/",
  "addendum_no": 0
}

// 실행 결과
- 총 문서: 50개
- 총 청크: 2,847개
- 총 Feature: 28,470개 (10 features × 2,847 chunks)
- OpenSearch 인덱싱: 2,847 documents

인덱싱 → LTR 연결

인덱싱 파이프라인 OpenSearch LTR 학습/검색

인덱싱 결과 LTR 활용
chunk_text, keywords, qr 등 Text Match Feature (BM25)
chunk_embedding, qr_embedding 등 KNN Feature (Cosine Similarity)
OpenSearch Index SLTR Query, Featureset, Model

LTR 검색 쿼리 (Rescore)

{
  "query": {
    "bool": {
      "should": [
        { "match": { "chunk_text": "검색어" } },
        { "knn": { "chunk_embedding": { "vector": [...], "k": 10 } } }
      ]
    }
  },
  "rescore": {
    "query": {
      "rescore_query": {
        "sltr": {
          "model": "my-ltr-model",
          "featureset": "my-featureset",
          "params": {
            "query_text": "검색어",
            "query_embedding_str": "[...]"
          }
        }
      }
    }
  }
}

요약

구성요소 기술
오케스트레이션 AWS Step Functions
컴퓨팅 AWS Lambda (Python 3.11)
문서 파싱 Upstage Document Parse (SageMaker)
Feature 추출 Bedrock (Claude, Titan)
검색 엔진 OpenSearch + LTR Plugin
스토리지 S3 (문서, 중간 결과, 체크포인트)

Q&A

감사합니다