Skip to content

Open In Colab

RAG with Milvus

Step Tech Execution
Embedding OpenAI (text-embedding-3-small) 🌐 Remote
Vector store Milvus πŸ’» Local
Gen AI OpenAI (gpt-4o) 🌐 Remote

A recipe πŸ§‘β€πŸ³ πŸ₯ πŸ’š

This is a code recipe that uses Milvus, the world's most advanced open-source vector database, to perform RAG over documents parsed by Docling.

In this notebook, we accomplish the following: * Parse documents using Docling's document conversion capabilities * Perform hierarchical chunking of the documents using Docling * Generate text embeddings with OpenAI * Perform RAG using Milvus, the world's most advanced open-source vector database

Note: For best results, please use GPU acceleration to run this notebook. Here are two options for running this notebook: 1. Locally on a MacBook with an Apple Silicon chip. Converting all documents in the notebook takes ~2 minutes on a MacBook M2 due to Docling's usage of MPS accelerators. 2. Run this notebook on Google Colab. Converting all documents in the notebook takes ~8 minutes on a Google Colab T4 GPU.

Preparation

Dependencies and Environment

To start, install the required dependencies by running the following command:

! pip install --upgrade "pymilvus[milvus-lite]" docling openai torch

If you are using Google Colab, to enable dependencies just installed, you may need to restart the runtime (click on the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).

GPU Checking

Part of what makes Docling so remarkable is the fact that it can run on commodity hardware. This means that this notebook can be run on a local machine with GPU acceleration. If you're using a MacBook with a silicon chip, Docling integrates seamlessly with Metal Performance Shaders (MPS). MPS provides out-of-the-box GPU acceleration for macOS, seamlessly integrating with PyTorch and TensorFlow, offering energy-efficient performance on Apple Silicon, and broad compatibility with all Metal-supported GPUs.

The code below checks to see if a GPU is available, either via CUDA or MPS.

import torch

# Check if GPU or MPS is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA GPU is enabled: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    print("MPS GPU is enabled.")
else:
    raise OSError(
        "No GPU or MPS device found. Please check your environment and ensure GPU or MPS support is configured."
    )
MPS GPU is enabled.

Setting Up API Keys

We will use OpenAI as the LLM in this example. You should prepare the OPENAI_API_KEY as an environment variable.

import os

os.environ["OPENAI_API_KEY"] = "sk-***********"

Prepare the LLM and Embedding Model

We initialize the OpenAI client to prepare the embedding model.

from openai import OpenAI

openai_client = OpenAI()

Define a function to generate text embeddings using OpenAI client. We use the text-embedding-3-small model as an example.

def emb_text(text):
    return (
        openai_client.embeddings.create(input=text, model="text-embedding-3-small")
        .data[0]
        .embedding
    )

Generate a test embedding and print its dimension and first few elements.

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])
1536
[0.009889289736747742, -0.005578675772994757, 0.00683477520942688, -0.03805781528353691, -0.01824733428657055, -0.04121600463986397, -0.007636285852640867, 0.03225184231996536, 0.018949154764413834, 9.352207416668534e-05]

Process Data Using Docling

Docling can parse various document formats into a unified representation (Docling Document), which can then be exported to different output formats. For a full list of supported input and output formats, please refer to the official documentation.

In this tutorial, we will use a Markdown file (source) as the input. We will process the document using a HierarchicalChunker provided by Docling to generate structured, hierarchical chunks suitable for downstream RAG tasks.

from docling_core.transforms.chunker import HierarchicalChunker

from docling.document_converter import DocumentConverter

converter = DocumentConverter()
chunker = HierarchicalChunker()

# Convert the input file to Docling Document
source = "https://milvus.io/docs/overview.md"
doc = converter.convert(source).document

# Perform hierarchical chunking
texts = [chunk.text for chunk in chunker.chunk(doc)]

Load Data into Milvus

Create the collection

With data in hand, we can create a MilvusClient instance and insert the data into a Milvus collection.

from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")
collection_name = "my_rag_collection"

