Build
Python SDK
Full API reference: sync and async clients, all operations, filters, error handling, TLS, retries, and circuit breakers.
Installation
pip install kyrodbOptional extras for development, protobuf regeneration, or numpy-accelerated validation:
pip install "kyrodb[numpy]" # faster vector validation
pip install "kyrodb[dev]" # lint, type, and test toolingQuick Start
Synchronous Client
import os
from kyrodb import KyroDBClient
with KyroDBClient(
target=os.environ["KYRODB_GRPC_ENDPOINT"],
api_key=os.environ["KYRODB_API_KEY"],
timeout_s=10.0,
) as client:
client.wait_for_ready(timeout_s=5.0)
# Insert a vector
client.insert(
doc_id=1,
embedding=[0.1, 0.2, 0.3, 0.4],
metadata={"source": "quickstart"},
namespace="default",
)
# Search
result = client.search(
query_embedding=[0.1, 0.2, 0.3, 0.4],
k=10,
namespace="default",
)
print(f"Found {result.total_found} results")Asynchronous Client
import asyncio, os
from kyrodb import AsyncKyroDBClient
async def main():
async with AsyncKyroDBClient(
target=os.environ["KYRODB_GRPC_ENDPOINT"],
api_key=os.environ["KYRODB_API_KEY"],
) as client:
await client.wait_for_ready(timeout_s=5.0)
await client.insert(
doc_id=1,
embedding=[0.1, 0.2, 0.3, 0.4],
metadata={"source": "async-example"},
namespace="default",
)
result = await client.search(
query_embedding=[0.1, 0.2, 0.3, 0.4],
k=10,
namespace="default",
)
print(result.total_found)
asyncio.run(main())Client Configuration
| Parameter | Default | Description |
|---|---|---|
target | 127.0.0.1:50051 | gRPC endpoint address |
api_key | None | API key for authentication |
tls | None | TLS configuration (required for non-loopback targets) |
default_namespace | "" | Default namespace for all operations |
default_timeout_s | 30.0 | Default per-call timeout in seconds |
retry_policy | RetryPolicy() | Retry configuration for transient failures |
circuit_breaker_policy | None | Circuit breaker for sustained failures |
Operations
Insert
Insert a single vector. doc_id must be a positive integer.
ack = client.insert(
doc_id=42,
embedding=[0.1, 0.2, 0.3, 0.4],
metadata={"category": "product", "name": "Widget A"},
namespace="products",
)Bulk Insert
Stream-insert multiple vectors in a single call.
from kyrodb import InsertItem
items = [
InsertItem(doc_id=i, embedding=[0.0] * 768, metadata={"idx": str(i)})
for i in range(1, 1001)
]
ack = client.bulk_insert(items)Search
Run k-NN similarity search. k must be between 1 and 1000. Use min_score to filter low-confidence results and ef_search to trade recall for speed.
result = client.search(
query_embedding=[0.1, 0.2, 0.3, 0.4],
k=10,
min_score=0.7,
namespace="products",
include_embeddings=True,
ef_search=200,
)
for hit in result.hits:
print(f"id={hit.doc_id} score={hit.score:.4f} metadata={hit.metadata}")Query by ID
Fetch a single vector by its document ID.
result = client.query(
doc_id=42,
include_embedding=True,
namespace="products",
)
print(result.metadata)Bulk Query
Fetch multiple vectors by ID in a single request.
result = client.bulk_query(
doc_ids=[1, 2, 3, 4, 5],
include_embeddings=False,
namespace="products",
)
print(f"Found {result.total_found} of {result.total_requested}")Update Metadata
Update metadata on an existing vector. Set merge=True (default) to merge with existing metadata, or merge=False to replace.
client.update_metadata(
doc_id=42,
metadata={"category": "premium", "updated": "true"},
merge=True,
namespace="products",
)Delete
Delete a single vector by ID.
client.delete(doc_id=42, namespace="products")Batch Delete
Delete multiple vectors by ID in a single call.
result = client.batch_delete_ids(
doc_ids=[1, 2, 3, 4, 5],
namespace="products",
)
print(f"Deleted {result.deleted_count} vectors")Metadata Filters
Compose filters to narrow search results by metadata. All filter builders are top-level imports from kyrodb.
| Builder | Description |
|---|---|
exact(key, value) | Exact string match |
in_values(key, values) | Match any of the given values |
range_match(key, gte=, lte=, gt=, lt=) | Range comparison on string values |
all_of(filters) | Logical AND of multiple filters |
any_of(filters) | Logical OR of multiple filters |
negate(filter) | Logical NOT |
from kyrodb import all_of, exact, in_values, negate
# Match products in the "hardware" category AND either "pro" or "enterprise" tier
filt = all_of([
exact("category", "hardware"),
in_values("tier", ["pro", "enterprise"]),
])
result = client.search(
query_embedding=[0.1, 0.2, 0.3, 0.4],
k=10,
filter=filt,
namespace="products",
)
# Exclude a specific source
excluded = negate(exact("source", "deprecated"))
result = client.search(
query_embedding=[0.1, 0.2, 0.3, 0.4],
k=10,
filter=excluded,
)Error Handling
All gRPC failures map to typed exceptions under KyroDBError. Each exception includes the failing operation, code, details, and target.
| Exception | gRPC Code | Retry? |
|---|---|---|
InvalidArgumentError | INVALID_ARGUMENT | No |
AuthenticationError | UNAUTHENTICATED | No |
PermissionDeniedError | PERMISSION_DENIED | No |
NotFoundError | NOT_FOUND | No |
QuotaExceededError | RESOURCE_EXHAUSTED | With backoff |
DeadlineExceededError | DEADLINE_EXCEEDED | Yes |
ServiceUnavailableError | UNAVAILABLE | Yes |
CircuitOpenError | UNAVAILABLE | After cooldown |
from kyrodb import KyroDBError, AuthenticationError, ServiceUnavailableError
try:
result = client.search(query_embedding=[0.1, 0.2, 0.3, 0.4], k=10)
except AuthenticationError:
print("Invalid API key — check KYRODB_API_KEY")
except ServiceUnavailableError as e:
print(f"Service down: {e.details} — retry with backoff")
except KyroDBError as e:
print(f"{e.operation} failed [{e.code.name}]: {e.details}")Timeouts & Retries
Every call accepts an optional timeout_s override. Set timeout_s=None for unbounded timeout (not recommended in production). Read operations (search, query, health) are retry-enabled by default. Writes are not retried.
import os
from kyrodb import KyroDBClient, RetryPolicy, CircuitBreakerPolicy
client = KyroDBClient(
target=os.environ["KYRODB_GRPC_ENDPOINT"],
api_key=os.environ["KYRODB_API_KEY"],
default_timeout_s=15.0,
retry_policy=RetryPolicy(
max_retries=3,
initial_backoff_s=0.1,
max_backoff_s=2.0,
backoff_multiplier=2.0,
),
circuit_breaker_policy=CircuitBreakerPolicy(
failure_threshold=5,
recovery_timeout_s=30.0,
),
)TLS & Security
TLS is required for all non-loopback targets. The API key is sent as gRPC metadata (x-api-key). mTLS requires both certificate chain and private key.
import os
from kyrodb import KyroDBClient, TLSConfig
# Server-verified TLS (most common for managed cloud)
client = KyroDBClient(
target="your-instance.kyrodb.cloud:443",
api_key=os.environ["KYRODB_API_KEY"],
tls=TLSConfig(), # uses system CA bundle
)
# mTLS with custom certificates
with open("ca.pem", "rb") as f:
ca = f.read()
with open("client.crt", "rb") as f:
cert = f.read()
with open("client.key", "rb") as f:
key = f.read()
client = KyroDBClient(
target="db.internal:50051",
api_key=os.environ["KYRODB_API_KEY"],
tls=TLSConfig(
root_certificates=ca,
certificate_chain=cert,
private_key=key,
),
)API Key Rotation
Rotate keys without restarting the client. Create a new key first, then update the client:
# Static rotation
client.set_api_key("kyro_live_new_key_here")
# Dynamic provider (fetches key per call)
client.set_api_key_provider(lambda: fetch_key_from_vault())Best Practices
- Reuse one client per process or worker. Do not create a new client per request.
- Always set explicit timeouts. Infinite waits are bad production hygiene.
- Load API keys and endpoints from environment variables or a secrets manager.
- Rotate keys by creating the replacement first, updating clients, then revoking the old key.
- Use
bulk_insertfor batch ingestion — it streams via gRPC and avoids per-request overhead. - For batch inference workloads, reuse a persistent client and issue multiple
searchcalls instead of reconnecting per query.
Next Steps
- REST API if you prefer the managed HTTP gateway over direct gRPC.
- Namespaces for data partitioning within a project.
- Errors for HTTP error codes and retry guidance.