Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions redisvl/utils/vectorize/voyageai.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@
# ignore that voyageai isn't imported
# mypy: disable-error-code="name-defined"

# Token limits for VoyageAI models (used for token-aware batching).
# Keys are VoyageAI model identifiers; values are integer token limits.
# NOTE(review): presumably the maximum total tokens accepted in one request --
# confirm the figures against the current VoyageAI documentation.
VOYAGE_TOTAL_TOKEN_LIMITS = {
    # Contextualized embedding model
    "voyage-context-3": 32_000,
    # Largest limit in the table
    "voyage-3.5-lite": 1_000_000,
    # 320k-token models
    "voyage-3.5": 320_000,
    "voyage-2": 320_000,
    # 120k-token models
    "voyage-3-large": 120_000,
    "voyage-code-3": 120_000,
    "voyage-large-2-instruct": 120_000,
    "voyage-finance-2": 120_000,
    "voyage-multilingual-2": 120_000,
    "voyage-law-2": 120_000,
    "voyage-large-2": 120_000,
    "voyage-3": 120_000,
    "voyage-3-lite": 120_000,
    "voyage-code-2": 120_000,
    # Multimodal models
    "voyage-multimodal-3": 32_000,
    "voyage-multimodal-3.5": 32_000,
}


class VoyageAIVectorizer(BaseVectorizer):
"""The VoyageAIVectorizer class utilizes VoyageAI's API to generate
Expand Down Expand Up @@ -87,6 +107,21 @@ class VoyageAIVectorizer(BaseVectorizer):
input_type="query"
)

# Using contextualized embeddings (voyage-context-3)
context_vectorizer = VoyageAIVectorizer(
model="voyage-context-3",
api_config={"api_key": "your-voyageai-api-key"}
)
# Context models automatically use contextualized_embed API
context_embeddings = context_vectorizer.embed_many(
contents=["chunk 1", "chunk 2", "chunk 3"],
input_type="document"
)

# Token counting for API usage management
token_counts = vectorizer.count_tokens(["text one", "text two"])
print(f"Token counts: {token_counts}")

"""

model_config = ConfigDict(arbitrary_types_allowed=True)
Expand Down Expand Up @@ -448,6 +483,80 @@ def _serialize_for_cache(self, content: Any) -> Union[bytes, str]:
return content.to_bytes()
return super()._serialize_for_cache(content)

def _is_context_model(self) -> bool:
    """
    Report whether the configured model is a contextualized embedding model.

    Detection is purely name-based: a model whose identifier contains the
    substring "context" (for example voyage-context-3) is treated as a
    context model.

    Returns:
        bool: True when "context" occurs in ``self.model``, False otherwise.
    """
    is_contextual = "context" in self.model
    return is_contextual

def count_tokens(self, texts: List[str]) -> List[int]:
    """
    Count tokens for the given texts using the VoyageAI client's tokenizer.

    This is useful for managing API usage and optimizing batching strategies.

    Args:
        texts: List of texts to count tokens for.

    Returns:
        List[int]: One token count per input text, in input order. An empty
        input returns an empty list without calling the client.

    Raises:
        ValueError: If tokenization fails. The underlying exception is
            attached as ``__cause__``.

    Example:
        >>> vectorizer = VoyageAIVectorizer(model="voyage-3.5")
        >>> token_counts = vectorizer.count_tokens(["Hello world", "Another text"])
        >>> print(token_counts)  # [2, 2]
    """
    if not texts:
        return []

    try:
        token_lists = self._client.tokenize(texts, model=self.model)
        return [len(token_list) for token_list in token_lists]
    except Exception as e:
        # Chain explicitly so the original error and its traceback are kept
        # as the cause rather than only as implicit context.
        raise ValueError(f"Token counting failed: {e}") from e

