- Existing embedding service integration
- Pyckle API key
- Python 3.9+
This migration takes 15 minutes of code changes. The validation work takes longer — and skipping it is how teams end up with a broken search they can't explain three weeks later. Do it right the first time.
Step 1: Swap the Endpoint (the 10-Minute Part)
The OpenAI-compatible Python SDK you're already using works with Pyckle unchanged. You're not swapping libraries; you're pointing the same client at a different base URL with a different API key. Two constructor arguments, done.
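Model name mapping: text-embedding-3-small maps to bge-large-en-v1.5 for general text retrieval. The vector size changes with it: text-embedding-3-small returns 1536 dimensions by default and bge-large-en-v1.5 returns 1024, so any index created with a fixed dimension has to be rebuilt rather than reused. For code search, use pyckle-embed-code instead; it's trained on code tokens and significantly outperforms general-purpose models on that domain.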
import os

from openai import OpenAI

# Before: pointed at your existing provider
# client = OpenAI(api_key=os.environ["CURRENT_PROVIDER_API_KEY"])

# After: same client, new API key and base URL
client = OpenAI(
    api_key=os.environ["PYCKLE_API_KEY"],
    base_url="https://api.pyckle.dev/v1",
)

# Everything else stays the same
response = client.embeddings.create(
    model="bge-large-en-v1.5",  # or pyckle-embed-code for code
    input="your text here",
)
embedding = response.data[0].embedding
print(f"Dimension: {len(embedding)}")  # 1024 for bge-large-en-v1.5
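If model names are scattered across your codebase, it may help to keep the mapping described above in one place. The sketch below is one way to do that. The `LEGACY_TO_PYCKLE` table and `resolve_model` helper are hypothetical names introduced for illustration; only the text-embedding-3-small entry and the pyckle-embed-code name come from this guide.

```python
# Hypothetical lookup table. Only this one entry is taken from the guide;
# add your own entries once you have validated them.
LEGACY_TO_PYCKLE = {
    "text-embedding-3-small": "bge-large-en-v1.5",
}
CODE_MODEL = "pyckle-embed-code"


def resolve_model(legacy_name: str, for_code: bool = False) -> str:
    """Return the Pyckle model name to use in place of a legacy model name."""
    if for_code:
        # Code search uses the dedicated code model regardless of the old name.
        return CODE_MODEL
    try:
        return LEGACY_TO_PYCKLE[legacy_name]
    except KeyError:
        # Fail loudly instead of silently falling back to a default model.
        raise ValueError(f"No Pyckle mapping recorded for {legacy_name!r}") from None
```

Raising on an unknown name is a deliberate choice here: a silent default would hide exactly the kind of mismatch this migration is trying to avoid.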
Do not mix embeddings from your current provider and new Pyckle embeddings in the same vector index. Cosine similarity scores are not comparable across embedding spaces. A score of 0.85 from one model tells you nothing about what 0.85 means in Pyckle's space. Keep indexes separate until you complete a full re-embed.
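One way to enforce that separation in code is to tag every index with the model that produced its vectors and reject anything else at write time. The following is a minimal in-memory sketch, not a real vector store client; `TaggedIndex` is a hypothetical class, and a production setup would apply the same check in front of whatever store you actually use.

```python
from dataclasses import dataclass, field


@dataclass
class TaggedIndex:
    """Toy in-memory index that refuses vectors from a different model."""

    model: str
    dimension: int
    vectors: dict = field(default_factory=dict)

    def add(self, doc_id: str, vector: list, model: str) -> None:
        # Reject vectors produced by any model other than the index's own.
        if model != self.model:
            raise ValueError(
                f"index holds {self.model!r} vectors, got a vector from {model!r}"
            )
        # A dimension mismatch is a second, cheaper signal of a mixed index.
        if len(vector) != self.dimension:
            raise ValueError(
                f"expected {self.dimension} dimensions, got {len(vector)}"
            )
        self.vectors[doc_id] = vector
```

Note that the dimension check alone is not sufficient: two different models can share a dimension and still live in unrelated spaces, which is why the model name is checked first.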
Step 2: Understand Why Your Thresholds Will Break
Every embedding model produces vectors in its own geometric space. Your existing similarity threshold — whatever value you use to decide "relevant" vs "not relevant" — was calibrated against your provider's embedding space. That number is now meaningless.
It's not that one model is better or worse. It's that 0.85 cosine similarity in one space is geometrically unrelated to 0.85 in another. Carry your old threshold into the new model and you'll get either too many results or too few, with no obvious signal about which direction the problem is going.
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


def score_pair(client, text_a: str, text_b: str, model: str) -> float:
    """Return cosine similarity for a single text pair."""
    resp = client.embeddings.create(model=model, input=[text_a, text_b])
    vec_a = np.array(resp.data[0].embedding).reshape(1, -1)
    vec_b = np.array(resp.data[1].embedding).reshape(1, -1)
    return float(cosine_similarity(vec_a, vec_b)[0][0])


# Quick sanity check: same text should score ~1.0
text = "Pyckle embedding API migration guide"
score = score_pair(client, text, text, model="bge-large-en-v1.5")
print(f"Self-similarity (should be ~1.0): {score:.4f}")
The threshold recalibration in Steps 3 and 4 is not optional busywork — it's the actual migration. The endpoint swap is just plumbing. Your retrieval quality lives or dies on getting the new threshold right.
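To see why a threshold cannot travel between spaces, it can help to look at a contrived example. The sketch below uses synthetic random vectors, not output from any real model. It assumes that one "model" produces centered vectors and the other adds a large shared offset to every vector, which is a crude stand-in for the way some embedding spaces cluster in a narrow cone.

```python
import numpy as np


def mean_random_pair_cosine(vectors: np.ndarray) -> float:
    """Mean cosine similarity between unrelated rows, paired as (0,1), (2,3), ..."""
    a, b = vectors[0::2], vectors[1::2]
    num = np.sum(a * b, axis=1)
    den = np.linalg.norm(a, axis=1) * np.linalg.norm(b, axis=1)
    return float(np.mean(num / den))


rng = np.random.default_rng(0)

# Synthetic space A: centered random vectors, so unrelated pairs score near 0.
space_a = rng.normal(size=(200, 64))

# Synthetic space B: the same vectors plus a shared offset on every axis.
# The offset dominates each vector, so unrelated pairs now score close to 1.
space_b = space_a + 3.0

mean_a = mean_random_pair_cosine(space_a)
mean_b = mean_random_pair_cosine(space_b)
print(f"Unrelated pairs, space A: {mean_a:.3f}")
print(f"Unrelated pairs, space B: {mean_b:.3f}")
```

The pairs are equally unrelated in both spaces, yet a cutoff tuned for one would reject nearly everything or accept nearly everything in the other. Real models differ in subtler ways, but the conclusion is the same: the threshold has to be measured, not carried over.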
Step 3: Re-Embed a Calibration Sample
Don't re-embed everything yet. Pull 500 representative chunks from your corpus and embed them with Pyckle. Then assemble 50 known-relevant pairs and 50 known-irrelevant pairs — ideally from labeled examples you already have or can quickly annotate.
If you don't have labeled pairs, build them now. Pick 50 queries you know should match specific documents. Pick 50 queries that should not match documents in your corpus. Without this ground truth, you're flying blind on what threshold to set.
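Before spending API calls on calibration, it may be worth checking that the labeled set is usable. The sketch below assumes the pair format used in Step 4, a list of `{"query", "doc_id", "relevant"}` records. The `check_labeled_pairs` helper and its `min_per_class` parameter are hypothetical names introduced for illustration.

```python
from collections import Counter


def check_labeled_pairs(pairs, corpus_ids, min_per_class=50):
    """Return a list of human-readable problems; an empty list means usable."""
    problems = []

    # Both classes need enough examples to form a distribution.
    counts = Counter(bool(p["relevant"]) for p in pairs)
    for label, name in ((True, "relevant"), (False, "irrelevant")):
        if counts[label] < min_per_class:
            problems.append(
                f"only {counts[label]} {name} pairs, want at least {min_per_class}"
            )

    # Every referenced document must be in the calibration sample,
    # otherwise there is no vector to score the query against.
    missing = sorted({p["doc_id"] for p in pairs} - set(corpus_ids))
    if missing:
        problems.append(
            f"{len(missing)} doc_id(s) not in the calibration corpus, e.g. {missing[0]!r}"
        )
    return problems
```

A typical call would pass the parsed labeled pairs and the ids from your calibration corpus, then stop the run if the returned list is non-empty.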
import json


def embed_batch(client, texts: list[str], model: str, batch_size: int = 100) -> list[list[float]]:
    """Embed texts in batches, return list of embedding vectors."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        resp = client.embeddings.create(model=model, input=batch)
        # Preserve input order within the batch
        ordered = sorted(resp.data, key=lambda x: x.index)
        all_embeddings.extend(d.embedding for d in ordered)
    return all_embeddings


# Load your calibration corpus (500 chunks)
with open("calibration_corpus.json") as f:
    corpus = json.load(f)  # list of {"id": str, "text": str}

texts = [item["text"] for item in corpus]
embeddings = embed_batch(client, texts, model="bge-large-en-v1.5")

# Save for threshold work
calibration_vecs = {item["id"]: vec for item, vec in zip(corpus, embeddings)}
print(f"Embedded {len(calibration_vecs)} chunks")
If you don't have an eval set, build one before migrating. Blind migrations are how you discover problems three weeks later — when a user complains that search "feels worse" and you have no baseline to compare against.
Step 4: Recalibrate Your Similarity Threshold
Score all your labeled pairs. Build two distributions: one for relevant pairs, one for irrelevant pairs. Plot them, then find the threshold where precision and recall cross — that's your new cutoff. If the distributions overlap heavily, your model choice may need revisiting before you go further.
import json

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# labeled_pairs: list of {"query": str, "doc_id": str, "relevant": bool}
with open("labeled_pairs.json") as f:
    labeled_pairs = json.load(f)

# Pre-embed all unique queries
unique_queries = list({p["query"] for p in labeled_pairs})
query_embeddings_raw = embed_batch(client, unique_queries, model="bge-large-en-v1.5")
query_vecs = dict(zip(unique_queries, query_embeddings_raw))

relevant_scores = []
irrelevant_scores = []
for pair in labeled_pairs:
    q_vec = np.array(query_vecs[pair["query"]]).reshape(1, -1)
    d_vec = np.array(calibration_vecs[pair["doc_id"]]).reshape(1, -1)
    score = float(cosine_similarity(q_vec, d_vec)[0][0])
    if pair["relevant"]:
        relevant_scores.append(score)
    else:
        irrelevant_scores.append(score)

# Find threshold where precision ≈ recall
thresholds = np.arange(0.0, 1.0, 0.01)
best_threshold = None
min_gap = float("inf")
for t in thresholds:
    tp = sum(s >= t for s in relevant_scores)
    fp = sum(s >= t for s in irrelevant_scores)
    fn = sum(s < t for s in relevant_scores)
    if tp == 0:
        # No relevant pair passes: precision and recall are both 0, which
        # would otherwise register as a perfect crossing. Skip these.
        continue
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    gap = abs(precision - recall)
    if gap < min_gap:
        min_gap = gap
        best_threshold = t

print(f"Recommended threshold: {best_threshold:.2f}")
print(f"Relevant scores: mean {np.mean(relevant_scores):.3f}, std {np.std(relevant_scores):.3f}")
print(f"Irrelevant scores: mean {np.mean(irrelevant_scores):.3f}, std {np.std(irrelevant_scores):.3f}")

# Optional: plot distributions
plt.hist(relevant_scores, bins=20, alpha=0.6, label="Relevant", color="green")
plt.hist(irrelevant_scores, bins=20, alpha=0.6, label="Irrelevant", color="red")
plt.axvline(best_threshold, color="black", linestyle="--", label=f"Threshold: {best_threshold:.2f}")
plt.legend()
plt.savefig("score_distribution.png", dpi=150)
print("Saved score_distribution.png")
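The histogram shows overlap visually, but a single number is easier to track across model candidates. One common choice is the area under the ROC curve, computed here as the probability that a randomly chosen relevant pair outscores a randomly chosen irrelevant pair. This is a sketch using plain Python; `separation_auc` is a hypothetical helper name, and the guide itself does not define what counts as "heavy" overlap, so any cutoff you apply to this number is your own assumption.

```python
def separation_auc(relevant_scores, irrelevant_scores):
    """Probability that a random relevant pair outscores a random irrelevant one.

    1.0 means the two distributions are perfectly separable by some threshold;
    0.5 means the scores carry no information about relevance.
    """
    if not relevant_scores or not irrelevant_scores:
        raise ValueError("need at least one score in each class")
    wins = 0.0
    for r in relevant_scores:
        for s in irrelevant_scores:
            if r > s:
                wins += 1.0
            elif r == s:
                wins += 0.5  # ties count as half a win
    return wins / (len(relevant_scores) * len(irrelevant_scores))
```

With 50 scores per class the nested loop is only 2,500 comparisons, so there is no need for anything faster at calibration scale.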
Step 5: Measure Quality Before Full Migration
Run your full eval set against the new model and threshold before touching production data. If you've been following the evaluation guide, you have a scored baseline from your existing setup. Beat it, match it, or make a deliberate tradeoff — but measure it explicitly.
Compare mean reciprocal rank, precision@k, or whatever metric your application cares about. A migration that improves average precision by 4% but tanks recall on your tail queries is not a clean win.
def evaluate_retrieval(client, eval_queries, corpus_vecs, model, threshold, top_k=5):
    """
    eval_queries: list of {"query": str, "relevant_ids": list[str]}
    corpus_vecs: dict of {doc_id: embedding_vector}
    Returns: mean reciprocal rank and precision@k
    """
    doc_ids = list(corpus_vecs.keys())
    doc_matrix = np.array([corpus_vecs[d] for d in doc_ids])
    mrr_scores = []
    precision_at_k = []
    for item in eval_queries:
        relevant = set(item["relevant_ids"])
        q_vec = np.array(
            client.embeddings.create(model=model, input=item["query"]).data[0].embedding
        ).reshape(1, -1)
        scores = cosine_similarity(q_vec, doc_matrix)[0]
        # Rank by score, drop anything under the threshold, keep the top k
        ranked_ids = [doc_ids[i] for i in np.argsort(scores)[::-1] if scores[i] >= threshold][:top_k]
        # MRR: reciprocal rank of the first relevant hit, 0 if none
        rr = 0.0
        for rank, doc_id in enumerate(ranked_ids, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank
                break
        mrr_scores.append(rr)
        # Precision@k
        hits = sum(1 for d in ranked_ids if d in relevant)
        precision_at_k.append(hits / top_k)
    return {
        "mrr": float(np.mean(mrr_scores)),
        f"precision_at_{top_k}": float(np.mean(precision_at_k)),
        "n_queries": len(eval_queries),
    }


with open("eval_queries.json") as f:
    eval_queries = json.load(f)

# Every relevant_id must exist in the vectors passed in; at this stage
# that is the calibration sample, not the full corpus.
results = evaluate_retrieval(
    client, eval_queries, calibration_vecs,
    model="bge-large-en-v1.5",
    threshold=best_threshold,
)
print(json.dumps(results, indent=2))
Run the same evaluation function against your current embedding model before cutting over. Concrete numbers side-by-side — not vibes — are how you justify the migration and catch regressions before they hit users.
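Aggregate numbers can hide the tail-query regressions mentioned earlier, so a per-query comparison is a useful companion. The sketch below assumes you have recorded one score per query (for example, reciprocal rank) for both the current provider and Pyckle. `find_regressions` and its `tolerance` parameter are hypothetical names, not part of any library.

```python
def find_regressions(baseline, candidate, tolerance=0.0):
    """List queries whose score dropped by more than `tolerance`.

    baseline, candidate: dicts of {query: score} for the same set of queries.
    Returns (query, baseline_score, candidate_score) tuples, worst drop first.
    """
    regressions = []
    for query, old_score in baseline.items():
        # A query missing from the candidate run is treated as a score of 0.
        new_score = candidate.get(query, 0.0)
        if old_score - new_score > tolerance:
            regressions.append((query, old_score, new_score))
    regressions.sort(key=lambda row: row[1] - row[2], reverse=True)
    return regressions
```

Reading the top of this list by hand is often more informative than the mean: a handful of queries that went from a first-rank hit to no hit at all will barely move the average.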
Step 6: Re-Embed All Data and Verify
Once eval passes, re-embed your full corpus in batches. Batch size of 100 is safe for most setups. Build a fresh index — don't append to the existing one. When the new index is ready, run your eval set one more time against the full corpus to confirm numbers hold at scale.
If your infrastructure allows it, run in shadow mode first: route live queries to both indexes, log both result sets, compare offline. This catches edge cases your eval set missed. Swap the live pointer only when shadow metrics are stable.
import time


def full_reembed(client, all_documents, model, batch_size=100, delay_secs=0.1):
    """
    all_documents: list of {"id": str, "text": str}
    Returns: dict of {id: embedding_vector}
    """
    result = {}
    total = len(all_documents)
    for i in range(0, total, batch_size):
        batch = all_documents[i : i + batch_size]
        texts = [d["text"] for d in batch]
        ids = [d["id"] for d in batch]
        resp = client.embeddings.create(model=model, input=texts)
        # Sort by index so each vector lines up with its document id
        ordered = sorted(resp.data, key=lambda x: x.index)
        for doc_id, item in zip(ids, ordered):
            result[doc_id] = item.embedding
        progress = min(i + batch_size, total)
        print(f"Progress: {progress}/{total} ({100 * progress // total}%)")
        time.sleep(delay_secs)  # be a good API citizen
    return result


# Load full corpus
with open("full_corpus.json") as f:
    all_documents = json.load(f)

print(f"Re-embedding {len(all_documents)} documents...")
full_vecs = full_reembed(client, all_documents, model="bge-large-en-v1.5")

# Persist to your vector store
# Replace this with your actual index write (Chroma, Pinecone, pgvector, etc.)
with open("pyckle_index.json", "w") as f:
    json.dump(full_vecs, f)
print("Full re-embed complete. Verify eval metrics before swapping live traffic.")

# Final verification
final_results = evaluate_retrieval(
    client, eval_queries, full_vecs,
    model="bge-large-en-v1.5",
    threshold=best_threshold,
)
print("Final eval:", json.dumps(final_results, indent=2))
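For the shadow-mode comparison described above, one simple signal to log is how much the top results from the two indexes overlap for each live query. The sketch below is an assumption about how that could look: `search_old` and `search_new` are hypothetical callables that take a query string and return a ranked list of document ids, and Jaccard overlap is one possible metric among several.

```python
def topk_overlap(old_ids, new_ids, k=5):
    """Jaccard overlap between the top-k ids of two ranked result lists."""
    old_top, new_top = set(old_ids[:k]), set(new_ids[:k])
    union = old_top | new_top
    if not union:
        return 1.0  # both indexes returned nothing, so they agree
    return len(old_top & new_top) / len(union)


def shadow_compare(query, search_old, search_new, k=5):
    """Run one query against both indexes and return a record to log."""
    old_ids = search_old(query)
    new_ids = search_new(query)
    return {
        "query": query,
        "old": old_ids[:k],
        "new": new_ids[:k],
        "overlap": topk_overlap(old_ids, new_ids, k),
    }
```

Low overlap is not a failure by itself, since the new model may simply be finding better documents. It is a pointer to the queries worth reviewing by hand before you swap the live pointer.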
Cutting live traffic over to the re-embedded index is the point of no return. Once your application is reading from the new index at the new threshold and the metrics have held, the old embeddings are dead weight. Delete them. Keeping both around creates confusion about which index is authoritative and makes debugging retrieval issues much harder.