Overview
The conversation insights API enables you to retrieve and analyze all conversations happening across your Wolfia workspace. This includes interactions from the web UI, Slack integration, agent conversations, and more. Use this endpoint to build custom analytics dashboards, monitor AI performance, track user engagement, or export conversation data for compliance and auditing.
The conversation insights endpoint
URL: GET https://api.wolfia.com/insights/conversations
Authentication: API key required (see API overview for setup)
curl -X GET 'https://api.wolfia.com/insights/conversations?page=1&page_size=10' \
-H "X-API-Key: wolfia-api-YOUR_KEY_HERE" \
-H "Content-Type: application/json"
Query parameters
| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| page | integer | No | 1 | Page number for pagination (must be > 0) |
| page_size | integer | No | 10 | Number of results per page (1-100) |
Understanding source types
Conversations in Wolfia come from multiple sources, each tracked and returned by the API:
web_ui
Conversations where users interact with Wolfia through the web application interface.
slack_dm
Conversations from direct messages with the Wolfia bot in Slack.
browser_extension
Conversations initiated through the Wolfia browser extension on vendor portals and web forms.
Source type filtering: The current API returns all source types. Contact support if you need source-specific filtering for your use case; in the meantime, you can filter client-side, as in the sketch below.
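A minimal client-side filter, assuming the requests library and an API key; fetch_page is a hypothetical helper wrapping the endpoint shown above, not part of the API:

import requests

def fetch_page(api_key, page=1, page_size=100):
    # Hypothetical helper wrapping the endpoint shown above
    response = requests.get(
        'https://api.wolfia.com/insights/conversations',
        headers={'X-API-Key': api_key},
        params={'page': page, 'page_size': page_size},
        timeout=30
    )
    response.raise_for_status()
    return response.json()

# Keep only Slack DM conversations from one page of results
data = fetch_page('wolfia-api-YOUR_KEY_HERE')
slack_dms = [c for c in data['data'] if c['source_type'] == 'slack_dm']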
Success response (200 OK)
{
  "data": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "question": "What are our security policies for data encryption?",
      "answer": "Your organization requires AES-256 encryption for data at rest and TLS 1.3 for data in transit. All encryption keys are managed through AWS KMS with automatic rotation every 90 days.",
      "user_name": "Jane Smith",
      "user_email": "jane.smith@company.com",
      "user_profile_image": "https://example.com/avatar.jpg",
      "source_type": "web_ui",
      "answered_at": 1731628800000000
    },
    {
      "id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
      "question": "When is the next security audit?",
      "answer": "The next SOC 2 Type II audit is scheduled for Q1 2025, beginning January 15th.",
      "user_name": "John Doe",
      "user_email": "john.doe@company.com",
      "user_profile_image": "https://example.com/avatar2.jpg",
      "source_type": "slack_dm",
      "answered_at": 1731542400000000
    }
  ],
  "total": 247,
  "page": 1,
  "page_size": 10,
  "total_pages": 25
}
Response fields
| Field | Type | Description |
| --- | --- | --- |
| data | array | Array of conversation objects |
| data[].id | string (UUID) | Unique identifier for the conversation |
| data[].question | string | User's question (cleaned of markdown/citations) |
| data[].answer | string | AI-generated answer (cleaned of markdown/citations) |
| data[].user_name | string or null | Name of the user who asked the question |
| data[].user_email | string or null | Email of the user who asked the question |
| data[].user_profile_image | string or null | Profile image URL of the user |
| data[].source_type | string | Source of the conversation (web_ui, slack_dm, browser_extension) |
| data[].answered_at | integer | Timestamp in microseconds when answered |
| total | integer | Total number of conversations across all pages |
| page | integer | Current page number |
| page_size | integer | Number of results per page |
| total_pages | integer | Total number of pages available |
Timestamps: The answered_at field uses microseconds since the Unix epoch. Divide by 1,000,000 to get seconds, or by 1,000 to get milliseconds for a JavaScript Date (see Timestamp handling below).
Error responses
| Status code | Error | What it means | How to fix |
| --- | --- | --- | --- |
| 400 | Bad Request | Invalid query parameters (e.g., page_size > 100) | Check your pagination parameters |
| 401 | Unauthorized | Invalid or expired API key | Verify your API key is correct and active |
| 403 | Forbidden | Insufficient permissions | Ensure you have the admin role |
| 500 | Server Error | Something went wrong on our end | Retry with exponential backoff |
Integration examples
Python: Export conversations to CSV
Export all conversations to a CSV file for analysis:
import requests
import csv
import os
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
OUTPUT_FILE = 'wolfia_conversations.csv'

def export_conversations_to_csv(output_file=OUTPUT_FILE):
    """
    Export all conversations from Wolfia to a CSV file.
    Handles pagination automatically.
    """
    all_conversations = []
    page = 1
    page_size = 100  # Maximum allowed

    print("Fetching conversations from Wolfia API...")

    while True:
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': page,
                    'page_size': page_size
                },
                timeout=30
            )
            if response.status_code != 200:
                print(f"Error: HTTP {response.status_code} - {response.text}")
                break

            data = response.json()
            conversations = data['data']
            if not conversations:
                break

            all_conversations.extend(conversations)
            print(f"Fetched page {page}/{data['total_pages']} "
                  f"({len(all_conversations)}/{data['total']} conversations)")

            if page >= data['total_pages']:
                break
            page += 1
        except Exception as e:
            print(f"Error fetching conversations: {e}")
            break

    # Write to CSV
    if all_conversations:
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'id', 'question', 'answer', 'user_name', 'user_email',
                'source_type', 'answered_at', 'answered_at_human'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for conv in all_conversations:
                # Convert microseconds to a readable datetime
                timestamp_seconds = conv['answered_at'] / 1_000_000
                answered_at_human = datetime.fromtimestamp(timestamp_seconds).isoformat()
                writer.writerow({
                    'id': conv['id'],
                    'question': conv['question'],
                    'answer': conv['answer'],
                    'user_name': conv.get('user_name', ''),
                    'user_email': conv.get('user_email', ''),
                    'source_type': conv['source_type'],
                    'answered_at': conv['answered_at'],
                    'answered_at_human': answered_at_human
                })
        print(f"\n✅ Exported {len(all_conversations)} conversations to {output_file}")
    else:
        print("\n⚠️ No conversations found")

    return all_conversations

if __name__ == '__main__':
    export_conversations_to_csv()
Python: Real-time conversation monitoring
Monitor new conversations in real-time and send alerts:
import requests
import os
import time
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
CHECK_INTERVAL_SECONDS = 300  # Check every 5 minutes

class ConversationMonitor:
    def __init__(self):
        self.last_conversation_id = None

    def fetch_recent_conversations(self, page_size=10):
        """Fetch the most recent conversations."""
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': 1,
                    'page_size': page_size
                },
                timeout=30
            )
            if response.status_code == 200:
                return response.json()['data']
            else:
                print(f"Error fetching conversations: {response.status_code}")
                return []
        except Exception as e:
            print(f"Error: {e}")
            return []

    def check_for_new_conversations(self):
        """Check for new conversations since the last check."""
        conversations = self.fetch_recent_conversations()
        if not conversations:
            return []

        # First run - store the most recent ID
        if self.last_conversation_id is None:
            self.last_conversation_id = conversations[0]['id']
            print(f"Monitoring started. Last conversation: {self.last_conversation_id}")
            return []

        # Find new conversations
        new_conversations = []
        for conv in conversations:
            if conv['id'] == self.last_conversation_id:
                break
            new_conversations.append(conv)

        if new_conversations:
            self.last_conversation_id = new_conversations[0]['id']
        return new_conversations

    def process_new_conversation(self, conversation):
        """Process a new conversation (send alert, log, etc.)."""
        timestamp_seconds = conversation['answered_at'] / 1_000_000
        answered_at = datetime.fromtimestamp(timestamp_seconds).strftime('%Y-%m-%d %H:%M:%S')
        print(f"\n🔔 New conversation ({conversation['source_type']})")
        print(f"  User: {conversation['user_name']} ({conversation['user_email']})")
        print(f"  Time: {answered_at}")
        print(f"  Question: {conversation['question'][:100]}...")
        # Add your custom logic here:
        # - Send Slack notification
        # - Log to monitoring system
        # - Trigger workflow
        # - Store in database

    def run(self):
        """Run the monitoring loop."""
        print("Starting conversation monitor...")
        print(f"Checking every {CHECK_INTERVAL_SECONDS} seconds")
        while True:
            try:
                new_conversations = self.check_for_new_conversations()
                for conv in reversed(new_conversations):  # Process oldest first
                    self.process_new_conversation(conv)
                time.sleep(CHECK_INTERVAL_SECONDS)
            except KeyboardInterrupt:
                print("\n\n🛑 Monitor stopped")
                break
            except Exception as e:
                print(f"\n❌ Error in monitoring loop: {e}")
                time.sleep(CHECK_INTERVAL_SECONDS)

if __name__ == '__main__':
    monitor = ConversationMonitor()
    monitor.run()
JavaScript: Filter by source type
Retrieve and filter conversations by source type:
const axios = require('axios');

const WOLFIA_API_KEY = process.env.WOLFIA_API_KEY;

/**
 * Fetch all conversations and filter by source type.
 *
 * @param {string} sourceType - Source type to filter: 'web_ui', 'slack_dm', 'browser_extension'
 * @returns {Array} Filtered conversations
 */
async function getConversationsBySourceType(sourceType = 'web_ui') {
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log(`Fetching ${sourceType} conversations...`);

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );
      const { data, total_pages } = response.data;
      allConversations.push(...data);
      console.log(`Fetched page ${page}/${total_pages}`);
      if (page >= total_pages) {
        break;
      }
      page++;
    } catch (error) {
      console.error('Error fetching conversations:', error.message);
      break;
    }
  }

  // Filter by source type
  const filtered = allConversations.filter(conv => conv.source_type === sourceType);
  console.log(`\n📊 Results:`);
  console.log(`  Total conversations: ${allConversations.length}`);
  console.log(`  ${sourceType} conversations: ${filtered.length}`);
  return filtered;
}

/**
 * Analyze conversation distribution across source types.
 */
async function analyzeConversationSources() {
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log('Fetching all conversations for analysis...');

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );
      const { data, total_pages } = response.data;
      allConversations.push(...data);
      console.log(`Fetched page ${page}/${total_pages}`);
      if (page >= total_pages) {
        break;
      }
      page++;
    } catch (error) {
      console.error('Error:', error.message);
      break;
    }
  }

  // Analyze by source type
  const sourceStats = allConversations.reduce((acc, conv) => {
    acc[conv.source_type] = (acc[conv.source_type] || 0) + 1;
    return acc;
  }, {});

  // Analyze by user
  const userStats = allConversations.reduce((acc, conv) => {
    const email = conv.user_email || 'Unknown';
    acc[email] = (acc[email] || 0) + 1;
    return acc;
  }, {});

  console.log('\n📊 Conversation Analysis:');
  console.log('\nBy Source Type:');
  Object.entries(sourceStats)
    .sort((a, b) => b[1] - a[1])
    .forEach(([source, count]) => {
      const percentage = ((count / allConversations.length) * 100).toFixed(1);
      console.log(`  ${source}: ${count} (${percentage}%)`);
    });

  console.log('\nTop 10 Most Active Users:');
  Object.entries(userStats)
    .sort((a, b) => b[1] - a[1])
    .slice(0, 10)
    .forEach(([email, count]) => {
      console.log(`  ${email}: ${count} conversations`);
    });

  return { sourceStats, userStats };
}

// Example usage
(async () => {
  try {
    // Get only Slack DM conversations
    const slackConversations = await getConversationsBySourceType('slack_dm');
    // Analyze all conversations
    await analyzeConversationSources();
  } catch (error) {
    console.error('Error:', error.message);
    process.exit(1);
  }
})();
Best practices
Fetch all conversations efficiently
Always use the maximum page size (100) to minimize API calls:

import requests

API_KEY = 'wolfia-api-YOUR_KEY_HERE'  # or load from an environment variable

def fetch_all_conversations():
    conversations = []
    page = 1
    while True:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': page, 'page_size': 100}
        )
        data = response.json()
        conversations.extend(data['data'])
        if page >= data['total_pages']:
            break
        page += 1
    return conversations
Handle large datasets with streaming
For very large conversation histories, process pages as you fetch them:

def process_conversations_streaming(processor_func):
    page = 1
    while True:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': page, 'page_size': 100}
        )
        data = response.json()
        # Process this page immediately
        for conversation in data['data']:
            processor_func(conversation)
        if page >= data['total_pages']:
            break
        page += 1

# Usage
def export_to_db(conversation):
    # Insert into database
    pass

process_conversations_streaming(export_to_db)
Cache results for frequently accessed data
Cache conversation data to reduce API calls:

import pickle
from datetime import datetime, timedelta

CACHE_FILE = 'conversations_cache.pkl'
CACHE_DURATION = timedelta(hours=1)

def get_conversations_cached():
    try:
        with open(CACHE_FILE, 'rb') as f:
            cache = pickle.load(f)
        if datetime.now() - cache['timestamp'] < CACHE_DURATION:
            return cache['data']
    except FileNotFoundError:
        pass

    # Fetch fresh data
    conversations = fetch_all_conversations()

    # Save to cache
    with open(CACHE_FILE, 'wb') as f:
        pickle.dump({
            'timestamp': datetime.now(),
            'data': conversations
        }, f)
    return conversations
Timestamp handling
Conversation timestamps use microseconds since Unix epoch. Here’s how to work with them:
from datetime import datetime

# Convert from microseconds to datetime
answered_at_us = 1731628800000000
answered_at_datetime = datetime.fromtimestamp(answered_at_us / 1_000_000)
print(answered_at_datetime)  # 2024-11-15 00:00:00 in UTC (fromtimestamp returns local time)

# Convert datetime to microseconds
dt = datetime.now()
timestamp_us = int(dt.timestamp() * 1_000_000)

// Convert from microseconds to JavaScript Date
const answeredAtUs = 1731628800000000;
const answeredAtDate = new Date(answeredAtUs / 1000); // Divide by 1000 for milliseconds
console.log(answeredAtDate.toISOString());

// Convert Date to microseconds
const now = new Date();
const timestampUs = now.getTime() * 1000;
Error handling and retry logic
Implement exponential backoff
Retry failed requests with increasing delays:

import time
import requests
from typing import Optional

API_KEY = 'wolfia-api-YOUR_KEY_HERE'

def fetch_with_retry(page: int, max_retries: int = 3) -> Optional[dict]:
    for attempt in range(max_retries):
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={'X-API-Key': API_KEY},
                params={'page': page, 'page_size': 100},
                timeout=30
            )
            if response.status_code == 200:
                return response.json()
            if response.status_code >= 500:
                # Server error - retry
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # 1s, 2s, 4s
                    time.sleep(wait_time)
                    continue
            else:
                # Client error - don't retry
                print(f"Client error: {response.status_code} - {response.text}")
                return None
        except requests.exceptions.Timeout:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
                continue
        except Exception as e:
            print(f"Error: {e}")
            return None
    return None
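One way to drive fetch_with_retry through every page, building on the function above:

# Collect all pages, stopping if a page fails after all retries
all_conversations = []
page = 1
while True:
    data = fetch_with_retry(page)
    if data is None:
        break  # repeated failures; give up rather than loop forever
    all_conversations.extend(data['data'])
    if page >= data['total_pages']:
        break
    page += 1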
Log API interactions
Maintain detailed logs of API interactions:

import logging
import requests

API_KEY = 'wolfia-api-YOUR_KEY_HERE'

logging.basicConfig(
    filename='wolfia_insights.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def fetch_conversations_with_logging():
    logging.info("Starting conversation fetch")
    try:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': 1, 'page_size': 100}
        )
        if response.status_code == 200:
            logging.info("Successfully fetched page 1")
            return response.json()
        else:
            logging.error(
                f"API error: status={response.status_code}, "
                f"response={response.text}"
            )
    except Exception as e:
        logging.exception(f"Exception during fetch: {e}")
    return None
Data privacy and compliance
When working with conversation data:
PII handling: Conversation data contains personally identifiable information (user names and emails). Handle it according to your privacy policies; a masking sketch follows this list.
Data retention: Implement appropriate data retention policies for exported conversation data.
Access control: Restrict access to conversation insights to authorized personnel only.
Audit logging: Log all API access and data exports for compliance auditing.
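As one illustration of PII handling, the sketch below masks emails and names before conversation data leaves your systems. The masking rules are examples only; apply whatever your privacy policy requires:

def mask_email(email):
    # Keep the first character and domain, e.g. j***@company.com
    if not email or '@' not in email:
        return email
    local, domain = email.split('@', 1)
    return f"{local[0]}***@{domain}"

def redact_conversation(conv):
    # Return a copy of a conversation object with user PII masked
    redacted = dict(conv)
    redacted['user_email'] = mask_email(conv.get('user_email'))
    redacted['user_name'] = 'REDACTED' if conv.get('user_name') else None
    redacted.pop('user_profile_image', None)
    return redacted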
Use cases
Analytics and reporting
Build custom dashboards to track:
Conversation volume trends over time (see the sketch after this list)
Most active users and teams
Source type distribution (web UI vs Slack vs agents)
Peak usage hours and days
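As a starting point for volume trends, this sketch buckets conversations by UTC day using the microsecond answered_at timestamps. It assumes a list of conversation objects fetched as shown earlier:

from collections import Counter
from datetime import datetime, timezone

def daily_volume(conversations):
    # Count conversations per UTC calendar day
    counts = Counter()
    for conv in conversations:
        day = datetime.fromtimestamp(
            conv['answered_at'] / 1_000_000, tz=timezone.utc
        ).date()
        counts[day] += 1
    return dict(sorted(counts.items()))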
Quality assurance
Monitor AI performance by:
Reviewing conversation quality across different source types
Identifying common user questions (see the sketch after this list)
Analyzing response patterns
Tracking user satisfaction
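For a rough cut at common questions, a frequency count over normalized question text surfaces exact repeats. This is a sketch, not a built-in feature; real deduplication may need fuzzy matching:

from collections import Counter

def top_questions(conversations, n=10):
    # Most frequently asked questions, case-insensitive exact match
    counts = Counter(conv['question'].strip().lower() for conv in conversations)
    return counts.most_common(n)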
Compliance and auditing
Export conversation history for:
Regulatory compliance requirements
Security audits
Legal discovery
Data governance
Integration monitoring
Track integration health:
Monitor Slack integration usage
Verify agent conversation flows
Analyze cross-channel performance
Identify integration issues
Rate limits and quotas
The conversation insights API follows standard Wolfia API rate limits:
Default rate limit: 100 requests per minute per API key
Burst allowance: Short bursts above the limit are allowed
Quota reset: Rate limit window resets every minute
If you hit rate limits:
Implement exponential backoff (a rate-limit-aware sketch follows this list)
Spread requests over time
Cache results when appropriate
Contact support for higher limits if needed
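As a sketch of rate-limit handling, the helper below retries on HTTP 429 with exponential backoff. The 429 status and Retry-After header are assumptions about the rate limiter's behavior; check the responses you actually receive:

import time
import requests

def get_with_backoff(url, headers, params, max_retries=5):
    # Retry on HTTP 429, honoring Retry-After when present (assumed behavior)
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, params=params, timeout=30)
        if response.status_code != 429:
            return response
        # Fall back to exponential backoff if no Retry-After header is sent
        wait = float(response.headers.get('Retry-After', 2 ** attempt))
        time.sleep(wait)
    return response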
Getting help
If you encounter issues with the conversation insights API:
Check authentication: Verify your API key is active in API settings
Review pagination: Ensure page and page_size parameters are valid
Validate permissions: Confirm you have admin role for API access
Check timestamps: Verify you’re handling microsecond timestamps correctly
Contact support: Email support@wolfia.com with your API key ID (never share the full key)
Next steps