async def acount_tokens(self, texts: List[str]) -> List[int]:
    """
    Asynchronously count tokens for the given texts using the VoyageAI
    async client's tokenizer.

    This is useful for managing API usage and optimizing batching strategies.

    Note: The tokenize call is made synchronously (it is not awaited), so
    this method provides async compatibility but doesn't offer true async
    performance benefits.

    Args:
        texts: List of texts to count tokens for.

    Returns:
        List[int]: One token count per input text, in input order. An empty
        input returns an empty list without calling the client.

    Raises:
        ValueError: If tokenization fails. The underlying exception is
            attached as ``__cause__``.

    Example:
        >>> vectorizer = VoyageAIVectorizer(model="voyage-3.5")
        >>> token_counts = await vectorizer.acount_tokens(["Hello world", "Another text"])
        >>> print(token_counts)  # [2, 2]
    """
    if not texts:
        return []

    try:
        # Note: VoyageAI's tokenize is synchronous even on AsyncClient
        token_lists = self._aclient.tokenize(texts, model=self.model)
        return [len(token_list) for token_list in token_lists]
    except Exception as e:
        # Chain explicitly so the original error and its traceback are kept
        # as the cause rather than only as implicit context.
        raise ValueError(f"Token counting failed: {e}") from e

@property
def type(self) -> str:
    """Return the identifier string for this vectorizer: ``"voyageai"``."""
    vectorizer_kind = "voyageai"
    return vectorizer_kind
177 changes: 177 additions & 0 deletions tests/integration/test_vectorizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,180 @@ def test_deprecated_text_parameter_warning():
embeddings = vectorizer.embed_many(texts=TEST_TEXTS)
assert isinstance(embeddings, list)
assert len(embeddings) == len(TEST_TEXTS)


# VoyageAI-specific tests for token counting and context model detection
@pytest.mark.requires_api_keys
def test_voyageai_count_tokens():
    """Check that count_tokens returns one positive integer per input text."""
    sample_texts = ["Hello world", "This is a longer test sentence."]
    voyage = VoyageAIVectorizer(model="voyage-3.5")

    counts = voyage.count_tokens(sample_texts)

    assert isinstance(counts, list)
    assert len(counts) == len(sample_texts)
    for n_tokens in counts:
        assert isinstance(n_tokens, int) and n_tokens > 0

    # An empty batch yields an empty result
    assert voyage.count_tokens([]) == []


@pytest.mark.requires_api_keys
@pytest.mark.asyncio
async def test_voyageai_acount_tokens():
    """Check that acount_tokens returns one positive integer per input text."""
    sample_texts = ["Hello world", "This is a longer test sentence."]
    voyage = VoyageAIVectorizer(model="voyage-3.5")

    counts = await voyage.acount_tokens(sample_texts)

    assert isinstance(counts, list)
    assert len(counts) == len(sample_texts)
    for n_tokens in counts:
        assert isinstance(n_tokens, int) and n_tokens > 0

    # An empty batch yields an empty result
    assert await voyage.acount_tokens([]) == []


def test_voyageai_token_limits():
    """Verify selected entries of the VoyageAI token-limit table."""
    from redisvl.utils.vectorize.voyageai import VOYAGE_TOTAL_TOKEN_LIMITS

    # Model name -> limit the table is expected to hold
    expected_limits = {
        "voyage-context-3": 32_000,
        "voyage-3.5-lite": 1_000_000,
        "voyage-3.5": 320_000,
        "voyage-multimodal-3": 32_000,
        "voyage-multimodal-3.5": 32_000,
    }
    for model_name, limit in expected_limits.items():
        assert VOYAGE_TOTAL_TOKEN_LIMITS.get(model_name) == limit

    # A model missing from the table yields the caller-supplied default
    fallback = VOYAGE_TOTAL_TOKEN_LIMITS.get("unknown-model", 120_000)
    assert fallback == 120_000


def test_voyageai_context_model_detection():
    """Test detection of contextualized embedding models.

    Exercises VoyageAIVectorizer._is_context_model itself rather than
    re-implementing its check, so a change to the method is caught here.
    No API key or client is required.
    """
    from types import SimpleNamespace

    test_cases = [
        ("voyage-3.5", False),
        ("voyage-context-3", True),
        ("voyage-multimodal-3.5", False),
        ("voyage-3-large", False),
    ]
    for model_name, expected in test_cases:
        # Call the method unbound with a minimal stand-in for ``self``; this
        # assumes the method only reads the ``model`` attribute, which avoids
        # constructing a real vectorizer (and its API clients).
        stand_in = SimpleNamespace(model=model_name)
        detected = VoyageAIVectorizer._is_context_model(stand_in)
        assert detected == expected, f"Failed for {model_name}"


