Python Examples
Complete, runnable examples demonstrating ThinkHive Python SDK features.
RAG Pipeline
A complete RAG pipeline with full tracing.
rag_pipeline.py
import os
import thinkhive
from openai import OpenAI
from pinecone import Pinecone
# Initialize tracing and service clients (module-level, shared by all functions below).
thinkhive.init(service_name="rag-agent")  # registers this script as the "rag-agent" service
client = OpenAI()  # credentials come from the environment — see setup notes at the bottom
pc = Pinecone()  # presumably reads PINECONE_API_KEY from the environment — see setup notes
index = pc.Index("knowledge-base")  # vector index queried by search_documents()
def get_embedding(text: str) -> list[float]:
    """Return the embedding vector for *text* from OpenAI's small embedding model."""
    result = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    # The API returns one embedding per input; we sent a single string.
    return result.data[0].embedding
@thinkhive.trace_retrieval()
def search_documents(query: str, top_k: int = 5) -> list[str]:
    """Return the stored text of the *top_k* documents most similar to *query*.

    Embeds the query, then runs a vector similarity search against the
    Pinecone index. Each match carries its original text in metadata.
    """
    query_vector = get_embedding(query)
    response = index.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
    )
    return [m.metadata["text"] for m in response.matches]
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def generate_answer(question: str, context: str) -> str:
    """Ask GPT-4 to answer *question* grounded in the supplied *context*."""
    system_message = {
        "role": "system",
        "content": f"Answer based on this context:\n\n{context}",
    }
    user_message = {"role": "user", "content": question}
    completion = client.chat.completions.create(
        model="gpt-4",
        messages=[system_message, user_message],
        temperature=0.7,
    )
    return completion.choices[0].message.content
def rag_chat(question: str) -> str:
    """Answer *question* via retrieve-then-generate, traced as one parent span."""
    tracer = thinkhive.get_tracer()
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("question", question)

        # Retrieval step: fetch supporting documents for the question.
        documents = search_documents(question)
        context = "\n\n".join(documents)
        span.set_attribute("document_count", len(documents))

        # Generation step: answer grounded in the retrieved text.
        answer = generate_answer(question, context)
        return answer
if __name__ == "__main__":
    # Demo: run a few sample questions through the traced RAG pipeline.
    for q in [
        "What is ThinkHive?",
        "How does RAG evaluation work?",
        "What are the pricing tiers?",
    ]:
        print(f"Q: {q}")
        a = rag_chat(q)
        print(f"A: {a}\n")

LangChain Integration
Using ThinkHive with LangChain.
langchain_example.py
import thinkhive
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
thinkhive.init(service_name="langchain-agent")  # initialize ThinkHive tracing for this example
# Simple chain with tracing
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def run_simple_chain(topic: str) -> str:
    """Run a simple LangChain chain: prompt -> GPT-4 -> plain string."""
    llm = ChatOpenAI(model="gpt-4")
    template = ChatPromptTemplate.from_template(
        "Write a short paragraph about {topic}"
    )
    # LCEL pipe syntax composes prompt, model, and parser into one runnable.
    pipeline = template | llm | StrOutputParser()
    return pipeline.invoke({"topic": topic})
# Tool with tracing
@thinkhive.trace_tool(tool_name="calculator")
def calculate(expression: str) -> str:
    """Calculate a mathematical expression and return the result as a string.

    Only plain arithmetic is evaluated: numeric literals, + - * / // % **,
    unary +/-, and parentheses. Anything else returns an "Error: ..." string.

    SECURITY: the previous implementation called eval() on the expression.
    Because this tool is handed to an LLM agent, that input is untrusted and
    eval() would execute arbitrary Python — a code-injection hole. The AST
    walker below evaluates the same arithmetic without executing code.
    """
    import ast
    import operator

    # Whitelist of AST operator node types -> their numeric implementations.
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    def _eval(node):
        # Recursively evaluate a whitelisted arithmetic AST node.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError("unsupported expression")

    try:
        result = _eval(ast.parse(expression, mode="eval").body)
        return str(result)
    except Exception as e:
        return f"Error: {e}"