As for the argument of MilvusClient: - Setting the uri as a local file, e.g../milvus.db, is the most convenient method, as it automatically utilizes Milvus Lite to store all data in this file. - If you have large scale of data, you can set up a more performant Milvus server on docker or kubernetes. In this setup, please use the server uri, e.g.http://localhost:19530, as your uri. - If you want to use Zilliz Cloud, the fully managed cloud service for Milvus, adjust the uri and token, which correspond to the Public Endpoint and Api key in Zilliz Cloud.

Check if the collection already exists and drop it if it does.

if milvus_client.has_collection(collection_name):
    milvus_client.drop_collection(collection_name)

Create a new collection with specified parameters.

If we don’t specify any field information, Milvus will automatically create a default id field for primary key, and a vector field to store the vector data. A reserved JSON field is used to store non-schema-defined fields and their values.

milvus_client.create_collection(
    collection_name=collection_name,
    dimension=embedding_dim,
    metric_type="IP",  # Inner product distance
    consistency_level="Strong",  # Supported values are (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`). See https://milvus.io/docs/consistency.md#Consistency-Level for more details.
)

Insert data

from tqdm import tqdm

data = []

for i, chunk in enumerate(tqdm(texts, desc="Processing chunks")):
    embedding = emb_text(chunk)
    data.append({"id": i, "vector": embedding, "text": chunk})

milvus_client.insert(collection_name=collection_name, data=data)
Processing chunks: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 38/38 [00:14<00:00,  2.59it/s]





{'insert_count': 38, 'ids': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37], 'cost': 0}

Build RAG

Retrieve data for a query

Let’s specify a query question about the website we just scraped.

question = (
    "What are the three deployment modes of Milvus, and what are their differences?"
)

Search for the question in the collection and retrieve the semantic top-3 matches.

search_res = milvus_client.search(
    collection_name=collection_name,
    data=[emb_text(question)],
    limit=3,
    search_params={"metric_type": "IP", "params": {}},
    output_fields=["text"],
)

Let’s take a look at the search results of the query

import json

retrieved_lines_with_distances = [
    (res["entity"]["text"], res["distance"]) for res in search_res[0]
]
print(json.dumps(retrieved_lines_with_distances, indent=4))
[
    [
        "Milvus offers three deployment modes, covering a wide range of data scales\u2014from local prototyping in Jupyter Notebooks to massive Kubernetes clusters managing tens of billions of vectors:",
        0.6503315567970276
    ],
    [
        "Milvus Lite is a Python library that can be easily integrated into your applications. As a lightweight version of Milvus, it\u2019s ideal for quick prototyping in Jupyter Notebooks or running on edge devices with limited resources. Learn more.\nMilvus Standalone is a single-machine server deployment, with all components bundled into a single Docker image for convenient deployment. Learn more.\nMilvus Distributed can be deployed on Kubernetes clusters, featuring a cloud-native architecture designed for billion-scale or even larger scenarios. This architecture ensures redundancy in critical components. Learn more.",
        0.6281915903091431
    ],
    [
        "What is Milvus?\nUnstructured Data, Embeddings, and Milvus\nWhat Makes Milvus so Fast\uff1f\nWhat Makes Milvus so Scalable\nTypes of Searches Supported by Milvus\nComprehensive Feature Set",
        0.6117826700210571
    ]
]

Use LLM to get a RAG response

Convert the retrieved documents into a string format.

context = "\n".join(
    [line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

Define system and user prompts for the Lanage Model. This prompt is assembled with the retrieved documents from Milvus.

SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""
USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""

Use OpenAI ChatGPT to generate a response based on the prompts.

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
)
print(response.choices[0].message.content)
The three deployment modes of Milvus are:

1. **Milvus Lite**: This is a Python library that integrates easily into your applications. It's a lightweight version ideal for quick prototyping in Jupyter Notebooks or for running on edge devices with limited resources.

2. **Milvus Standalone**: This mode is a single-machine server deployment where all components are bundled into a single Docker image, making it convenient to deploy.

3. **Milvus Distributed**: This mode is designed for deployment on Kubernetes clusters. It features a cloud-native architecture suited for managing scenarios at a billion-scale or larger, ensuring redundancy in critical components.