Skip to main content

Overview

The conversation insights API enables you to retrieve and analyze all conversations happening across your Wolfia workspace. This includes interactions from the web UI, Slack integration, agent conversations, and more. Use this endpoint to build custom analytics dashboards, monitor AI performance, track user engagement, or export conversation data for compliance and auditing.

The conversation insights endpoint

URL: GET https://api.wolfia.com/insights/conversations Authentication: API key required (see API overview for setup)

Request format

curl -X GET 'https://api.wolfia.com/insights/conversations?page=1&page_size=10' \
  -H "X-API-Key: wolfia-api-YOUR_KEY_HERE" \
  -H "Content-Type: application/json"

Query parameters

ParameterTypeRequiredDefaultDescription
pageintegerNo1Page number for pagination (must be > 0)
page_sizeintegerNo10Number of results per page (1-100)

Understanding source types

Conversations in Wolfia come from multiple sources, each tracked and returned by the API:

web_ui

Conversations where users interact with Wolfia through the web application interface.

slack_dm

Conversations from direct messages with the Wolfia bot in Slack.

browser_extension

Conversations initiated through the Wolfia browser extension on vendor portals and web forms.
Source type filtering: The current API returns all source types. Contact support if you need source-specific filtering for your use case.

Response format

Success response (200 OK)

{
  "data": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "question": "What are our security policies for data encryption?",
      "answer": "Your organization requires AES-256 encryption for data at rest and TLS 1.3 for data in transit. All encryption keys are managed through AWS KMS with automatic rotation every 90 days.",
      "user_name": "Jane Smith",
      "user_email": "jane.smith@company.com",
      "user_profile_image": "https://example.com/avatar.jpg",
      "source_type": "web_ui",
      "answered_at": 1731628800000000
    },
    {
      "id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
      "question": "When is the next security audit?",
      "answer": "The next SOC 2 Type II audit is scheduled for Q1 2025, beginning January 15th.",
      "user_name": "John Doe",
      "user_email": "john.doe@company.com",
      "user_profile_image": "https://example.com/avatar2.jpg",
      "source_type": "slack_dm",
      "answered_at": 1731542400000000
    }
  ],
  "total": 247,
  "page": 1,
  "page_size": 10,
  "total_pages": 25
}

Response fields

FieldTypeDescription
dataarrayArray of conversation objects
data[].idstring (UUID)Unique identifier for the conversation
data[].questionstringUser’s question (cleaned of markdown/citations)
data[].answerstringAI-generated answer (cleaned of markdown/citations)
data[].user_namestring | nullName of the user who asked the question
data[].user_emailstring | nullEmail of the user who asked the question
data[].user_profile_imagestring | nullProfile image URL of the user
data[].source_typestringSource of the conversation (web_ui, slack_dm, browser_extension)
data[].answered_atintegerTimestamp in microseconds when answered
totalintegerTotal number of conversations across all pages
pageintegerCurrent page number
page_sizeintegerNumber of results per page
total_pagesintegerTotal number of pages available
Timestamps: The answered_at field uses microseconds since Unix epoch. Divide by 1,000,000 to convert to seconds for most programming languages.

Error responses

Status CodeErrorWhat it meansHow to fix
400Bad RequestInvalid query parameters (e.g., page_size > 100)Check your pagination parameters
401UnauthorizedInvalid or expired API keyVerify your API key is correct and active
403ForbiddenInsufficient permissionsEnsure you have admin role
500Server ErrorSomething went wrong on our endRetry with exponential backoff

Integration examples

Python: Export conversations to CSV

Export all conversations to a CSV file for analysis:
import requests
import csv
import os
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
OUTPUT_FILE = 'wolfia_conversations.csv'

