Build

Python SDK

Full API reference: sync and async clients, all operations, filters, error handling, TLS, retries, and circuit breakers.

Installation

terminal
pip install kyrodb

Optional extras for development, protobuf regeneration, or numpy-accelerated validation:

terminal
pip install "kyrodb[numpy]"   # faster vector validation
pip install "kyrodb[dev]"     # lint, type, and test tooling

Quick Start

Synchronous Client

quickstart.py
import os
from kyrodb import KyroDBClient

with KyroDBClient(
    target=os.environ["KYRODB_GRPC_ENDPOINT"],
    api_key=os.environ["KYRODB_API_KEY"],
    timeout_s=10.0,
) as client:
    client.wait_for_ready(timeout_s=5.0)

    # Insert a vector
    client.insert(
        doc_id=1,
        embedding=[0.1, 0.2, 0.3, 0.4],
        metadata={"source": "quickstart"},
        namespace="default",
    )

    # Search
    result = client.search(
        query_embedding=[0.1, 0.2, 0.3, 0.4],
        k=10,
        namespace="default",
    )
    print(f"Found {result.total_found} results")

Asynchronous Client

async_quickstart.py
import asyncio, os
from kyrodb import AsyncKyroDBClient

async def main():
    async with AsyncKyroDBClient(
        target=os.environ["KYRODB_GRPC_ENDPOINT"],
        api_key=os.environ["KYRODB_API_KEY"],
    ) as client:
        await client.wait_for_ready(timeout_s=5.0)

        await client.insert(
            doc_id=1,
            embedding=[0.1, 0.2, 0.3, 0.4],
            metadata={"source": "async-example"},
            namespace="default",
        )

        result = await client.search(
            query_embedding=[0.1, 0.2, 0.3, 0.4],
            k=10,
            namespace="default",
        )
        print(result.total_found)

asyncio.run(main())

Client Configuration

ParameterDefaultDescription
target127.0.0.1:50051gRPC endpoint address
api_keyNoneAPI key for authentication
tlsNoneTLS configuration (required for non-loopback targets)
default_namespace""Default namespace for all operations
default_timeout_s30.0Default per-call timeout in seconds
retry_policyRetryPolicy()Retry configuration for transient failures
circuit_breaker_policyNoneCircuit breaker for sustained failures

Operations

Insert

Insert a single vector. doc_id must be a positive integer.

insert.py
ack = client.insert(
    doc_id=42,
    embedding=[0.1, 0.2, 0.3, 0.4],
    metadata={"category": "product", "name": "Widget A"},
    namespace="products",
)

Bulk Insert

Stream-insert multiple vectors in a single call.

bulk_insert.py
from kyrodb import InsertItem

items = [
    InsertItem(doc_id=i, embedding=[0.0] * 768, metadata={"idx": str(i)})
    for i in range(1, 1001)
]

ack = client.bulk_insert(items)

Search

Run k-NN similarity search. k must be between 1 and 1000. Use min_score to filter low-confidence results and ef_search to trade recall for speed.

search.py
result = client.search(
    query_embedding=[0.1, 0.2, 0.3, 0.4],
    k=10,
    min_score=0.7,
    namespace="products",
    include_embeddings=True,
    ef_search=200,
)

for hit in result.hits:
    print(f"id={hit.doc_id} score={hit.score:.4f} metadata={hit.metadata}")

Query by ID

Fetch a single vector by its document ID.

query.py
result = client.query(
    doc_id=42,
    include_embedding=True,
    namespace="products",
)
print(result.metadata)

Bulk Query

Fetch multiple vectors by ID in a single request.

bulk_query.py
result = client.bulk_query(
    doc_ids=[1, 2, 3, 4, 5],
    include_embeddings=False,
    namespace="products",
)
print(f"Found {result.total_found} of {result.total_requested}")

Update Metadata

Update metadata on an existing vector. Set merge=True (default) to merge with existing metadata, or merge=False to replace.

update_metadata.py
client.update_metadata(
    doc_id=42,
    metadata={"category": "premium", "updated": "true"},
    merge=True,
    namespace="products",
)

Delete

Delete a single vector by ID.

delete.py
client.delete(doc_id=42, namespace="products")

Batch Delete

Delete multiple vectors by ID in a single call.

batch_delete.py
result = client.batch_delete_ids(
    doc_ids=[1, 2, 3, 4, 5],
    namespace="products",
)
print(f"Deleted {result.deleted_count} vectors")

Metadata Filters

Compose filters to narrow search results by metadata. All filter builders are top-level imports from kyrodb.

