Files
ChatbotAI/app/Services/KnowledgeBase/IngestionService.php
2026-05-18 08:56:23 +08:00

439 lines
15 KiB
PHP

<?php
namespace App\Services\KnowledgeBase;
use App\Models\DocumentChunk;
use App\Models\DocumentVersion;
use App\Models\KnowledgeItem;
use App\Models\ProcessingLog;
use App\Services\Document\ChunkingService;
use App\Services\Document\PdfExtractorService;
use App\Services\Ollama\OllamaService;
use App\Services\Qdrant\QdrantService;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Str;
use RuntimeException;
/**
* IngestionService
*
* Menyelaras keseluruhan proses ingestion dokumen:
* Extract → Chunk → Embed → Qdrant Sync
*
* Ini adalah "orchestrator" — ia koordinasi semua service lain.
* Setiap langkah dilog dalam processing_logs untuk monitoring.
*/
class IngestionService
{
/**
 * Inject the collaborators this orchestrator coordinates.
 *
 * @param PdfExtractorService $extractor Used to extract text and page data from a stored file.
 * @param ChunkingService     $chunker   Used to split normalized text into chunks.
 * @param OllamaService       $ollama    Used to generate embedding vectors for text.
 * @param QdrantService       $qdrant    Used to upsert points and update payloads in Qdrant.
 */
public function __construct(
private readonly PdfExtractorService $extractor,
private readonly ChunkingService $chunker,
private readonly OllamaService $ollama,
private readonly QdrantService $qdrant,
) {}
/**
 * Normalize raw extracted text before it is handed to the chunker.
 *
 * Steps, in order:
 *  1. unify line endings to "\n";
 *  2. strip Unicode "Other" (category C) characters except newline and tab;
 *  3. collapse runs of spaces/tabs into a single space;
 *  4. cap consecutive newlines at two;
 *  5. trim the result.
 *
 * Every pattern uses the `u` modifier, and preg_replace() returns null for a
 * subject that is not valid UTF-8. Without a guard that null propagates and
 * the whole document silently becomes an empty string. Malformed byte
 * sequences are therefore scrubbed first, and each regex step keeps its input
 * if PCRE still reports a failure.
 *
 * @param string $text Raw text as produced by the extractor.
 * @return string Normalized text (may be empty if the input held no content).
 */
private function normalizeExtractedText(string $text): string
{
    $text = str_replace(["\r\n", "\r"], "\n", $text);

    // Replace invalid UTF-8 sequences so the /u patterns below cannot fail on them.
    if (!mb_check_encoding($text, 'UTF-8')) {
        $text = mb_scrub($text, 'UTF-8');
    }

    $steps = [
        // Odd control/format characters, except newline and tab.
        ['/[^\P{C}\n\t]+/u', ''],
        // Runs of horizontal whitespace become one space; line breaks are kept.
        ["/[ \t]+/u", ' '],
        // Three or more newlines become a single blank line.
        ["/\n{3,}/u", "\n\n"],
    ];

    foreach ($steps as [$pattern, $replacement]) {
        // On a PCRE error keep the current text instead of losing everything.
        $text = preg_replace($pattern, $replacement, $text) ?? $text;
    }

    return trim($text);
}
/**
 * Run the complete ingestion pipeline for one document version:
 * extract → chunk → embed/Qdrant sync → mark as indexed.
 *
 * Each stage updates the version's processing status and writes
 * ProcessingLog records. According to the original note this is invoked by
 * ProcessUploadedDocumentJob.
 *
 * Failure handling:
 *  - Extraction failure and an empty chunk list mark the version as failed,
 *    log the failure, and throw RuntimeException.
 *  - Any error during embed/sync is recorded the same way (status FAILED plus
 *    a FAILED log for the embed stage) and then rethrown unchanged, so the
 *    version is never left stuck in the "embedding" status.
 *
 * Previously indexed versions are deactivated only AFTER the new version has
 * been embedded and synced. Deactivating them first would leave the document
 * with no active vectors whenever embedding fails.
 * NOTE(review): saveChunks() flags this version as current before embedding;
 * on an embed failure the previous version keeps its flag as well — confirm
 * that is acceptable for consumers of is_current.
 *
 * @param DocumentVersion $version The version to ingest.
 *
 * @throws RuntimeException If extraction fails or no chunks are produced.
 * @throws \Throwable       Whatever the embed/sync stage raised, after it has been recorded.
 */
public function processDocumentVersion(DocumentVersion $version): void
{
    $startTime = microtime(true);
    Log::info("Mula proses document version {$version->id}", [
        'document_id' => $version->document_id,
        'version' => $version->version_number,
    ]);

    // ── Step 1: Extract ─────────────────────────────────────────────────
    $version->updateStatus(DocumentVersion::STATUS_EXTRACTING);
    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_EXTRACT,
        ProcessingLog::STATUS_STARTED
    );