def export_conversations_to_csv(output_file=OUTPUT_FILE):
    """
    Export all conversations from Wolfia to a CSV file.
    Handles pagination automatically.
    """
    all_conversations = []
    page = 1
    page_size = 100  # Maximum allowed

    print("Fetching conversations from Wolfia API...")

    while True:
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': page,
                    'page_size': page_size
                },
                timeout=30
            )

            if response.status_code != 200:
                print(f"Error: HTTP {response.status_code} - {response.text}")
                break

            data = response.json()
            conversations = data['data']

            if not conversations:
                break

            all_conversations.extend(conversations)
            print(f"Fetched page {page}/{data['total_pages']} ({len(all_conversations)}/{data['total']} conversations)")

            if page >= data['total_pages']:
                break

            page += 1

        except Exception as e:
            print(f"Error fetching conversations: {str(e)}")
            break

    # Write to CSV
    if all_conversations:
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'id', 'question', 'answer', 'user_name', 'user_email',
                'source_type', 'answered_at', 'answered_at_human'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for conv in all_conversations:
                # Convert microseconds to readable datetime
                timestamp_seconds = conv['answered_at'] / 1_000_000
                answered_at_human = datetime.fromtimestamp(timestamp_seconds).isoformat()

                writer.writerow({
                    'id': conv['id'],
                    'question': conv['question'],
                    'answer': conv['answer'],
                    'user_name': conv.get('user_name', ''),
                    'user_email': conv.get('user_email', ''),
                    'source_type': conv['source_type'],
                    'answered_at': conv['answered_at'],
                    'answered_at_human': answered_at_human
                })

        print(f"\n✅ Exported {len(all_conversations)} conversations to {output_file}")
    else:
        print("\n⚠️  No conversations found")

    return all_conversations

if __name__ == '__main__':
    export_conversations_to_csv()

Python: Real-time conversation monitoring

Monitor new conversations in real-time and send alerts:
import requests
import os
import time
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
CHECK_INTERVAL_SECONDS = 300  # Check every 5 minutes

class ConversationMonitor:
    def __init__(self):
        self.last_conversation_id = None

    def fetch_recent_conversations(self, page_size=10):
        """Fetch the most recent conversations."""
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': 1,
                    'page_size': page_size
                },
                timeout=30
            )

            if response.status_code == 200:
                return response.json()['data']
            else:
                print(f"Error fetching conversations: {response.status_code}")
                return []

        except Exception as e:
            print(f"Error: {str(e)}")
            return []

    def check_for_new_conversations(self):
        """Check for new conversations since last check."""
        conversations = self.fetch_recent_conversations()

        if not conversations:
            return []

        # First run - store the most recent ID
        if self.last_conversation_id is None:
            self.last_conversation_id = conversations[0]['id']
            print(f"Monitoring started. Last conversation: {self.last_conversation_id}")
            return []

        # Find new conversations
        new_conversations = []
        for conv in conversations:
            if conv['id'] == self.last_conversation_id:
                break
            new_conversations.append(conv)

        if new_conversations:
            self.last_conversation_id = new_conversations[0]['id']

        return new_conversations

    def process_new_conversation(self, conversation):
        """Process a new conversation (send alert, log, etc.)."""
        timestamp_seconds = conversation['answered_at'] / 1_000_000
        answered_at = datetime.fromtimestamp(timestamp_seconds).strftime('%Y-%m-%d %H:%M:%S')

        print(f"\n🔔 New conversation ({conversation['source_type']})")
        print(f"   User: {conversation['user_name']} ({conversation['user_email']})")
        print(f"   Time: {answered_at}")
        print(f"   Question: {conversation['question'][:100]}...")

        # Add your custom logic here:
        # - Send Slack notification
        # - Log to monitoring system
        # - Trigger workflow
        # - Store in database

    def run(self):
        """Run the monitoring loop."""
        print("Starting conversation monitor...")
        print(f"Checking every {CHECK_INTERVAL_SECONDS} seconds")

        while True:
            try:
                new_conversations = self.check_for_new_conversations()

                for conv in reversed(new_conversations):  # Process oldest first
                    self.process_new_conversation(conv)

                time.sleep(CHECK_INTERVAL_SECONDS)

            except KeyboardInterrupt:
                print("\n\n🛑 Monitor stopped")
                break
            except Exception as e:
                print(f"\n❌ Error in monitoring loop: {str(e)}")
                time.sleep(CHECK_INTERVAL_SECONDS)

if __name__ == '__main__':
    monitor = ConversationMonitor()
    monitor.run()

JavaScript: Filter by source type

Retrieve and filter conversations by source type:
const axios = require('axios');

const WOLFIA_API_KEY = process.env.WOLFIA_API_KEY;

