Searchdoc | luga-indexing-v2
LTR은 다양한 Feature로 검색 결과를 랭킹
→ Feature를 추출하려면 먼저 문서를 인덱싱해야 함
문서 → Chunk → Feature 추출 → OpenSearch 인덱싱 → LTR 학습/검색
ENTR-2 → ENTR-3 → ENTR-4 → ENTR-5
| 단계 | 이름 | 역할 |
|---|---|---|
| ENTR-2 | Workspace Loader | S3에서 대상 문서 목록 작성 |
| ENTR-3 | Source Processing | PDF 파싱 + source.json + TOC 생성 |
| ENTR-4 | Chunking | LLM 기반 Context Chunking |
| ENTR-5 | Feature & Indexing | Feature 추출 + OpenSearch 인덱싱 |
Step Functions 가 전체 워크플로우 오케스트레이션
각 ENTR 단계 = Lambda 함수
VPC 내부 배포 | Lambda → OpenSearch 직접 통신
Map State: 배열의 각 요소를 병렬 처리하는 루프
Iterator: Map 내부에서 각 요소에 대해 실행할 State들
비유: Map = for문, Iterator = for문 안의 코드 블록
"ProcessDocumentTypes": {
"Type": "Map", // Map State 선언
"ItemsPath": "$.fileIndexes.body.collections", // 순회할 배열 지정
"MaxConcurrency": 2, // 동시 실행 수 제한
"Parameters": { // 각 반복에 전달할 파라미터
"workspace.$": "$.workspace", // 원본 데이터에서 복사
"file_type.$": "$$.Map.Item.Value.file_type" // 현재 배열 요소의 값
},
"Iterator": { // 각 요소에 대해 실행할 State들
"StartAt": "ProcessDocumentBatch",
"States": { ... }
}
}
$$.Map.Item.Value = 현재 순회 중인 배열 요소
1단계 Map ProcessDocumentTypes
└ collections 배열 순회 (file_type별)
2단계 Map ProcessDocumentBatch
└ files 배열 순회 (개별 문서)
Iterator
└ ENTR-3 → ENTR-4 → ENTR-5 순차 실행
MaxConcurrency: 18 → 최대 18개 문서 동시 처리
{
"StartAt": "Entr2",
"States": {
"Entr2": {
"Type": "Task",
"ResultPath": "$.fileIndexes", // 결과를 fileIndexes에 저장
"Next": "ProcessDocumentTypes"
},
"ProcessDocumentTypes": {
"Type": "Map",
"ItemsPath": "$.fileIndexes.body.collections", // [{file_type, files}]
"Iterator": {
"StartAt": "ProcessDocumentBatch",
"States": {
"ProcessDocumentBatch": {
"Type": "Map",
"ItemsPath": "$.files", // [{index: 0}, {index: 1}, ...]
"MaxConcurrency": 18,
"Iterator": {
"StartAt": "Entr3Task",
"States": {
"Entr3Task": { "Next": "Entr4Task" },
"Entr4Task": { "Next": "Entr5Task" },
"Entr5Task": { "End": true }
}
}
}
}
}
}
}
}
S3 버킷에서 처리할 문서 목록 생성
Input: workspace, source_prefix
Output: 파일 타입별 문서 목록 (collections)
# 1. S3 스토리지 초기화
s3_source_storage = S3Storage(bucket_name=BUCKET_NAME, prefix=source_prefix)
# 2. 워크스페이스 로더 생성
loader = WorkspaceLoader(
s3_storage=s3_source_storage,
workspace_repository=workspace_repo,
source_repository=source_repo
)
# 3. 옵션 설정
options = WorkspaceLoadOptions(
supported_extensions={'.pdf'}, # PDF만 처리
scan_subdirectories=True, # 하위 디렉토리 포함
calculate_stats=True # 통계 계산
)
# 4. 워크스페이스 로딩
workspace = loader.load_workspace(workspace_name, options)
{
"workspace_name": "project-a",
"total_files": 50,
"total_size": 1024000,
"collections": [
{
"file_type": "pdf",
"file_count": 50,
"total_size": 1024000,
"files": [
{ "index": 0 },
{ "index": 1 },
{ "index": 2 }
]
}
]
}
collections → Step Functions Map State의 ItemsPath로 사용
PDF 파싱 → source.json + TOC 생성
Input: workspace, source_index
Output: source_id, pages, toc_tree
1 PDF 파일 로드 (S3 → Lambda /tmp)
2 Upstage DP PDF 파싱 → 텍스트 추출
3 Claude TOC 추출 → toc_list 생성
4 Claude TOC 분석 → toc_tree 생성
5 source.json 저장 (S3)
PDF → 구조화된 텍스트 추출
def request_dp(self, file_path: Path, endpoint_name: str = None) -> dict:
# SageMaker Endpoint 호출 (Upstage Document Parse)
m = MultipartEncoder(fields={
'document': (file_path.name, open(file_path, 'rb')),
'model': 'document-parse',
'ocr': 'auto', # OCR 자동 적용
'coordinates': 'true', # 좌표 정보 포함
'output_formats': '["text"]'
})
response = self.runtime_sm_client.invoke_endpoint(
EndpointName=endpoint_name,
ContentType=m.content_type,
Body=m.to_string()
)
return json.loads(response["Body"].read())
출력: 페이지별 텍스트 + heading1/paragraph/table 구분
heading1 요소에서 목차 후보 추출
def analyze_pages(self, file_path, params):
toc_list = []
current_heading = None
for page in extracted_pages:
for element in page["elements"]:
text = element["content"]["text"]
category = element["category"] # heading1, paragraph, table...
# heading1이고 TOC 패턴에 맞으면 새 섹션 시작
if category == "heading1" and self.check_toc_header(text):
if current_heading:
toc_list.append({
"title": current_heading["title"],
"page_no": current_heading["page_no"],
"body": current_body # 해당 섹션의 본문
})
current_heading = {"title": text, "page_no": page_no}
current_body = ""
else:
current_body += text + "\n" # 본문에 추가
Claude Sonnet로 계층 구조 분석
class TocExtractor(FeatureExtractor):
def __init__(self, model_id="anthropic.claude-3-5-sonnet-20240620-v1:0"):
self.client = boto3.client('bedrock-runtime')
def _extract(self, text: str) -> str:
prompt = f"""
# MISSION
- Act as a Table Of Contents(TOC) extractor
- Create a TOC Tree from the input
# RULES
- Use '#' to represent depth (max 6)
- Identifier format: 'id.id.id.' always ending with '.'
- Output markdown formatted TOC
# INPUT
{text}
# OUTPUT FORMAT
# 1. Section Title
## 1.1. Subsection
### 1.1.1. Sub-subsection
"""
response = self.client.invoke_model(
modelId=self.model_id,
body=json.dumps({"messages": [{"role": "user", "content": prompt}]})
)
{
"source_id": "3f30285548c6668704245fc8d974da21...",
"name": "Technical_Specification.pdf",
"category": "Volume 4 - Technical Reference",
"total_pages": 156,
"pages": {
"1": { "page_no": 1, "text": "..." },
"2": { "page_no": 2, "text": "..." }
},
"toc_list": [
{ "title": "1. Introduction", "page_no": 1, "body": "..." },
{ "title": "2. Scope", "page_no": 5, "body": "..." }
],
"toc_tree": {
"identifier": "root",
"children": [
{
"identifier": "1.",
"title": "Introduction",
"body": "...",
"children": [...]
}
]
},
"processing_status": "COMPLETED"
}
LLM 기반 Context Chunking - 의미 단위 분할
Input: source_id (ENTR-3 결과)
Output: chunk_id 목록, 총 청크 수
Fixed-size가 아닌 LLM 기반 문맥 청킹
class ContextualizedChunkingStrategy(ChunkingStrategy):
def __init__(self):
# Claude Sonnet으로 문단 구분
self.paragraph_extractor = ParagraphExtractor(
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0"
)
def chunk_text(self, text: str) -> List[str]:
"""텍스트를 의미 단위로 분할"""
if not text:
return []
# LLM이 문맥을 파악하여 적절한 단위로 분할
return self.paragraph_extractor.extract(text)
TOC Tree의 각 노드(섹션) 단위로 청킹 수행
Claude Sonnet에게 문단 분할 요청
# 입력: 라인별로 분리된 텍스트
context = """
0 1.1 General Requirements
1 The contractor shall...
2 All materials must...
...
"""
prompt = f"""
# MISSION
- Act as a Document Analyzer
- Group lines into contextually related paragraphs
# RULES
- Each paragraph must contain enough content to understand a single context
- Related content MUST be kept together in one paragraph
- Include introductory text, structured content, and conclusions together
# OUTPUT FORMAT
0
1
2
...
"""
핵심: LTR에서 Feature 추출 시 충분한 문맥 필요
→ "1.1 General Requirements" 전체가 하나의 Chunk
→ 검색 결과로 완전한 섹션 반환 가능
def create_chunk_iterator(self, source: Source, checkpoint, checkpoint_manager):
"""TOC Tree를 순회하며 각 노드를 청킹"""
toc_tree = source.toc_tree
# 루트 노드 처리
if toc_tree.get("body"):
yield from self._process_toc_node_iterator(toc_tree, ...)
# 자식 노드들 재귀 처리
if "children" in toc_tree:
for node in toc_tree["children"]:
yield from self._process_node_and_children(node, ...)
def _process_toc_node_iterator(self, toc_node, ...):
"""단일 TOC 노드(섹션)를 청킹"""
body_lines = toc_node.get("body", "").split('\n')
# LLM 기반 청킹 (ContextualizedChunkingStrategy)
chunks_buffer = list(self.chunking_strategy.chunk_text(body_lines))
for chunk_text in chunks_buffer:
chunk = Chunk.create(
source_id=source.source_id,
chunk_no=self._current_chunk_no,
text=chunk_text,
metadata=metadata # 섹션 정보 포함
)
yield chunk
Lambda 타임아웃 대응 - 노드 단위 체크포인트
def _process_node_and_children(self, node, checkpoint, checkpoint_manager, ...):
node_identifier = self._get_node_id(node)
# 이미 처리된 노드인지 확인
if checkpoint.is_node_processed(node_identifier):
if self._verify_node_processing(source_id, node_identifier):
return # 이미 처리됨, 스킵
# 노드 처리 후 체크포인트 저장
self._save_node_results(source_id, node_identifier, results)
checkpoint.processed_nodes[node_identifier] = datetime.now()
checkpoint_manager.save_checkpoint(checkpoint)
Lambda 15분 제한 → 타임아웃 시 마지막 처리된 노드부터 재개
Feature 추출 + OpenSearch 인덱싱
Input: source_id, chunks (ENTR-4 결과)
Output: OpenSearch 문서 인덱싱 완료
각 Chunk에서 추출할 Feature 목록 정의
def create_feature_registry() -> FeatureRegistry:
registry = FeatureRegistry()
# Text Features (Claude로 생성)
registry.register('english_translation', ChunkTextEnExtractor(...))
registry.register('keywords', KeywordExtractor(...))
registry.register('chunk_summary', ChunkSummaryExtractor(...))
registry.register('qr', QRFeatureExtractor(...))
# Embedding Features (Titan으로 생성)
registry.register('chunk_embedding', ChunkTextEmbeddingExtractor(...))
registry.register('qr_1_embedding', QR1EmbeddingFeatureExtractor(...))
return registry
Claude Haiku로 키워드 추출
class KeywordExtractor(FeatureExtractor):
def _extract(self, document: Document, feature_repository) -> List[Feature]:
chunk_text = feature_repository.get_feature(doc_id, "chunk_text_eng")
prompt = f"""
# MISSION: Extract key information from ITB construction documents
# TARGET ITEMS
* Project Name, Location, Scope of Works, Time for Completion,
* Contract price, Bid Bond, Performance Bond, Technical Items...
# INPUT TEXT
{chunk_text}
# OUTPUT FORMAT
value1, value2, value3, ...
"""
response = self.client.invoke_model(modelId="claude-3-haiku", ...)
keywords = self._parse_response(response) # ["Taiba IPP", "power generation", ...]
return [Feature.create(doc_id=document.doc_id, name="keywords", value=keywords)]
Titan v2로 벡터 임베딩
class ChunkTextEmbeddingExtractor(FeatureExtractor):
def __init__(self, model_id="amazon.titan-embed-text-v2:0"):
self.client = boto3.client('bedrock-runtime')
def _extract(self, document: Document, feature_repository) -> List[Feature]:
# 영어 번역된 텍스트로 임베딩 생성
chunk_text_eng = feature_repository.get_feature(
doc_id=document.doc_id,
feature_name="chunk_text_eng"
)
# Titan Embedding 호출
embedding = self.get_embedding(chunk_text_eng.value) # 1024 dimension
return [Feature.create(
doc_id=document.doc_id,
name="chunk_embedding",
value=embedding # [0.123, -0.456, ...] (1024 floats)
)]
이 패턴으로 qr_embedding, summary_embedding 등 확장
인덱싱된 필드 → LTR Featureset에서 사용
| 인덱싱 필드 | → | LTR Feature |
|---|---|---|
| chunk_text | → | txt_en_1 (BM25) |
| absolute_title | → | txt_en_3 (BM25) |
| qr | → | txt_en_4 (BM25) |
| keywords | → | txt_en_5 (BM25) |
| chunk_embedding | → | emb_en_1 (KNN) |
| qr_1_embedding | → | emb_en_2 (KNN) |
Feature가 추출된 Document를 OpenSearch에 저장
# OpenSearch 클라이언트 초기화
opensearch_client = OpenSearchClient(
endpoint=opensearch_secret.get('endpoint'),
username=opensearch_secret.get('username'),
password=opensearch_secret.get('password')
)
opensearch_repository = OpenSearchRepository(
client=opensearch_client,
config=OpenSearchConfig(
index_name=index_name,
dimension=1024, # Titan embedding dimension
synonyms_path='analyzers/synonyms.txt',
compound_nouns_path='analyzers/compound_nouns.txt'
)
)
한국어 동의어/복합명사 Analyzer 설정 포함
ENTR-2: S3 스캔 → 문서 목록
ENTR-3: PDF 파싱 + LLM TOC 분석 → source.json (toc_tree)
ENTR-4: LLM 기반 Context Chunking → chunk_id 목록
ENTR-5: Feature 추출 + 인덱싱 → OpenSearch 문서
// Step Functions 입력
{
"workspace": "project-a",
"index_name": "itb-project-a-v1",
"source_prefix": "collection/project-a/",
"addendum_no": 0
}
// 실행 결과
- 총 문서: 50개
- 총 청크: 2,847개
- 총 Feature: 28,470개 (10 features × 2,847 chunks)
- OpenSearch 인덱싱: 2,847 documents
인덱싱 파이프라인 → OpenSearch → LTR 학습/검색
| 인덱싱 결과 | LTR 활용 |
|---|---|
| chunk_text, keywords, qr 등 | Text Match Feature (BM25) |
| chunk_embedding, qr_embedding 등 | KNN Feature (Cosine Similarity) |
| OpenSearch Index | SLTR Query, Featureset, Model |
{
"query": {
"bool": {
"should": [
{ "match": { "chunk_text": "검색어" } },
{ "knn": { "chunk_embedding": { "vector": [...], "k": 10 } } }
]
}
},
"rescore": {
"query": {
"rescore_query": {
"sltr": {
"model": "my-ltr-model",
"featureset": "my-featureset",
"params": {
"query_text": "검색어",
"query_embedding_str": "[...]"
}
}
}
}
}
}
| 구성요소 | 기술 |
|---|---|
| 오케스트레이션 | AWS Step Functions |
| 컴퓨팅 | AWS Lambda (Python 3.11) |
| 문서 파싱 | Upstage Document Parse (SageMaker) |
| Feature 추출 | Bedrock (Claude, Titan) |
| 검색 엔진 | OpenSearch + LTR Plugin |
| 스토리지 | S3 (문서, 중간 결과, 체크포인트) |
감사합니다