@thinkhive.trace_tool(tool_name="search")
def web_search(query: str) -> str:
    """Search the web (mock implementation — echoes the query in a canned string)."""
    return f"Search results for: {query}"
# Agent with multiple tools
def create_agent():
    """Create a LangChain agent wired to the two traced tools above."""
    llm = ChatOpenAI(model="gpt-4")

    # Each Tool wraps a traced function, so tool calls show up in ThinkHive.
    calculator_tool = Tool(
        name="Calculator",
        func=calculate,
        description="Useful for math calculations",
    )
    search_tool = Tool(
        name="Search",
        func=web_search,
        description="Search the web for information",
    )
    tools = [calculator_tool, search_tool]

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant with access to tools."),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])

    agent = create_openai_functions_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools)
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def run_agent(question: str) -> str:
    """Run the tool-using agent on *question* and return its final answer text."""
    executor = create_agent()
    outcome = executor.invoke({"input": question})
    return outcome["output"]
if __name__ == "__main__":
    # Demo 1: plain chain invocation.
    result = run_simple_chain("AI observability")
    print(f"Chain result: {result}\n")

    # Demo 2: agent that should pick the Calculator tool.
    answer = run_agent("What is 42 * 17?")
    print(f"Agent answer: {answer}")

Multi-Turn Conversation
Tracking conversation context.
conversation.py
import uuid
from dataclasses import dataclass, field
from typing import List, Dict

import thinkhive
from openai import OpenAI
thinkhive.init(service_name="conversation-agent")  # initialize ThinkHive tracing for this example
client = OpenAI()  # shared OpenAI client used by Conversation.chat
@dataclass
class Conversation:
    """Manages a multi-turn conversation against the OpenAI chat API.

    The full message history (system prompt + all user/assistant turns) is
    re-sent to the model on every call to chat(), and every turn is tagged
    with the same conversation id so traces can be grouped.
    """

    system_prompt: str
    # Bug fix: the old default was f"conv_{id(object())}". CPython may reuse
    # id() values as soon as the temporary object is collected, so two
    # conversations could end up with the SAME id. uuid4 is collision-resistant.
    conversation_id: str = field(default_factory=lambda: f"conv_{uuid.uuid4().hex}")
    messages: List[Dict[str, str]] = field(default_factory=list)

    def __post_init__(self):
        # Seed the history with the system prompt so it is sent on every turn.
        self.messages.append({
            "role": "system",
            "content": self.system_prompt,
        })

    @thinkhive.trace_llm(model_name="gpt-4", provider="openai")
    def chat(self, user_message: str) -> str:
        """Send a message, record the assistant's reply, and return it."""
        from opentelemetry import trace

        # Tag the current span with conversation context for trace grouping.
        span = trace.get_current_span()
        span.set_attribute("conversation.id", self.conversation_id)
        # Turn count includes the system message (first user turn reports 1).
        span.set_attribute("conversation.turn", len(self.messages))

        # Add user message to the running history.
        self.messages.append({
            "role": "user",
            "content": user_message,
        })

        # Generate a response conditioned on the entire history.
        response = client.chat.completions.create(
            model="gpt-4",
            messages=self.messages,
        )
        assistant_message = response.choices[0].message.content

        # Record the assistant reply so later turns see it.
        self.messages.append({
            "role": "assistant",
            "content": assistant_message,
        })
        return assistant_message
if __name__ == "__main__":
    conv = Conversation(
        system_prompt="You are a helpful customer support agent for ThinkHive."
    )
    # Scripted multi-turn exchange; each chat() call shares one conversation id.
    for user_msg in (
        "Hi, I need help with my account",
        "I forgot my password",
        "Can you send me a reset link?",
    ):
        print(f"User: {user_msg}")
        response = conv.chat(user_msg)
        print(f"Agent: {response}\n")

Async Example
Using async/await with ThinkHive.
async_example.py
import asyncio
import thinkhive
from openai import AsyncOpenAI
thinkhive.init(service_name="async-agent")  # initialize ThinkHive tracing for this example
client = AsyncOpenAI()  # async client; every completion below is awaited
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
async def async_chat(message: str) -> str:
    """Async chat completion: send one user message and return the reply text."""
    completion = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )
    return completion.choices[0].message.content