async function getConversationsBySourceType(sourceType = 'web_ui') {
  /**
   * Fetch all conversations and filter by source type.
   *
   * @param {string} sourceType - Source type to filter: 'web_ui', 'slack_dm', 'browser_extension'
   * @returns {Array} Filtered conversations
   */
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log(`Fetching ${sourceType} conversations...`);

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );

      const { data, total_pages } = response.data;
      allConversations.push(...data);

      console.log(`Fetched page ${page}/${total_pages}`);

      if (page >= total_pages) {
        break;
      }

      page++;

    } catch (error) {
      console.error('Error fetching conversations:', error.message);
      break;
    }
  }

  // Filter by source type
  const filtered = allConversations.filter(conv => conv.source_type === sourceType);

  console.log(`\n📊 Results:`);
  console.log(`   Total conversations: ${allConversations.length}`);
  console.log(`   ${sourceType} conversations: ${filtered.length}`);

  return filtered;
}

async function analyzeConversationSources() {
  /**
   * Analyze conversation distribution across source types.
   */
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log('Fetching all conversations for analysis...');

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );

      const { data, total_pages } = response.data;
      allConversations.push(...data);

      console.log(`Fetched page ${page}/${total_pages}`);

      if (page >= total_pages) {
        break;
      }

      page++;

    } catch (error) {
      console.error('Error:', error.message);
      break;
    }
  }

  // Analyze by source type
  const sourceStats = allConversations.reduce((acc, conv) => {
    acc[conv.source_type] = (acc[conv.source_type] || 0) + 1;
    return acc;
  }, {});

  // Analyze by user
  const userStats = allConversations.reduce((acc, conv) => {
    const email = conv.user_email || 'Unknown';
    acc[email] = (acc[email] || 0) + 1;
    return acc;
  }, {});

  console.log('\n📊 Conversation Analysis:');
  console.log('\nBy Source Type:');
  Object.entries(sourceStats)
    .sort((a, b) => b[1] - a[1])
    .forEach(([source, count]) => {
      const percentage = ((count / allConversations.length) * 100).toFixed(1);
      console.log(`   ${source}: ${count} (${percentage}%)`);
    });

  console.log('\nTop 10 Most Active Users:');
  Object.entries(userStats)
    .sort((a, b) => b[1] - a[1])
    .slice(0, 10)
    .forEach(([email, count]) => {
      console.log(`   ${email}: ${count} conversations`);
    });

  return { sourceStats, userStats };
}

// Example usage
(async () => {
  try {
    // Get only Slack DM conversations
    const slackConversations = await getConversationsBySourceType('slack_dm');

    // Analyze all conversations
    await analyzeConversationSources();

  } catch (error) {
    console.error('Error:', error.message);
    process.exit(1);
  }
})();

Best practices

Pagination strategies

Always use the maximum page size (100) to minimize API calls:
def fetch_all_conversations():
    conversations = []
    page = 1

    while True:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': page, 'page_size': 100}
        )

        data = response.json()
        conversations.extend(data['data'])

        if page >= data['total_pages']:
            break

        page += 1

    return conversations
For very large conversation histories, process pages as you fetch them:
def process_conversations_streaming(processor_func):
    page = 1

    while True:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': page, 'page_size': 100}
        )

        data = response.json()

        # Process this page immediately
        for conversation in data['data']:
            processor_func(conversation)

        if page >= data['total_pages']:
            break

        page += 1

# Usage
def export_to_db(conversation):
    # Insert into database
    pass

process_conversations_streaming(export_to_db)
Cache conversation data to reduce API calls:
import pickle
from datetime import datetime, timedelta

CACHE_FILE = 'conversations_cache.pkl'
CACHE_DURATION = timedelta(hours=1)

def get_conversations_cached():
    try:
        with open(CACHE_FILE, 'rb') as f:
            cache = pickle.load(f)
            if datetime.now() - cache['timestamp'] < CACHE_DURATION:
                return cache['data']
    except FileNotFoundError:
        pass

    # Fetch fresh data
    conversations = fetch_all_conversations()

    # Save to cache
    with open(CACHE_FILE, 'wb') as f:
        pickle.dump({
            'timestamp': datetime.now(),
            'data': conversations
        }, f)

    return conversations

