SDKsPythonExamples

Python Examples

Complete, runnable examples demonstrating ThinkHive Python SDK features.

RAG Pipeline

A complete RAG pipeline with full tracing.

rag_pipeline.py
import os
import thinkhive
from openai import OpenAI
from pinecone import Pinecone
 
# Initialize
thinkhive.init(service_name="rag-agent")
 
client = OpenAI()
pc = Pinecone()
index = pc.Index("knowledge-base")
 
 
def get_embedding(text: str) -> list[float]:
    """Get embedding for text."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding
 
 
@thinkhive.trace_retrieval()
def search_documents(query: str, top_k: int = 5) -> list[str]:
    """Search for relevant documents."""
    embedding = get_embedding(query)
 
    results = index.query(
        vector=embedding,
        top_k=top_k,
        include_metadata=True,
    )
 
    return [match.metadata["text"] for match in results.matches]
 
 
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def generate_answer(question: str, context: str) -> str:
    """Generate answer based on context."""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": f"Answer based on this context:\n\n{context}",
            },
            {"role": "user", "content": question},
        ],
        temperature=0.7,
    )
    return response.choices[0].message.content
 
 
def rag_chat(question: str) -> str:
    """Complete RAG pipeline."""
    tracer = thinkhive.get_tracer()
 
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("question", question)
 
        # Retrieve relevant documents
        documents = search_documents(question)
        context = "\n\n".join(documents)
 
        span.set_attribute("document_count", len(documents))
 
        # Generate answer
        answer = generate_answer(question, context)
 
        return answer
 
 
if __name__ == "__main__":
    questions = [
        "What is ThinkHive?",
        "How does RAG evaluation work?",
        "What are the pricing tiers?",
    ]
 
    for q in questions:
        print(f"Q: {q}")
        a = rag_chat(q)
        print(f"A: {a}\n")

LangChain Integration

Using ThinkHive with LangChain.

langchain_example.py
import thinkhive
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
 
thinkhive.init(service_name="langchain-agent")
 
 
# Simple chain with tracing
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def run_simple_chain(topic: str) -> str:
    """Run a simple LangChain chain."""
    model = ChatOpenAI(model="gpt-4")
    prompt = ChatPromptTemplate.from_template(
        "Write a short paragraph about {topic}"
    )
    chain = prompt | model | StrOutputParser()
 
    return chain.invoke({"topic": topic})
 
 
# Tool with tracing
@thinkhive.trace_tool(tool_name="calculator")
def calculate(expression: str) -> str:
    """Calculate a mathematical expression."""
    try:
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {e}"
 
 
@thinkhive.trace_tool(tool_name="search")
def web_search(query: str) -> str:
    """Search the web (mock implementation)."""
    return f"Search results for: {query}"
 
 
# Agent with multiple tools
def create_agent():
    """Create a LangChain agent with ThinkHive tracing."""
    model = ChatOpenAI(model="gpt-4")
 
    tools = [
        Tool(
            name="Calculator",
            func=calculate,
            description="Useful for math calculations",
        ),
        Tool(
            name="Search",
            func=web_search,
            description="Search the web for information",
        ),
    ]
 
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant with access to tools."),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])
 
    agent = create_openai_functions_agent(model, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
 
    return executor
 
 
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def run_agent(question: str) -> str:
    """Run the agent with tracing."""
    executor = create_agent()
    result = executor.invoke({"input": question})
    return result["output"]
 
 
if __name__ == "__main__":
    # Simple chain
    result = run_simple_chain("AI observability")
    print(f"Chain result: {result}\n")
 
    # Agent with tools
    answer = run_agent("What is 42 * 17?")
    print(f"Agent answer: {answer}")

Multi-Turn Conversation

Tracking conversation context.

conversation.py
import thinkhive
from openai import OpenAI
from dataclasses import dataclass, field
from typing import List, Dict
 
thinkhive.init(service_name="conversation-agent")
 
client = OpenAI()
 
 
@dataclass
class Conversation:
    """Manages a multi-turn conversation."""
 
    system_prompt: str
    conversation_id: str = field(default_factory=lambda: f"conv_{id(object())}")
    messages: List[Dict[str, str]] = field(default_factory=list)
 
    def __post_init__(self):
        self.messages.append({
            "role": "system",
            "content": self.system_prompt,
        })
 
    @thinkhive.trace_llm(model_name="gpt-4", provider="openai")
    def chat(self, user_message: str) -> str:
        """Send a message and get a response."""
        from opentelemetry import trace
 
        # Get current span and add context
        span = trace.get_current_span()
        span.set_attribute("conversation.id", self.conversation_id)
        span.set_attribute("conversation.turn", len(self.messages))
 
        # Add user message
        self.messages.append({
            "role": "user",
            "content": user_message,
        })
 
        # Generate response
        response = client.chat.completions.create(
            model="gpt-4",
            messages=self.messages,
        )
 
        assistant_message = response.choices[0].message.content
 
        # Add assistant response
        self.messages.append({
            "role": "assistant",
            "content": assistant_message,
        })
 
        return assistant_message
 
 
if __name__ == "__main__":
    conv = Conversation(
        system_prompt="You are a helpful customer support agent for ThinkHive."
    )
 
    # Multi-turn conversation
    turns = [
        "Hi, I need help with my account",
        "I forgot my password",
        "Can you send me a reset link?",
    ]
 
    for user_msg in turns:
        print(f"User: {user_msg}")
        response = conv.chat(user_msg)
        print(f"Agent: {response}\n")

Async Example

Using async/await with ThinkHive.

async_example.py
import asyncio
import thinkhive
from openai import AsyncOpenAI
 
thinkhive.init(service_name="async-agent")
 
client = AsyncOpenAI()
 
 
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
async def async_chat(message: str) -> str:
    """Async chat completion."""
    response = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": message}],
    )
    return response.choices[0].message.content
 
 
@thinkhive.trace_retrieval()
async def async_search(query: str) -> list[str]:
    """Async document search (mock)."""
    await asyncio.sleep(0.1)  # Simulate async operation
    return [f"Document about {query}"]
 
 
async def async_rag(question: str) -> str:
    """Async RAG pipeline."""
    tracer = thinkhive.get_tracer()
 
    with tracer.start_as_current_span("async_rag"):
        # Parallel retrieval and processing
        docs = await async_search(question)
        context = "\n".join(docs)
 
        answer = await async_chat(f"Context: {context}\n\nQuestion: {question}")
 
        return answer
 
 
async def batch_process(questions: list[str]) -> list[str]:
    """Process multiple questions in parallel."""
    tracer = thinkhive.get_tracer()
 
    with tracer.start_as_current_span("batch_process") as span:
        span.set_attribute("batch_size", len(questions))
 
        # Process all questions concurrently
        tasks = [async_rag(q) for q in questions]
        answers = await asyncio.gather(*tasks)
 
        return answers
 
 
if __name__ == "__main__":
    questions = [
        "What is ThinkHive?",
        "How do I get started?",
        "What are the pricing options?",
    ]
 
    answers = asyncio.run(batch_process(questions))
 
    for q, a in zip(questions, answers):
        print(f"Q: {q}")
        print(f"A: {a}\n")

Error Handling

Robust error handling with tracing.

error_handling.py
import thinkhive
from openai import OpenAI, RateLimitError, APIError
import time
 
thinkhive.init(service_name="robust-agent")
 
client = OpenAI()
 
 
class RetryableError(Exception):
    """Error that can be retried."""
    pass
 
 
@thinkhive.trace_llm(model_name="gpt-4", provider="openai")
def robust_llm_call(prompt: str, max_retries: int = 3) -> str:
    """LLM call with retry logic."""
    from opentelemetry import trace
 
    last_error = None
 
    for attempt in range(1, max_retries + 1):
        span = trace.get_current_span()
        span.set_attribute("retry.attempt", attempt)
        span.set_attribute("retry.max", max_retries)
 
        try:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
            )
            return response.choices[0].message.content
 
        except RateLimitError as e:
            last_error = e
            span.set_attribute("error.type", "rate_limit")
            if attempt < max_retries:
                wait_time = 2 ** attempt
                span.add_event("retry_wait", {"seconds": wait_time})
                time.sleep(wait_time)
 
        except APIError as e:
            last_error = e
            span.set_attribute("error.type", "api_error")
            span.set_attribute("error.message", str(e))
            raise
 
    raise last_error
 
 
if __name__ == "__main__":
    try:
        result = robust_llm_call("What is ThinkHive?")
        print(f"Result: {result}")
    except Exception as e:
        print(f"Failed after retries: {e}")

Running Examples

  1. Install dependencies:
pip install thinkhive openai pinecone-client langchain langchain-openai
  1. Set environment variables:
export THINKHIVE_API_KEY=thk_your_key
export OPENAI_API_KEY=sk_your_key
export PINECONE_API_KEY=your_pinecone_key
  1. Run an example:
python rag_pipeline.py

View Your Traces: After running examples, visit app.thinkhive.ai/traces to see the captured data.

Next Steps