Call LLMs from expressions
This tutorial shows you how to call LLM APIs from Xorq expressions. You’ll learn to set up OpenAI, handle batch processing, manage errors, and optimize costs through caching.
After completing this tutorial, you can integrate LLM-powered features into production data pipelines with proper error handling and cost controls.
Prerequisites
You need:
- Xorq installed (see Install Xorq)
- OpenAI Python client: pip install openai
- OpenAI API key from platform.openai.com
- Basic understanding of LLM APIs
Set up your API key
Before writing code, set your OpenAI API key as an environment variable.
Linux/macOS:

export OPENAI_API_KEY="your-api-key-here"

Windows (cmd):

set OPENAI_API_KEY=your-api-key-here

Windows (PowerShell):

$env:OPENAI_API_KEY="your-api-key-here"

Never commit API keys to version control. Use environment variables or a secrets manager in production.
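You can also fail fast at startup if the key is missing, rather than getting an authentication error mid-run. A minimal sketch (require_api_key is an illustrative helper, not part of the tutorial's files):

```python
import os
import sys


def require_api_key(name="OPENAI_API_KEY"):
    """Exit with a clear message if the API key is not set."""
    key = os.environ.get(name)
    if not key:
        sys.exit(f"{name} is not set. Export it before running this script.")
    return key
```

Call it once at the top of your script so a missing key is reported before any work starts.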
Why call LLMs from expressions?
You have customer reviews, support tickets, or social media posts that need AI analysis. A simple loop that calls the OpenAI API for each row gives you slow processing, no error handling, expensive duplicate calls, and code that doesn’t integrate with your data pipeline.
This matters because with 10,000 customer reviews, a naive loop might crash after $50 in API costs when request 3,847 times out. You restart manually, then realize 2,000 reviews were duplicates that you processed twice, wasting another $20.
The solution: integrate LLM calls into Xorq expressions with batch processing, error handling, and caching. You define your analysis function once, wrap it for table operations, and Xorq handles execution efficiently.
Tutorial file structure
This tutorial creates three separate Python files:
your-project/
├── sentiment_analyzer.py # Basic LLM sentiment function
├── batch_processor.py # DataFrame batch processing
└── xorq_sentiment.py # Xorq integration
Each section tells you which file to create. Don’t combine them into one file!
Create sentiment analysis function
Start by writing a basic sentiment analyzer. Create a file called sentiment_analyzer.py:
# sentiment_analyzer.py
import functools
import os

from openai import OpenAI


@functools.cache
def get_client():
    return OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=10.0,
        max_retries=2,
    )


def analyze_sentiment(text):
    if not text or text.strip() == "":
        return "NEUTRAL"

    messages = [
        {
            "role": "system",
            "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
        },
        {
            "role": "user",
            "content": f"Analyze the sentiment: {text}"
        }
    ]

    response = get_client().chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=10,
        temperature=0,
    )

    return response.choices[0].message.content.strip()


if __name__ == "__main__":
    test_text = "This product exceeded my expectations!"
    result = analyze_sentiment(test_text)
    print(f"Text: {test_text}")
    print(f"Sentiment: {result}")

1. Cache the client to reuse connections. Set a 10-second timeout and allow two retries for transient failures.
2. Handle empty strings before making API calls (saves money and avoids errors).
3. Create a focused prompt with system context and user instruction.
4. Call OpenAI with GPT-3.5-turbo. Limit the response to 10 tokens with temperature=0 for consistent results.
5. Test the function on sample text.
Run the file:
python sentiment_analyzer.py

You see:
Text: This product exceeded my expectations!
Sentiment: POSITIVE
What does success look like? Your function validates input before calling the API. Empty text returns immediately without wasting a call. The temperature=0 setting gives deterministic results: same input produces the same output, which matters for reproducible analysis.
Understanding the parameters: higher timeouts (30+ seconds) work for long generations but delay error detection. Lower timeouts (3-5 seconds) catch problems faster but might fail on legitimate slow responses. Two retries balance reliability with fast failure.
Add error handling
API calls fail. Timeouts happen. Rate limits trigger. Your code needs to handle these gracefully.
Update sentiment_analyzer.py to add retry logic with exponential backoff:
This version adds retry logic with exponential backoff to sentiment_analyzer.py. It wraps the OpenAI API call in a try-except loop that handles rate limits with increasing wait times, retries timeouts immediately, and returns “ERROR” for other failures instead of crashing your pipeline.
# sentiment_analyzer.py
import functools
import os
import time

from openai import OpenAI


@functools.cache
def get_client():
    return OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=10.0,
        max_retries=2,
    )


def analyze_sentiment(text):
    if not text or text.strip() == "":
        return "NEUTRAL"

    messages = [
        {
            "role": "system",
            "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
        },
        {
            "role": "user",
            "content": f"Analyze the sentiment: {text}"
        }
    ]

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = get_client().chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=10,
                temperature=0,
            )
            return response.choices[0].message.content.strip()

        except Exception as e:
            error_msg = str(e).lower()

            if "rate limit" in error_msg and attempt < max_attempts - 1:
                wait_time = (attempt + 1) * 2
                print(f"Rate limit hit, waiting {wait_time} seconds...")
                time.sleep(wait_time)
                continue

            elif "timeout" in error_msg and attempt < max_attempts - 1:
                print(f"Timeout on attempt {attempt + 1}, retrying...")
                continue

            else:
                print(f"Error analyzing text: {e}")
                return "ERROR"


if __name__ == "__main__":
    test_texts = [
        "Absolutely love this!",
        "Terrible experience.",
        "",
        "It's okay, nothing special."
    ]

    for text in test_texts:
        result = analyze_sentiment(text)
        print(f"'{text[:30]}...' → {result}")

1. Try up to three times before giving up.
2. Catch all exceptions and inspect the error message.
3. If you hit rate limits, wait with an increasing backoff (2s, then 4s) before retrying.
4. If you get timeouts, retry immediately without waiting.
5. If you exhaust retries or hit other errors, return "ERROR" to avoid crashing the pipeline.
6. Test with multiple text samples, including edge cases.
This diff shows the error handling you're adding: a retry loop that backs off on rate limits (waiting 2s, then 4s), retries timeouts immediately, and falls back to a safe "ERROR" value for other exceptions. This prevents pipeline crashes when processing thousands of rows.
 def analyze_sentiment(text):
     if not text or text.strip() == "":
         return "NEUTRAL"

     messages = [
         {
             "role": "system",
             "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
         },
         {
             "role": "user",
             "content": f"Analyze the sentiment: {text}"
         }
     ]

-    response = get_client().chat.completions.create(
-        model="gpt-3.5-turbo",
-        messages=messages,
-        max_tokens=10,
-        temperature=0,
-    )
-
-    return response.choices[0].message.content.strip()
+    max_attempts = 3
+    for attempt in range(max_attempts):
+        try:
+            response = get_client().chat.completions.create(
+                model="gpt-3.5-turbo",
+                messages=messages,
+                max_tokens=10,
+                temperature=0,
+            )
+            return response.choices[0].message.content.strip()
+
+        except Exception as e:
+            error_msg = str(e).lower()
+
+            if "rate limit" in error_msg and attempt < max_attempts - 1:
+                wait_time = (attempt + 1) * 2
+                print(f"Rate limit hit, waiting {wait_time} seconds...")
+                time.sleep(wait_time)
+                continue
+
+            elif "timeout" in error_msg and attempt < max_attempts - 1:
+                print(f"Timeout on attempt {attempt + 1}, retrying...")
+                continue
+
+            else:
+                print(f"Error analyzing text: {e}")
+                return "ERROR"

 if __name__ == "__main__":
-    test_text = "This product exceeded my expectations!"
-    result = analyze_sentiment(test_text)
-    print(f"Text: {test_text}")
-    print(f"Sentiment: {result}")
+    test_texts = [
+        "Absolutely love this!",
+        "Terrible experience.",
+        "",
+        "It's okay, nothing special."
+    ]
+
+    for text in test_texts:
+        result = analyze_sentiment(text)
+        print(f"'{text[:30]}...' → {result}")

Run the updated file:

python sentiment_analyzer.py

You see:
'Absolutely love this!...' → POSITIVE
'Terrible experience....' → NEGATIVE
'...' → NEUTRAL
'It's okay, nothing special....' → NEUTRAL
What does success look like? Your function handles errors gracefully without crashing. Rate limit errors trigger an increasing backoff. Timeouts retry immediately. All other errors return a safe fallback value. This prevents pipeline failures in production when processing thousands of rows.
In practice, backing off on rate limits is critical. If you retry too fast, you just hit the limit again. The increasing delays (2s, then 4s) give the API time to recover.
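To see exactly what waits the retry loop produces, you can evaluate the formula in isolation. This sketch mirrors the wait_time = (attempt + 1) * 2 line from the code above and, for comparison, shows a doubling schedule (the helper names are illustrative, not part of the tutorial's files):

```python
def linear_backoff(attempt, base=2):
    """Matches the tutorial's formula: waits grow by a fixed step (2s, 4s, 6s, ...)."""
    return (attempt + 1) * base


def doubling_backoff(attempt, base=2):
    """A true exponential schedule: waits double each retry (2s, 4s, 8s, ...)."""
    return base * (2 ** attempt)
```

With max_attempts = 3, the loop only sleeps after the first two failed attempts, so the linear schedule yields waits of 2s and 4s before the final try.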
Optimize costs with caching
Duplicate text wastes money. If you analyze “This is great!” five times, you make five API calls and pay five times. Caching solves this.
Add caching to sentiment_analyzer.py:
This version adds file-based caching to eliminate duplicate API calls. It uses MD5 hashing to create cache keys, checks for cached results before calling the API, and stores successful results to disk. This saves money by avoiding redundant calls for identical text.
# sentiment_analyzer.py
import functools
import hashlib
import json
import os
import time
from pathlib import Path

from openai import OpenAI


@functools.cache
def get_client():
    return OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=10.0,
        max_retries=2,
    )


cache_dir = Path("./sentiment_cache")
cache_dir.mkdir(exist_ok=True)


def get_cache_key(text):
    return hashlib.md5(text.encode()).hexdigest()


def analyze_sentiment(text):
    if not text or text.strip() == "":
        return "NEUTRAL"

    cache_key = get_cache_key(text)
    cache_file = cache_dir / f"{cache_key}.json"

    if cache_file.exists():
        with open(cache_file, 'r') as f:
            cached_result = json.load(f)
        print(f"Cache hit: '{text[:30]}...'")
        return cached_result["sentiment"]

    messages = [
        {
            "role": "system",
            "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
        },
        {
            "role": "user",
            "content": f"Analyze the sentiment: {text}"
        }
    ]

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = get_client().chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=10,
                temperature=0,
            )
            sentiment = response.choices[0].message.content.strip()

            with open(cache_file, 'w') as f:
                json.dump({"text": text, "sentiment": sentiment}, f)

            return sentiment

        except Exception as e:
            error_msg = str(e).lower()
            if "rate limit" in error_msg and attempt < max_attempts - 1:
                wait_time = (attempt + 1) * 2
                print(f"Rate limit hit, waiting {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            elif "timeout" in error_msg and attempt < max_attempts - 1:
                print(f"Timeout on attempt {attempt + 1}, retrying...")
                continue
            else:
                print(f"Error analyzing text: {e}")
                return "ERROR"


if __name__ == "__main__":
    test_texts = [
        "This is amazing!",
        "This is terrible.",
        "This is amazing!",  # Duplicate
        "This is okay."
    ]

    print("Processing with cache:")
    for text in test_texts:
        result = analyze_sentiment(text)
        print(f"'{text}' → {result}")

1. Create a directory to store cached results.
2. Generate a unique cache key from the text using an MD5 hash.
3. Check the cache before calling the API.
4. Compute the cache key and file path.
5. If the cache file exists, read and return the cached result (no API call).
6. Save the result to the cache for future use.
7. Test with duplicate text to see cache hits.
This diff adds the caching layer. You’re importing hashing and file utilities, creating a cache directory, generating MD5 cache keys, checking for cached results before API calls, and writing successful results to disk. This prevents expensive duplicate API calls for identical text.
+import hashlib
+import json
+from pathlib import Path

 @functools.cache
 def get_client():
     return OpenAI(
         api_key=os.environ["OPENAI_API_KEY"],
         timeout=10.0,
         max_retries=2,
     )

+cache_dir = Path("./sentiment_cache")
+cache_dir.mkdir(exist_ok=True)
+
+def get_cache_key(text):
+    return hashlib.md5(text.encode()).hexdigest()
+
 def analyze_sentiment(text):
     if not text or text.strip() == "":
         return "NEUTRAL"

+    cache_key = get_cache_key(text)
+    cache_file = cache_dir / f"{cache_key}.json"
+
+    if cache_file.exists():
+        with open(cache_file, 'r') as f:
+            cached_result = json.load(f)
+        print(f"Cache hit: '{text[:30]}...'")
+        return cached_result["sentiment"]
+
     messages = [
         {
             "role": "system",
             "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
         },
         {
             "role": "user",
             "content": f"Analyze the sentiment: {text}"
         }
     ]

     max_attempts = 3
     for attempt in range(max_attempts):
         try:
             response = get_client().chat.completions.create(
                 model="gpt-3.5-turbo",
                 messages=messages,
                 max_tokens=10,
                 temperature=0,
             )
-            return response.choices[0].message.content.strip()
+            sentiment = response.choices[0].message.content.strip()
+
+            with open(cache_file, 'w') as f:
+                json.dump({"text": text, "sentiment": sentiment}, f)
+
+            return sentiment
         except Exception as e:
             error_msg = str(e).lower()
             if "rate limit" in error_msg and attempt < max_attempts - 1:
                 wait_time = (attempt + 1) * 2
                 print(f"Rate limit hit, waiting {wait_time} seconds...")
                 time.sleep(wait_time)
                 continue
             elif "timeout" in error_msg and attempt < max_attempts - 1:
                 print(f"Timeout on attempt {attempt + 1}, retrying...")
                 continue
             else:
                 print(f"Error analyzing text: {e}")
                 return "ERROR"

 if __name__ == "__main__":
     test_texts = [
-        "Absolutely love this!",
-        "Terrible experience.",
-        "",
-        "It's okay, nothing special."
+        "This is amazing!",
+        "This is terrible.",
+        "This is amazing!",  # Duplicate
+        "This is okay."
     ]
+    print("Processing with cache:")
     for text in test_texts:
         result = analyze_sentiment(text)
-        print(f"'{text[:30]}...' → {result}")
+        print(f"'{text}' → {result}")

Run the file:

python sentiment_analyzer.py

You see:
Processing with cache:
'This is amazing!' → POSITIVE
'This is terrible.' → NEGATIVE
Cache hit: 'This is amazing!...'
'This is amazing!' → POSITIVE
'This is okay.' → NEUTRAL
What does success look like? The third text (“This is amazing!”) hits the cache instead of calling the API. With 10,000 reviews where 2,000 are duplicates, you save 2,000 API calls. At $0.002 per call with GPT-3.5-turbo, that’s $4 saved.
Cache aggressively for static datasets (historical reviews, archived documents). Cache less for real-time streams where text changes constantly.
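For streams where stale results are a concern, you can layer a time-to-live check on top of the file cache. A hedged sketch (read_cache_with_ttl is an illustrative helper, and the one-hour default is an arbitrary choice, not something the tutorial code uses):

```python
import json
import time
from pathlib import Path


def read_cache_with_ttl(cache_file: Path, ttl_seconds: float = 3600):
    """Return the cached sentiment, or None if the entry is missing or expired."""
    if not cache_file.exists():
        return None
    age = time.time() - cache_file.stat().st_mtime
    if age > ttl_seconds:
        cache_file.unlink()  # drop the stale entry so it gets recomputed
        return None
    with open(cache_file) as f:
        return json.load(f)["sentiment"]
```

A None return tells the caller to fall through to the API path, exactly as a cache miss does in analyze_sentiment.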
Process tables in batches
Now you wrap your function to process entire DataFrames.
Create a new file called batch_processor.py in the same directory as sentiment_analyzer.py. Don’t add this code to the existing file!
Create batch_processor.py:
# batch_processor.py
import pandas as pd
from urllib.parse import unquote_plus

from sentiment_analyzer import analyze_sentiment


def process_batch(df, text_column, output_column):
    df[output_column] = df[text_column].apply(
        lambda text: analyze_sentiment(unquote_plus(str(text)))
    )
    return df


if __name__ == "__main__":
    test_df = pd.DataFrame({
        "review_text": [
            "Best purchase ever!",
            "Would not recommend.",
            "Good value for money.",
            "Complete waste of time."
        ]
    })

    result_df = process_batch(test_df, "review_text", "sentiment")
    print("\nBatch processing results:")
    print(result_df)

1. Create a function that takes a DataFrame and column names.
2. Apply sentiment analysis to each row in the text column. Handle URL encoding with unquote_plus.
3. Create test data with four reviews.
4. Process the entire batch and display the results.
Run the file:
python batch_processor.py

You see:
Batch processing results:
review_text sentiment
0 Best purchase ever! POSITIVE
1 Would not recommend. NEGATIVE
2 Good value for money. POSITIVE
3 Complete waste of time. NEGATIVE
Understanding batch processing: This processes rows one at a time within a batch operation. The error handling you added means if one API call fails, the pipeline continues processing other rows. You don’t lose all your work because row 847 had a network timeout.
The unquote_plus function handles URL-encoded text that might appear in web-scraped data. If your text contains %20 for spaces or %2C for commas, this decodes it before analysis.
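You can verify the decoding step in isolation with the standard library alone:

```python
from urllib.parse import unquote_plus

# URL-encoded text as it might arrive from web-scraped data:
# '+' encodes a space, %2C a comma, %21 an exclamation mark.
raw = "Great+product%2C+fast+shipping%21"
decoded = unquote_plus(raw)
print(decoded)  # Great product, fast shipping!
```

Passing already-plain text through unquote_plus is harmless, which is why process_batch can apply it unconditionally.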
Integrate with Xorq expressions
Connect everything to Xorq’s expression system.
Create another new file called xorq_sentiment.py. You should now have three files:

- sentiment_analyzer.py (basic LLM function)
- batch_processor.py (processes DataFrames)
- xorq_sentiment.py (Xorq integration)
Create xorq_sentiment.py:
# xorq_sentiment.py
import xorq.api as xo
from xorq.flight.utils import schema_concat, schema_contains
from xorq.common.utils.toolz_utils import curry

from batch_processor import process_batch


@curry
def sentiment_batch(df, input_col, output_col):
    return process_batch(df, input_col, output_col)


input_col = "text"
output_col = "sentiment"

schema_in = schema_contains(xo.schema({input_col: "!str"}))
schema_out = schema_concat(to_concat=xo.schema({output_col: "!str"}))

sentiment_expr = xo.expr.relations.flight_udxf(
    process_df=sentiment_batch(input_col=input_col, output_col=output_col),
    maybe_schema_in=schema_in,
    maybe_schema_out=schema_out,
    name="SentimentAnalyzer"
)


if __name__ == "__main__":
    hn = xo.examples.hn_posts_nano.fetch(table_name="hackernews")

    expr = (
        hn.filter(hn.text.notnull())
        .select(hn.text)
        .limit(3)
        .pipe(sentiment_expr)
    )

    print("\nXorq expression results:")
    df = expr.execute()
    print(df)

1. Use @curry to create a partially applicable function for Xorq integration.
2. Define the input and output column names.
3. Specify schema requirements: the input must have a non-null string column, and the output adds a new string column.
4. Wrap the batch function in a Xorq expression that validates schemas and integrates with the execution engine.
5. Load the HackerNews example dataset.
6. Build a pipeline: filter for non-null text, select the text column, limit to three rows, apply sentiment analysis.
7. Execute the full pipeline and display the results.
Run the file:
python xorq_sentiment.py

You see:
Xorq expression results:
text sentiment
0 ClickHouse is incredibly fast for analytics POSITIVE
1 Had some issues with the documentation NEGATIVE
2 It's a solid choice for OLAP workloads POSITIVE
What does success look like? You integrated your LLM calls into Xorq’s expression system. The schema validation checks if your input table has the required text column before making any API calls. Deferred execution means Xorq optimizes the entire pipeline (filter, select, limit, sentiment analysis) before running anything.
Think of the expression wrapper as adding type safety and optimization to your LLM calls. You define what columns you need and what columns you add, and Xorq enforces those contracts at pipeline build time, not runtime.
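To build intuition for that contract, here is a toy illustration of the two schema checks in plain Python. This is not Xorq's implementation; it only mimics what "input must contain" and "output adds a column" mean:

```python
def check_contains(table_schema: dict, required: dict) -> bool:
    """Input contract: every required column (with its exact type) must be present."""
    return all(table_schema.get(col) == typ for col, typ in required.items())


def concat_schema(table_schema: dict, added: dict) -> dict:
    """Output contract: the result schema is the input schema plus the new columns."""
    return {**table_schema, **added}
```

A table lacking the required text column fails check_contains before any API call is made, which is the early failure the paragraph above describes.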
Extend to other use cases
The pattern extends beyond sentiment analysis. Here are variations for different LLM tasks.
Text classification - Categorize by topic:
def classify_topic(text):
    if not text or text.strip() == "":
        return "UNKNOWN"

    messages = [
        {
            "role": "system",
            "content": "Classify as TECHNICAL, BUSINESS, or SUPPORT."
        },
        {
            "role": "user",
            "content": f"Classify: {text}"
        }
    ]
    # Add error handling and caching as shown above

Text summarization - Create concise summaries:
def summarize_text(text):
    if not text or text.strip() == "":
        return ""

    messages = [
        {
            "role": "system",
            "content": "Create a one-sentence summary."
        },
        {
            "role": "user",
            "content": f"Summarize: {text}"
        }
    ]

    response = get_client().chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=50,  # Longer for summaries
        temperature=0.3,  # Slight creativity
    )

    return response.choices[0].message.content.strip()

Entity extraction - Extract company names:
def extract_companies(text):
    if not text or text.strip() == "":
        return "[]"

    messages = [
        {
            "role": "system",
            "content": "Extract company names as JSON array."
        },
        {
            "role": "user",
            "content": f"Extract companies: {text}"
        }
    ]

    response = get_client().chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        max_tokens=100,
        temperature=0,
    )

    return response.choices[0].message.content.strip()

Each variation follows the same structure: validate input, create a focused prompt, call the API with appropriate parameters, add error handling, add caching, wrap for batch processing, integrate with expressions. The error handling, caching, and batch processing patterns work identically across all these use cases.
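Since the only things that change across these variations are the system prompt and a couple of request parameters, you can factor the message construction into one helper. A sketch (build_messages is a hypothetical name, not part of the tutorial's files):

```python
def build_messages(system_prompt: str, user_prefix: str, text: str) -> list:
    """Assemble the two-message chat payload used by every variation."""
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{user_prefix}: {text}"},
    ]


# Each task then becomes a one-line configuration:
sentiment_messages = build_messages(
    "You are a sentiment analyzer. Respond with only one word: "
    "POSITIVE, NEGATIVE, or NEUTRAL.",
    "Analyze the sentiment",
    "Great product!",
)
```

The retry loop and caching logic stay unchanged; only the messages list and the max_tokens/temperature settings vary per task.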
Complete example
Here’s the full working implementation combining all components:
# complete_sentiment_pipeline.py
import functools
import hashlib
import json
import os
import time
from pathlib import Path
from urllib.parse import unquote_plus

import pandas as pd
from openai import OpenAI

import xorq.api as xo
from xorq.flight.utils import schema_concat, schema_contains
from xorq.common.utils.toolz_utils import curry


# Client setup
@functools.cache
def get_client():
    return OpenAI(
        api_key=os.environ["OPENAI_API_KEY"],
        timeout=10.0,
        max_retries=2,
    )


# Caching setup
cache_dir = Path("./sentiment_cache")
cache_dir.mkdir(exist_ok=True)


def get_cache_key(text):
    return hashlib.md5(text.encode()).hexdigest()


# Sentiment analysis with error handling and caching
def analyze_sentiment(text):
    if not text or text.strip() == "":
        return "NEUTRAL"

    cache_key = get_cache_key(text)
    cache_file = cache_dir / f"{cache_key}.json"

    if cache_file.exists():
        with open(cache_file, 'r') as f:
            return json.load(f)["sentiment"]

    messages = [
        {
            "role": "system",
            "content": "You are a sentiment analyzer. Respond with only one word: POSITIVE, NEGATIVE, or NEUTRAL."
        },
        {
            "role": "user",
            "content": f"Analyze the sentiment: {text}"
        }
    ]

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            response = get_client().chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                max_tokens=10,
                temperature=0,
            )
            sentiment = response.choices[0].message.content.strip()

            with open(cache_file, 'w') as f:
                json.dump({"text": text, "sentiment": sentiment}, f)

            return sentiment
        except Exception as e:
            error_msg = str(e).lower()
            if "rate limit" in error_msg and attempt < max_attempts - 1:
                wait_time = (attempt + 1) * 2
                time.sleep(wait_time)
                continue
            elif "timeout" in error_msg and attempt < max_attempts - 1:
                continue
            else:
                return "ERROR"


# Batch processing
def process_batch(df, text_column, output_column):
    df[output_column] = df[text_column].apply(
        lambda text: analyze_sentiment(unquote_plus(str(text)))
    )
    return df


# Xorq integration
@curry
def sentiment_batch(df, input_col, output_col):
    return process_batch(df, input_col, output_col)


input_col = "text"
output_col = "sentiment"

schema_in = schema_contains(xo.schema({input_col: "!str"}))
schema_out = schema_concat(to_concat=xo.schema({output_col: "!str"}))

sentiment_expr = xo.expr.relations.flight_udxf(
    process_df=sentiment_batch(input_col=input_col, output_col=output_col),
    maybe_schema_in=schema_in,
    maybe_schema_out=schema_out,
    name="SentimentAnalyzer"
)


# Apply to data
if __name__ == "__main__":
    hn = xo.examples.hn_posts_nano.fetch(table_name="hackernews")

    expr = (
        hn.filter(hn.text.notnull())
        .select(hn.text)
        .limit(5)
        .pipe(sentiment_expr)
    )

    df = expr.execute()
    print(df)

Notice how the complete implementation separates concerns: client configuration (timeout, retries), caching logic (file storage, cache keys), error handling (rate limits, timeouts), batch processing (DataFrame operations), and expression integration (schema validation). Each piece is testable independently.
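Because the pieces are separable, you can exercise the caching logic without touching the API by injecting a stub for the analysis call. A hedged sketch of that idea (analyze_with_cache and fake_analyze are illustrative names, not part of the pipeline):

```python
import hashlib
import json
import tempfile
from pathlib import Path


def analyze_with_cache(text, analyze_fn, cache_dir: Path):
    """Same cache protocol as the pipeline, with the API call injected."""
    cache_file = cache_dir / f"{hashlib.md5(text.encode()).hexdigest()}.json"
    if cache_file.exists():
        return json.loads(cache_file.read_text())["sentiment"]
    sentiment = analyze_fn(text)
    cache_file.write_text(json.dumps({"text": text, "sentiment": sentiment}))
    return sentiment


calls = []

def fake_analyze(text):
    calls.append(text)  # record that the "API" was hit
    return "POSITIVE"


tmp = Path(tempfile.mkdtemp())
analyze_with_cache("Great!", fake_analyze, tmp)  # miss: calls the stub
analyze_with_cache("Great!", fake_analyze, tmp)  # hit: served from disk
print(len(calls))  # 1
```

The stub is called exactly once even though the text is analyzed twice, which is the behavior you want to see before trusting the cache with real API spend.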
What you learned
You’ve learned how to call LLM APIs from Xorq expressions with production-ready patterns. Here’s what you accomplished:
- Set up an OpenAI client with a 10-second timeout and 2 retries for production reliability
- Built a sentiment analyzer that handles rate limits with increasing backoff delays (2s, then 4s)
- Implemented MD5-based caching that saves $4 on 10,000 reviews with 2,000 duplicates
- Created a batch processor that continues past individual failures without losing work
- Wrapped LLM calls in Xorq expressions with schema validation (!str input, adds a sentiment column)
- Extended the pattern to classification, summarization, and entity extraction use cases
The key insight? Production LLM integration requires more than just API calls. Error handling prevents pipeline crashes. Caching eliminates wasteful duplicate calls. Batch processing respects rate limits. Schema validation catches errors early. The pattern you learned scales from 100 rows to 10 million rows with the same code.
Next steps
Now that you know how to call LLMs from expressions, continue learning:
- Add LLM analysis with UDXFs dives deeper into the UDXF pattern and schema architecture
- Train your first model shows ML model training workflows
- Compare model performance demonstrates systematic model evaluation