    $extraction = $this->extractor->extract(
        $version->stored_path,
        config('knowledgebase.upload.storage_disk', 'local')
    );

    if (!$extraction['success']) {
        $version->updateStatus(DocumentVersion::STATUS_EXTRACTION_FAILED, $extraction['error']);
        ProcessingLog::record(
            DocumentVersion::class,
            $version->id,
            ProcessingLog::STAGE_EXTRACT,
            ProcessingLog::STATUS_FAILED,
            $extraction['error']
        );
        throw new RuntimeException(
            "Pengekstrakan teks gagal: " . $extraction['error']
        );
    }

    // Persist the page count only when the extractor reported one.
    if ($extraction['page_count'] > 0) {
        $version->update(['page_count' => $extraction['page_count']]);
    }

    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_EXTRACT,
        ProcessingLog::STATUS_COMPLETED,
        null,
        ['page_count' => $extraction['page_count']]
    );

    // ── Step 2: Chunk ───────────────────────────────────────────────────
    $version->updateStatus(DocumentVersion::STATUS_CHUNKING);
    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_CHUNK,
        ProcessingLog::STATUS_STARTED
    );

    // Normalize the text before handing it to the chunker.
    $normalizedText = $this->normalizeExtractedText($extraction['full_text']);
    $chunks = $this->chunker->chunk(
        $normalizedText,
        $extraction['pages']
    );

    if (empty($chunks)) {
        $version->updateStatus(DocumentVersion::STATUS_FAILED, 'Tiada chunk dihasilkan dari teks.');
        ProcessingLog::record(
            DocumentVersion::class,
            $version->id,
            ProcessingLog::STAGE_CHUNK,
            ProcessingLog::STATUS_FAILED,
            'Tiada chunk dihasilkan'
        );
        throw new RuntimeException('Tiada chunk dihasilkan dari dokumen.');
    }

    // Store the new chunks in the database.
    $savedChunks = $this->saveChunks($version, $chunks);

    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_CHUNK,
        ProcessingLog::STATUS_COMPLETED,
        null,
        ['chunk_count' => count($savedChunks)]
    );

    // ── Step 3: Embed & Qdrant ──────────────────────────────────────────
    $version->updateStatus(DocumentVersion::STATUS_EMBEDDING);
    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_EMBED,
        ProcessingLog::STATUS_STARTED
    );

    try {
        $this->embedAndSyncChunks($version, $savedChunks);

        // Retire older indexed versions only once the new vectors exist, so a
        // failed re-upload never removes the document from search.
        $this->deactivatePreviousChunks($version);
    } catch (\Throwable $e) {
        // Mirror steps 1 and 2: record the failure before propagating it.
        $version->updateStatus(DocumentVersion::STATUS_FAILED, $e->getMessage());
        ProcessingLog::record(
            DocumentVersion::class,
            $version->id,
            ProcessingLog::STAGE_EMBED,
            ProcessingLog::STATUS_FAILED,
            $e->getMessage()
        );
        throw $e;
    }

    // Close the embed stage that was logged as STARTED above.
    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_EMBED,
        ProcessingLog::STATUS_COMPLETED,
        null,
        ['chunk_count' => count($savedChunks)]
    );

    // ── Done ────────────────────────────────────────────────────────────
    $version->updateStatus(DocumentVersion::STATUS_INDEXED);

    // Activate the document if it is not active yet (e.g. first successful version).
    $document = $version->document;
    if ($document->status !== 'active') {
        $document->update([
            'status' => 'active',
            'is_active' => true,
        ]);
    }

    $duration = round(microtime(true) - $startTime, 2);
    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_COMPLETE,
        ProcessingLog::STATUS_COMPLETED,
        null,
        ['duration_seconds' => $duration, 'chunk_count' => count($savedChunks)]
    );

    Log::info("Dokumen version {$version->id} berjaya diproses dalam {$duration}s", [
        'chunk_count' => count($savedChunks),
    ]);
}
/**
 * Embed one knowledge item and upsert it into Qdrant.
 *
 * The item's existing qdrant_point_id is reused when present so the point is
 * updated in place; otherwise a new UUID is generated. After the upsert the
 * item is marked as embedded with that point id.
 *
 * @param KnowledgeItem $item The item to embed.
 *
 * @throws RuntimeException If the item has no embeddable content.
 */