BuilderDescription
exact(key, value)Exact string match
in_values(key, values)Match any of the given values
range_match(key, gte=, lte=, gt=, lt=)Range comparison on string values
all_of(filters)Logical AND of multiple filters
any_of(filters)Logical OR of multiple filters
negate(filter)Logical NOT
filters.py
from kyrodb import all_of, exact, in_values, negate

# Match products in the "hardware" category AND either "pro" or "enterprise" tier
filt = all_of([
    exact("category", "hardware"),
    in_values("tier", ["pro", "enterprise"]),
])

result = client.search(
    query_embedding=[0.1, 0.2, 0.3, 0.4],
    k=10,
    filter=filt,
    namespace="products",
)

# Exclude a specific source
excluded = negate(exact("source", "deprecated"))
result = client.search(
    query_embedding=[0.1, 0.2, 0.3, 0.4],
    k=10,
    filter=excluded,
)

Error Handling

All gRPC failures map to typed exceptions under KyroDBError. Each exception includes the failing operation, code, details, and target.

ExceptiongRPC CodeRetry?
InvalidArgumentErrorINVALID_ARGUMENTNo
AuthenticationErrorUNAUTHENTICATEDNo
PermissionDeniedErrorPERMISSION_DENIEDNo
NotFoundErrorNOT_FOUNDNo
QuotaExceededErrorRESOURCE_EXHAUSTEDWith backoff
DeadlineExceededErrorDEADLINE_EXCEEDEDYes
ServiceUnavailableErrorUNAVAILABLEYes
CircuitOpenErrorUNAVAILABLEAfter cooldown
error_handling.py
from kyrodb import KyroDBError, AuthenticationError, ServiceUnavailableError

try:
    result = client.search(query_embedding=[0.1, 0.2, 0.3, 0.4], k=10)
except AuthenticationError:
    print("Invalid API key — check KYRODB_API_KEY")
except ServiceUnavailableError as e:
    print(f"Service down: {e.details} — retry with backoff")
except KyroDBError as e:
    print(f"{e.operation} failed [{e.code.name}]: {e.details}")

Timeouts & Retries

Every call accepts an optional timeout_s override. Set timeout_s=None for unbounded timeout (not recommended in production). Read operations (search, query, health) are retry-enabled by default. Writes are not retried.

retry_config.py
import os
from kyrodb import KyroDBClient, RetryPolicy, CircuitBreakerPolicy

client = KyroDBClient(
    target=os.environ["KYRODB_GRPC_ENDPOINT"],
    api_key=os.environ["KYRODB_API_KEY"],
    default_timeout_s=15.0,
    retry_policy=RetryPolicy(
        max_retries=3,
        initial_backoff_s=0.1,
        max_backoff_s=2.0,
        backoff_multiplier=2.0,
    ),
    circuit_breaker_policy=CircuitBreakerPolicy(
        failure_threshold=5,
        recovery_timeout_s=30.0,
    ),
)

TLS & Security

TLS is required for all non-loopback targets. The API key is sent as gRPC metadata (x-api-key). mTLS requires both certificate chain and private key.

tls_config.py
import os
from kyrodb import KyroDBClient, TLSConfig

# Server-verified TLS (most common for managed cloud)
client = KyroDBClient(
    target="your-instance.kyrodb.cloud:443",
    api_key=os.environ["KYRODB_API_KEY"],
    tls=TLSConfig(),  # uses system CA bundle
)

# mTLS with custom certificates
with open("ca.pem", "rb") as f:
    ca = f.read()
with open("client.crt", "rb") as f:
    cert = f.read()
with open("client.key", "rb") as f:
    key = f.read()

client = KyroDBClient(
    target="db.internal:50051",
    api_key=os.environ["KYRODB_API_KEY"],
    tls=TLSConfig(
        root_certificates=ca,
        certificate_chain=cert,
        private_key=key,
    ),
)

API Key Rotation

Rotate keys without restarting the client. Create a new key first, then update the client:

key_rotation.py
# Static rotation
client.set_api_key("kyro_live_new_key_here")

# Dynamic provider (fetches key per call)
client.set_api_key_provider(lambda: fetch_key_from_vault())

Best Practices

  • Reuse one client per process or worker. Do not create a new client per request.
  • Always set explicit timeouts. Infinite waits are bad production hygiene.
  • Load API keys and endpoints from environment variables or a secrets manager.
  • Rotate keys by creating the replacement first, updating clients, then revoking the old key.
  • Use bulk_insert for batch ingestion — it streams via gRPC and avoids per-request overhead.
  • For batch inference workloads, reuse a persistent client and issue multiple search calls instead of reconnecting per query.

Next Steps

  • REST API if you prefer the managed HTTP gateway over direct gRPC.
  • Namespaces for data partitioning within a project.
  • Errors for HTTP error codes and retry guidance.