@thinkhive.trace_retrieval()
async def async_search(query: str) -> list[str]:
    """Async document search (mock).

    The sleep stands in for a real vector-store round trip so the traced
    span has nonzero duration.
    """
    await asyncio.sleep(0.1)
    return [f"Document about {query}"]
async def async_rag(question: str) -> str:
    """Async RAG pipeline: retrieve context, then generate an answer."""
    tracer = thinkhive.get_tracer()
    with tracer.start_as_current_span("async_rag"):
        # Retrieval, then generation grounded in the retrieved text.
        docs = await async_search(question)
        joined = "\n".join(docs)
        answer = await async_chat(f"Context: {joined}\n\nQuestion: {question}")
    return answer
async def batch_process(questions: list[str]) -> list[str]:
    """Answer every question concurrently; results keep the input order."""
    tracer = thinkhive.get_tracer()
    with tracer.start_as_current_span("batch_process") as span:
        span.set_attribute("batch_size", len(questions))
        # Fan out one async_rag task per question and await them together.
        answers = await asyncio.gather(*(async_rag(q) for q in questions))
    return answers
if __name__ == "__main__":
    questions = [
        "What is ThinkHive?",
        "How do I get started?",
        "What are the pricing options?",
    ]
    # One event loop drives the whole concurrent batch.
    answers = asyncio.run(batch_process(questions))
    for q, a in zip(questions, answers):
        print(f"Q: {q}")
        print(f"A: {a}\n")

Error Handling
Robust error handling with tracing.
error_handling.py
import thinkhive
from openai import OpenAI, RateLimitError, APIError
import time
thinkhive.init(service_name="robust-agent")  # initialize ThinkHive tracing for this example
client = OpenAI()  # shared client used by robust_llm_call
class RetryableError(Exception):
    """Error that can be retried."""
    # NOTE(review): not referenced anywhere else in this example — appears to
    # be illustrative; confirm before deleting.
    pass
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def robust_llm_call(prompt: str, max_retries: int = 3) -> str:
    """LLM call with exponential-backoff retry on rate limits.

    Args:
        prompt: Text sent as a single user chat message.
        max_retries: Total number of attempts; must be >= 1.

    Returns:
        The assistant's reply text.

    Raises:
        ValueError: If max_retries < 1 (previously this path did
            ``raise last_error`` with last_error still None — a TypeError).
        RateLimitError: If every attempt was rate-limited.
        APIError: Immediately on any other API failure (not retried).
    """
    from opentelemetry import trace

    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    # Hoisted out of the loop: the current span does not change per attempt.
    span = trace.get_current_span()
    span.set_attribute("retry.max", max_retries)

    last_error = None
    for attempt in range(1, max_retries + 1):
        span.set_attribute("retry.attempt", attempt)
        try:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
        except RateLimitError as e:
            last_error = e
            span.set_attribute("error.type", "rate_limit")
            if attempt < max_retries:
                # Exponential backoff: 2s, 4s, 8s, ...
                wait_time = 2 ** attempt
                span.add_event("retry_wait", {"seconds": wait_time})
                time.sleep(wait_time)
        except APIError as e:
            # Other API errors are treated as non-retryable: record and surface.
            span.set_attribute("error.type", "api_error")
            span.set_attribute("error.message", str(e))
            raise

    # All attempts were rate-limited; re-raise the last rate-limit error.
    raise last_error
if __name__ == "__main__":
try:
result = robust_llm_call("What is ThinkHive?")
print(f"Result: {result}")
except Exception as e:
        print(f"Failed after retries: {e}")

Running Examples
- Install dependencies:
pip install thinkhive openai pinecone-client langchain langchain-openai

- Set environment variables:
export THINKHIVE_API_KEY=thk_your_key
export OPENAI_API_KEY=sk_your_key
export PINECONE_API_KEY=your_pinecone_key

- Run an example:
python rag_pipeline.py

View Your Traces: After running examples, visit app.thinkhive.ai/traces to see the captured data.
Next Steps
- JavaScript SDK - TypeScript/JavaScript examples
- MCP Server - Claude Code integration
- Guides - In-depth feature guides