public function processKnowledgeItem(KnowledgeItem $item): void
{
    // Cast first so a null result is handled like an empty string.
    $text = (string) $item->getEmbeddableText();

    // Explicit comparison: empty() would also reject the literal text "0".
    if (trim($text) === '') {
        throw new RuntimeException('Knowledge item tidak mempunyai kandungan untuk di-embed.');
    }

    // Reuse the existing point id (update) or mint a new UUID (create).
    $pointId = $item->qdrant_point_id ?? (string) Str::uuid();

    $vector = $this->ollama->embed($text);
    $payload = $this->buildKnowledgeItemPayload($item);

    $this->qdrant->ensureCollectionExists();
    $this->qdrant->upsertPoint($pointId, $vector, $payload);

    $item->markAsEmbedded($pointId);

    Log::info("KnowledgeItem {$item->id} berjaya di-embed.", [
        'type' => $item->item_type,
        'category_id' => $item->category_id,
    ]);
}
/**
 * Deactivate every chunk of a document version.
 *
 * Points belonging to the version's embedded chunks get their Qdrant payload
 * set to is_active=false / status=inactive. The chunk rows themselves are
 * kept, but all of them are flagged is_active=false in the database — also
 * when none of them was ever embedded, which an early return used to skip.
 *
 * The Qdrant update runs first; if it throws, the database rows are left
 * untouched.
 *
 * @param DocumentVersion $version The version whose chunks are deactivated.
 */
public function deactivateVersionInQdrant(DocumentVersion $version): void
{
    // Fetch only the point ids instead of hydrating full chunk models.
    $pointIds = $version->chunks()
        ->whereNotNull('qdrant_point_id')
        ->where('is_embedded', true)
        ->pluck('qdrant_point_id')
        ->all();

    if ($pointIds !== []) {
        $this->qdrant->updatePayloadBatch($pointIds, [
            'is_active' => false,
            'status' => 'inactive',
        ]);
    }

    // Always bring the database in line, even if nothing existed in Qdrant.
    $version->chunks()->update(['is_active' => false]);
}
/**
 * Flag a knowledge item's Qdrant point as inactive.
 *
 * Does nothing when the item has no qdrant_point_id.
 *
 * @param KnowledgeItem $item The item whose point payload is updated.
 */