@pytest.mark.requires_api_keys
def test_voyageai_multimodal_text_only():
    """Embed plain text with a multimodal model and check the result shape."""
    voyage = VoyageAIVectorizer(model="voyage-multimodal-3")

    # First text: a non-empty list of floats
    first = voyage.embed("A red apple on a wooden table")
    assert isinstance(first, list)
    assert len(first) > 0
    for component in first:
        assert isinstance(component, float)

    # Second text: same dimensionality as the first
    second = voyage.embed("A cat sleeping on a couch")
    assert isinstance(second, list)
    assert len(second) == len(first)


@pytest.mark.requires_api_keys
def test_voyageai_multimodal_image():
    """Test VoyageAI multimodal vectorizer with image input.

    Writes a small solid-colour PNG to a temporary directory, embeds it via
    embed_image, and checks that the result is a non-empty list of floats.
    """
    import os
    import tempfile

    from PIL import Image

    vectorizer = VoyageAIVectorizer(model="voyage-multimodal-3")

    # A temporary directory is removed on exit even when saving the image or
    # embedding it raises, so no file is left behind on any failure path.
    with tempfile.TemporaryDirectory() as tmpdir:
        image_path = os.path.join(tmpdir, "test_image.png")

        # Create a simple test image
        img = Image.new("RGB", (100, 100), color="red")
        img.save(image_path, format="PNG")

        # Test embed_image
        embedding = vectorizer.embed_image(image_path)
        assert isinstance(embedding, list)
        assert len(embedding) > 0
        assert all(isinstance(x, float) for x in embedding)


@pytest.mark.requires_api_keys
def test_voyageai_multimodal_video():
    """Test VoyageAI multimodal vectorizer with video input.

    Builds a three-frame MP4 with ffmpeg, embeds it via embed_video, and
    checks that the result is a non-empty list of floats. The test is
    skipped when ffmpeg is not installed or fails to produce the video.
    """
    import os
    import shutil
    import subprocess
    import tempfile

    from PIL import Image

    # subprocess.run raises FileNotFoundError for a missing executable, so
    # check up front and skip instead of erroring out.
    if shutil.which("ffmpeg") is None:
        pytest.skip("ffmpeg not available or failed to create test video")

    vectorizer = VoyageAIVectorizer(model="voyage-multimodal-3.5")

    # Create a minimal test video using ffmpeg
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create 3 frames
        for i in range(3):
            img = Image.new("RGB", (64, 64), color=(i * 80, 100, 150))
            img.save(os.path.join(tmpdir, f"frame_{i:03d}.png"))

        video_path = os.path.join(tmpdir, "test_video.mp4")

        # Create video from frames
        result = subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-framerate",
                "1",
                "-i",
                os.path.join(tmpdir, "frame_%03d.png"),
                "-c:v",
                "libx264",
                "-pix_fmt",
                "yuv420p",
                "-t",
                "3",
                video_path,
            ],
            capture_output=True,
        )

        # ffmpeg is present but could not encode (e.g. libx264 missing)
        if result.returncode != 0:
            pytest.skip("ffmpeg not available or failed to create test video")

        # Test embed_video
        embedding = vectorizer.embed_video(video_path)
        assert isinstance(embedding, list)
        assert len(embedding) > 0
        assert all(isinstance(x, float) for x in embedding)


@pytest.mark.requires_api_keys
@pytest.mark.asyncio
async def test_voyageai_multimodal_async():
    """Exercise the async embedding methods of a multimodal vectorizer."""
    voyage = VoyageAIVectorizer(model="voyage-multimodal-3")

    # Single text through aembed
    single = await voyage.aembed("A beautiful sunset over mountains")
    assert isinstance(single, list)
    assert len(single) > 0

    # Two texts through aembed_many
    batch_inputs = ["Ocean waves", "Forest trees"]
    batch = await voyage.aembed_many(batch_inputs)
    assert len(batch) == 2
Loading