Timestamp handling

Conversation timestamps use microseconds since Unix epoch. Here’s how to work with them:
from datetime import datetime

# Convert from microseconds to datetime
answered_at_us = 1731628800000000
answered_at_datetime = datetime.fromtimestamp(answered_at_us / 1_000_000)
print(answered_at_datetime)  # 2024-11-14 12:00:00

# Convert datetime to microseconds
dt = datetime.now()
timestamp_us = int(dt.timestamp() * 1_000_000)
// Convert from microseconds to JavaScript Date
const answeredAtUs = 1731628800000000;
const answeredAtDate = new Date(answeredAtUs / 1000); // Divide by 1000 for milliseconds
console.log(answeredAtDate.toISOString());

// Convert Date to microseconds
const now = new Date();
const timestampUs = now.getTime() * 1000;

Error handling and retry logic

Retry failed requests with increasing delays:
import time
from typing import Optional

def fetch_with_retry(page: int, max_retries: int = 3) -> Optional[dict]:
    for attempt in range(max_retries):
        try:
            response = requests.get(
                'https://api.wolfia.com/insights/conversations',
                headers={'X-API-Key': API_KEY},
                params={'page': page, 'page_size': 100},
                timeout=30
            )

            if response.status_code == 200:
                return response.json()

            if response.status_code >= 500:
                # Server error - retry
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # 1s, 2s, 4s
                    time.sleep(wait_time)
                    continue
            else:
                # Client error - don't retry
                print(f"Client error: {response.status_code} - {response.text}")
                return None

        except requests.exceptions.Timeout:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
                continue
        except Exception as e:
            print(f"Error: {str(e)}")
            return None

    return None
Maintain detailed logs of API interactions:
import logging

logging.basicConfig(
    filename='wolfia_insights.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def fetch_conversations_with_logging():
    logging.info("Starting conversation fetch")

    try:
        response = requests.get(
            'https://api.wolfia.com/insights/conversations',
            headers={'X-API-Key': API_KEY},
            params={'page': 1, 'page_size': 100}
        )

        if response.status_code == 200:
            logging.info(f"Successfully fetched page 1")
            return response.json()
        else:
            logging.error(
                f"API error: status={response.status_code}, "
                f"response={response.text}"
            )

    except Exception as e:
        logging.exception(f"Exception during fetch: {str(e)}")

    return None

Data privacy and compliance

When working with conversation data:
  • PII handling: Conversation data contains personally identifiable information (user emails, names). Handle according to your privacy policies.
  • Data retention: Implement appropriate data retention policies for exported conversation data.
  • Access control: Restrict access to conversation insights to authorized personnel only.
  • Audit logging: Log all API access and data exports for compliance auditing.

Use cases

Analytics and reporting

Build custom dashboards to track:
  • Conversation volume trends over time
  • Most active users and teams
  • Source type distribution (web UI vs Slack vs agents)
  • Peak usage hours and days

Quality assurance

Monitor AI performance by:
  • Reviewing conversation quality across different source types
  • Identifying common user questions
  • Analyzing response patterns
  • Tracking user satisfaction

Compliance and auditing

Export conversation history for:
  • Regulatory compliance requirements
  • Security audits
  • Legal discovery
  • Data governance

Integration monitoring

Track integration health:
  • Monitor Slack integration usage
  • Verify agent conversation flows
  • Analyze cross-channel performance
  • Identify integration issues

Rate limits and quotas

The conversation insights API follows standard Wolfia API rate limits:
  • Default rate limit: 100 requests per minute per API key
  • Burst allowance: Short bursts above the limit are allowed
  • Quota reset: Rate limit window resets every minute
If you hit rate limits:
  • Implement exponential backoff
  • Spread requests over time
  • Cache results when appropriate
  • Contact support for higher limits if needed

Getting help

If you encounter issues with the conversation insights API:
  • Check authentication: Verify your API key is active in API settings
  • Review pagination: Ensure page and page_size parameters are valid
  • Validate permissions: Confirm you have admin role for API access
  • Check timestamps: Verify you’re handling microsecond timestamps correctly
  • Contact support: Email support@wolfia.com with your API key ID (never share the full key)

Next steps