public function deactivateKnowledgeItemInQdrant(KnowledgeItem $item): void
{
    $pointId = $item->qdrant_point_id;

    // Nothing was ever synced for this item.
    if (!$pointId) {
        return;
    }

    $inactivePayload = [
        'is_active' => false,
        'status' => 'inactive',
    ];

    $this->qdrant->updatePayload($pointId, $inactivePayload);
}
// =========================================================================
// PRIVATE HELPERS
// =========================================================================
/**
 * Retire every other indexed version of the same document.
 *
 * For each version of the document (other than the given one) whose
 * processing status is INDEXED, its chunks are deactivated and its
 * is_current flag is cleared.
 *
 * @param DocumentVersion $currentVersion The version that stays untouched.
 */
private function deactivatePreviousChunks(DocumentVersion $currentVersion): void
{
    $olderIndexed = DocumentVersion::query()
        ->where('document_id', $currentVersion->document_id)
        ->where('id', '!=', $currentVersion->id)
        ->where('processing_status', DocumentVersion::STATUS_INDEXED)
        ->get();

    $olderIndexed->each(function (DocumentVersion $older): void {
        $this->deactivateVersionInQdrant($older);

        // The older version is no longer the current one.
        $older->update(['is_current' => false]);
    });
}
/**
 * Persist all chunks of a version inside one database transaction and flag
 * the version as current.
 *
 * @param DocumentVersion $version The version the chunks belong to.
 * @param array           $chunks  Chunk arrays as produced by the chunker; each is read for
 *                                 chunk_index and content, plus optional page_number,
 *                                 word_count and section_heading.
 *
 * @return DocumentChunk[] The created models, in input order.
 */
private function saveChunks(DocumentVersion $version, array $chunks): array
{
    $document = $version->document;

    $persistAll = function () use ($version, $document, $chunks) {
        $created = array_map(
            static fn ($piece) => DocumentChunk::create([
                'document_id' => $document->id,
                'document_version_id' => $version->id,
                'chunk_index' => $piece['chunk_index'],
                'page_number' => $piece['page_number'] ?? null,
                'content' => $piece['content'],
                // The chunker's word count is stored in the token_count column.
                'token_count' => $piece['word_count'] ?? null,
                'section_heading' => $piece['section_heading'] ?? null,
                'is_active' => true,
                'is_embedded' => false,
            ]),
            $chunks
        );

        // Flag this version as the current one.
        $version->update(['is_current' => true]);

        // Re-index so the result is a plain list regardless of input keys.
        return array_values($created);
    };

    return DB::transaction($persistAll);
}
/**
 * Generate an embedding for every chunk and upsert the points into Qdrant.
 *
 * Chunks are embedded one at a time; the resulting points are sent to Qdrant
 * in groups of $batchSize. A chunk is marked as embedded only AFTER the
 * upsert of its group has succeeded, so the database never claims a point
 * exists in Qdrant when the upsert failed.
 *
 * @param DocumentVersion $version The version being ingested.
 * @param DocumentChunk[] $chunks  The chunks saved for that version.
 *
 * @throws RuntimeException Rethrown (after logging) when embedding or payload
 *                          building for a chunk fails.
 */
private function embedAndSyncChunks(DocumentVersion $version, array $chunks): void
{
    $document = $version->document;
    $category = $document->category;

    $this->qdrant->ensureCollectionExists();

    // Number of points sent to Qdrant per upsert call.
    $batchSize = 10;

    foreach (array_chunk($chunks, $batchSize) as $batch) {
        $points = [];
        $pending = []; // [chunk, pointId] pairs awaiting a successful upsert

        foreach ($batch as $chunk) {
            try {
                // getEmbeddableText() is expected to prefer final_text, then
                // cleaned_text, then content (per the original note); on first
                // ingestion only the raw content is available.
                $vector = $this->ollama->embed($chunk->getEmbeddableText());
                $pointId = (string) Str::uuid();

                $points[] = [
                    'id' => $pointId,
                    'vector' => $vector,
                    'payload' => $this->buildChunkPayload($chunk, $version, $document, $category),
                ];
                $pending[] = [$chunk, $pointId];
            } catch (RuntimeException $e) {
                Log::error("Gagal embed chunk {$chunk->id}", [
                    'error' => $e->getMessage(),
                ]);
                throw $e;
            }
        }

        if ($points === []) {
            continue;
        }

        $this->qdrant->upsertPoints($points);

        // The points now exist in Qdrant; record that on the chunk rows.
        foreach ($pending as [$chunk, $pointId]) {
            $chunk->markAsEmbedded($pointId);
        }
    }

    ProcessingLog::record(
        DocumentVersion::class,
        $version->id,
        ProcessingLog::STAGE_QDRANT,
        ProcessingLog::STATUS_COMPLETED,
        null,
        ['synced_points' => count($chunks)]
    );
}
/**
 * Assemble the Qdrant payload for one PDF chunk.
 *
 * The payload carries the category, document, version and chunk identifiers
 * together with a text excerpt, so it can be used for filtering and for
 * displaying the source.
 *
 * @param DocumentChunk   $chunk    The chunk being indexed.
 * @param DocumentVersion $version  The version the chunk belongs to.
 * @param mixed           $document The owning document model.
 * @param mixed           $category The document's category model.
 *
 * @return array Payload keyed by field name.
 */
private function buildChunkPayload(
    DocumentChunk $chunk,
    DocumentVersion $version,
    $document,
    $category
): array {
    $origin = [
        'knowledge_type' => 'pdf_chunk',
        'source_type' => 'pdf',
    ];

    $categoryFields = [
        'category_id' => $category->id,
        'category_name' => $category->name,
        'category_slug' => $category->slug,
    ];

    $references = [
        'document_id' => $document->id,
        'document_version_id' => $version->id,
        'document_chunk_id' => $chunk->id,
        'knowledge_item_id' => null,
    ];

    $content = [
        'title' => $document->title,
        'page_number' => $chunk->page_number,
        'chunk_index' => $chunk->chunk_index,
        'section_heading' => $chunk->section_heading,
        // Excerpt (first 1000 characters) of the text that gets embedded.
        'text' => mb_substr($chunk->getEmbeddableText(), 0, 1000),
    ];

    $meta = [
        'is_active' => true,
        'status' => 'active',
        'tags' => $document->tags ?? [],
        'effective_date' => $document->effective_date?->toDateString(),
        'language' => $document->language,
        'created_at' => now()->toIso8601String(),
    ];

    // All keys are distinct strings, so merging keeps the field order above.
    return array_merge($origin, $categoryFields, $references, $content, $meta);
}
/**
 * Assemble the Qdrant payload for a manually authored knowledge item.
 *
 * Document-related identifiers are null because the item does not originate
 * from a PDF; the active flag and status mirror the item's is_active value.
 *
 * @param KnowledgeItem $item The item being indexed.
 *
 * @return array Payload keyed by field name.
 */
private function buildKnowledgeItemPayload(KnowledgeItem $item): array
{
    $category = $item->category;
    $active = $item->is_active;

    $origin = [
        'knowledge_type' => $item->item_type,
        'source_type' => 'manual',
    ];

    $categoryFields = [
        'category_id' => $item->category_id,
        'category_name' => $category->name,
        'category_slug' => $category->slug,
    ];

    $references = [
        'document_id' => null,
        'document_version_id' => null,
        'document_chunk_id' => null,
        'knowledge_item_id' => $item->id,
    ];

    $content = [
        'title' => $item->title,
        'page_number' => null,
        'chunk_index' => 0,
        'section_heading' => null,
        // Excerpt (first 1000 characters) of the text that gets embedded.
        'text' => mb_substr($item->getEmbeddableText(), 0, 1000),
    ];

    $meta = [
        'is_active' => $active,
        'status' => $active ? 'active' : 'inactive',
        'tags' => $item->tags ?? [],
        'effective_date' => $item->effective_date?->toDateString(),
        'language' => $item->language,
        'created_at' => now()->toIso8601String(),
    ];

    // All keys are distinct strings, so merging keeps the field order above.
    return array_merge($origin, $categoryFields, $references, $content, $meta);
}
}