> ## Documentation Index
> Fetch the complete documentation index at: https://docs.wolfia.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Conversation Insights API: Wolfia Analytics

> Retrieve and analyze all conversations across web, Slack, and AI agent sessions via the Wolfia API, for custom dashboards, monitoring, and compliance auditing.

## Overview

The conversation insights API enables you to retrieve and analyze all conversations happening across your Wolfia workspace. This includes interactions from the web UI, Slack, agent conversations, and more. Use this endpoint to build custom analytics dashboards, monitor AI performance, track user engagement, or export conversation data for compliance and auditing.

## The conversation insights endpoint

**URL:** `GET https://api.wolfia.com/v1/insights/conversations`

**Authentication:** API key required (see [API overview](/how-to/api-overview) for setup)

### Request format

```bash theme={null}
curl -X GET 'https://api.wolfia.com/v1/insights/conversations?page=1&page_size=10' \
  -H "X-API-Key: wolfia-api-YOUR_KEY_HERE" \
  -H "Content-Type: application/json"
```

### Query parameters

| Parameter   | Type    | Required | Default | Description                              |
| ----------- | ------- | -------- | ------- | ---------------------------------------- |
| `page`      | integer | No       | 1       | Page number for pagination (must be > 0) |
| `page_size` | integer | No       | 10      | Number of results per page (1-100)       |

## Understanding source types

Conversations in Wolfia come from multiple sources, each tracked and returned by the API:

### web\_ui

Conversations where users interact with Wolfia through the web application interface.

### slack\_dm

Conversations from direct messages with the Wolfia bot in Slack.

### browser\_extension

Conversations initiated through the Wolfia browser extension on vendor portals and web forms.

<Note>
  **Source type filtering:** The current API returns all source types. Contact support if you need source-specific filtering for your use case.
</Note>

## Response format

### Success response (200 OK)

```json theme={null}
{
  "data": [
    {
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
      "question": "What are our security policies for data encryption?",
      "answer": "Your organization requires AES-256 encryption for data at rest and TLS 1.3 for data in transit. All encryption keys are managed through AWS KMS with automatic rotation every 90 days.",
      "user_name": "Jane Smith",
      "user_email": "jane.smith@company.com",
      "user_profile_image": "https://example.com/avatar.jpg",
      "source_type": "web_ui",
      "answered_at": 1731628800000000
    },
    {
      "id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
      "question": "When is the next security audit?",
      "answer": "The next SOC 2 Type II audit is scheduled for Q1 2025, beginning January 15th.",
      "user_name": "John Doe",
      "user_email": "john.doe@company.com",
      "user_profile_image": "https://example.com/avatar2.jpg",
      "source_type": "slack_dm",
      "answered_at": 1731542400000000
    }
  ],
  "total": 247,
  "page": 1,
  "page_size": 10,
  "total_pages": 25
}
```

### Response fields

| Field                       | Type           | Description                                                         |
| --------------------------- | -------------- | ------------------------------------------------------------------- |
| `data`                      | array          | Array of conversation objects                                       |
| `data[].id`                 | string (UUID)  | Unique identifier for the conversation                              |
| `data[].question`           | string         | User's question (cleaned of markdown/citations)                     |
| `data[].answer`             | string         | AI-generated answer (cleaned of markdown/citations)                 |
| `data[].user_name`          | string \| null | Name of the user who asked the question                             |
| `data[].user_email`         | string \| null | Email of the user who asked the question                            |
| `data[].user_profile_image` | string \| null | Profile image URL of the user                                       |
| `data[].source_type`        | string         | Source of the conversation (web\_ui, slack\_dm, browser\_extension) |
| `data[].answered_at`        | integer        | Timestamp in microseconds when answered                             |
| `total`                     | integer        | Total number of conversations across all pages                      |
| `page`                      | integer        | Current page number                                                 |
| `page_size`                 | integer        | Number of results per page                                          |
| `total_pages`               | integer        | Total number of pages available                                     |

<Note>
  **Timestamps:** The `answered_at` field uses microseconds since Unix epoch. Divide by 1,000,000 to convert to seconds for most programming languages.
</Note>

### Error responses

| Status Code | Error        | What it means                                     | How to fix                                |
| ----------- | ------------ | ------------------------------------------------- | ----------------------------------------- |
| 400         | Bad Request  | Invalid query parameters (e.g., page\_size > 100) | Check your pagination parameters          |
| 401         | Unauthorized | Invalid or expired API key                        | Verify your API key is correct and active |
| 403         | Forbidden    | Insufficient permissions                          | Ensure you have admin role                |
| 500         | Server Error | Something went wrong on our end                   | Retry with exponential backoff            |

## Integration examples

### Python: Export conversations to CSV

Export all conversations to a CSV file for analysis:

```python theme={null}
import requests
import csv
import os
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
OUTPUT_FILE = 'wolfia_conversations.csv'

def export_conversations_to_csv(output_file=OUTPUT_FILE):
    """
    Export all conversations from Wolfia to a CSV file.
    Handles pagination automatically.
    """
    all_conversations = []
    page = 1
    page_size = 100  # Maximum allowed

    print("Fetching conversations from Wolfia API...")

    while True:
        try:
            response = requests.get(
                'https://api.wolfia.com/v1/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': page,
                    'page_size': page_size
                },
                timeout=30
            )

            if response.status_code != 200:
                print(f"Error: HTTP {response.status_code} - {response.text}")
                break

            data = response.json()
            conversations = data['data']

            if not conversations:
                break

            all_conversations.extend(conversations)
            print(f"Fetched page {page}/{data['total_pages']} ({len(all_conversations)}/{data['total']} conversations)")

            if page >= data['total_pages']:
                break

            page += 1

        except Exception as e:
            print(f"Error fetching conversations: {str(e)}")
            break

    # Write to CSV
    if all_conversations:
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = [
                'id', 'question', 'answer', 'user_name', 'user_email',
                'source_type', 'answered_at', 'answered_at_human'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            for conv in all_conversations:
                # Convert microseconds to readable datetime
                timestamp_seconds = conv['answered_at'] / 1_000_000
                answered_at_human = datetime.fromtimestamp(timestamp_seconds).isoformat()

                writer.writerow({
                    'id': conv['id'],
                    'question': conv['question'],
                    'answer': conv['answer'],
                    'user_name': conv.get('user_name', ''),
                    'user_email': conv.get('user_email', ''),
                    'source_type': conv['source_type'],
                    'answered_at': conv['answered_at'],
                    'answered_at_human': answered_at_human
                })

        print(f"\n✅ Exported {len(all_conversations)} conversations to {output_file}")
    else:
        print("\n⚠️  No conversations found")

    return all_conversations

if __name__ == '__main__':
    export_conversations_to_csv()
```

### Python: Real-time conversation monitoring

Monitor new conversations in real-time and send alerts:

```python theme={null}
import requests
import os
import time
from datetime import datetime

WOLFIA_API_KEY = os.environ['WOLFIA_API_KEY']
CHECK_INTERVAL_SECONDS = 300  # Check every 5 minutes

class ConversationMonitor:
    def __init__(self):
        self.last_conversation_id = None

    def fetch_recent_conversations(self, page_size=10):
        """Fetch the most recent conversations."""
        try:
            response = requests.get(
                'https://api.wolfia.com/v1/insights/conversations',
                headers={
                    'X-API-Key': WOLFIA_API_KEY,
                    'Content-Type': 'application/json'
                },
                params={
                    'page': 1,
                    'page_size': page_size
                },
                timeout=30
            )

            if response.status_code == 200:
                return response.json()['data']
            else:
                print(f"Error fetching conversations: {response.status_code}")
                return []

        except Exception as e:
            print(f"Error: {str(e)}")
            return []

    def check_for_new_conversations(self):
        """Check for new conversations since last check."""
        conversations = self.fetch_recent_conversations()

        if not conversations:
            return []

        # First run - store the most recent ID
        if self.last_conversation_id is None:
            self.last_conversation_id = conversations[0]['id']
            print(f"Monitoring started. Last conversation: {self.last_conversation_id}")
            return []

        # Find new conversations
        new_conversations = []
        for conv in conversations:
            if conv['id'] == self.last_conversation_id:
                break
            new_conversations.append(conv)

        if new_conversations:
            self.last_conversation_id = new_conversations[0]['id']

        return new_conversations

    def process_new_conversation(self, conversation):
        """Process a new conversation (send alert, log, etc.)."""
        timestamp_seconds = conversation['answered_at'] / 1_000_000
        answered_at = datetime.fromtimestamp(timestamp_seconds).strftime('%Y-%m-%d %H:%M:%S')

        print(f"\n🔔 New conversation ({conversation['source_type']})")
        print(f"   User: {conversation['user_name']} ({conversation['user_email']})")
        print(f"   Time: {answered_at}")
        print(f"   Question: {conversation['question'][:100]}...")

        # Add your custom logic here:
        # - Send Slack notification
        # - Log to monitoring system
        # - Trigger workflow
        # - Store in database

    def run(self):
        """Run the monitoring loop."""
        print("Starting conversation monitor...")
        print(f"Checking every {CHECK_INTERVAL_SECONDS} seconds")

        while True:
            try:
                new_conversations = self.check_for_new_conversations()

                for conv in reversed(new_conversations):  # Process oldest first
                    self.process_new_conversation(conv)

                time.sleep(CHECK_INTERVAL_SECONDS)

            except KeyboardInterrupt:
                print("\n\n🛑 Monitor stopped")
                break
            except Exception as e:
                print(f"\n❌ Error in monitoring loop: {str(e)}")
                time.sleep(CHECK_INTERVAL_SECONDS)

if __name__ == '__main__':
    monitor = ConversationMonitor()
    monitor.run()
```

### JavaScript: Filter by source type

Retrieve and filter conversations by source type:

```javascript theme={null}
const axios = require('axios');

const WOLFIA_API_KEY = process.env.WOLFIA_API_KEY;

async function getConversationsBySourceType(sourceType = 'web_ui') {
  /**
   * Fetch all conversations and filter by source type.
   *
   * @param {string} sourceType - Source type to filter: 'web_ui', 'slack_dm', 'browser_extension'
   * @returns {Array} Filtered conversations
   */
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log(`Fetching ${sourceType} conversations...`);

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/v1/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );

      const { data, total_pages } = response.data;
      allConversations.push(...data);

      console.log(`Fetched page ${page}/${total_pages}`);

      if (page >= total_pages) {
        break;
      }

      page++;

    } catch (error) {
      console.error('Error fetching conversations:', error.message);
      break;
    }
  }

  // Filter by source type
  const filtered = allConversations.filter(conv => conv.source_type === sourceType);

  console.log(`\n📊 Results:`);
  console.log(`   Total conversations: ${allConversations.length}`);
  console.log(`   ${sourceType} conversations: ${filtered.length}`);

  return filtered;
}

async function analyzeConversationSources() {
  /**
   * Analyze conversation distribution across source types.
   */
  const allConversations = [];
  let page = 1;
  const pageSize = 100;

  console.log('Fetching all conversations for analysis...');

  while (true) {
    try {
      const response = await axios.get(
        'https://api.wolfia.com/v1/insights/conversations',
        {
          headers: {
            'X-API-Key': WOLFIA_API_KEY,
            'Content-Type': 'application/json'
          },
          params: {
            page: page,
            page_size: pageSize
          },
          timeout: 30000
        }
      );

      const { data, total_pages } = response.data;
      allConversations.push(...data);

      console.log(`Fetched page ${page}/${total_pages}`);

      if (page >= total_pages) {
        break;
      }

      page++;

    } catch (error) {
      console.error('Error:', error.message);
      break;
    }
  }

  // Analyze by source type
  const sourceStats = allConversations.reduce((acc, conv) => {
    acc[conv.source_type] = (acc[conv.source_type] || 0) + 1;
    return acc;
  }, {});

  // Analyze by user
  const userStats = allConversations.reduce((acc, conv) => {
    const email = conv.user_email || 'Unknown';
    acc[email] = (acc[email] || 0) + 1;
    return acc;
  }, {});

  console.log('\n📊 Conversation Analysis:');
  console.log('\nBy Source Type:');
  Object.entries(sourceStats)
    .sort((a, b) => b[1] - a[1])
    .forEach(([source, count]) => {
      const percentage = ((count / allConversations.length) * 100).toFixed(1);
      console.log(`   ${source}: ${count} (${percentage}%)`);
    });

  console.log('\nTop 10 Most Active Users:');
  Object.entries(userStats)
    .sort((a, b) => b[1] - a[1])
    .slice(0, 10)
    .forEach(([email, count]) => {
      console.log(`   ${email}: ${count} conversations`);
    });

  return { sourceStats, userStats };
}

// Example usage
(async () => {
  try {
    // Get only Slack DM conversations
    const slackConversations = await getConversationsBySourceType('slack_dm');

    // Analyze all conversations
    await analyzeConversationSources();

  } catch (error) {
    console.error('Error:', error.message);
    process.exit(1);
  }
})();
```

## Best practices

### Pagination strategies

<AccordionGroup>
  <Accordion title="Fetch all conversations efficiently">
    Always use the maximum page size (100) to minimize API calls:

    ```python theme={null}
    def fetch_all_conversations():
        conversations = []
        page = 1

        while True:
            response = requests.get(
                'https://api.wolfia.com/v1/insights/conversations',
                headers={'X-API-Key': API_KEY},
                params={'page': page, 'page_size': 100}
            )

            data = response.json()
            conversations.extend(data['data'])

            if page >= data['total_pages']:
                break

            page += 1

        return conversations
    ```
  </Accordion>

  <Accordion title="Handle large datasets with streaming">
    For very large conversation histories, process pages as you fetch them:

    ```python theme={null}
    def process_conversations_streaming(processor_func):
        page = 1

        while True:
            response = requests.get(
                'https://api.wolfia.com/v1/insights/conversations',
                headers={'X-API-Key': API_KEY},
                params={'page': page, 'page_size': 100}
            )

            data = response.json()

            # Process this page immediately
            for conversation in data['data']:
                processor_func(conversation)

            if page >= data['total_pages']:
                break

            page += 1

    # Usage
    def export_to_db(conversation):
        # Insert into database
        pass

    process_conversations_streaming(export_to_db)
    ```
  </Accordion>

  <Accordion title="Cache results for frequently accessed data">
    Cache conversation data to reduce API calls:

    ```python theme={null}
    import pickle
    from datetime import datetime, timedelta

    CACHE_FILE = 'conversations_cache.pkl'
    CACHE_DURATION = timedelta(hours=1)

    def get_conversations_cached():
        try:
            with open(CACHE_FILE, 'rb') as f:
                cache = pickle.load(f)
                if datetime.now() - cache['timestamp'] < CACHE_DURATION:
                    return cache['data']
        except FileNotFoundError:
            pass

        # Fetch fresh data
        conversations = fetch_all_conversations()

        # Save to cache
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump({
                'timestamp': datetime.now(),
                'data': conversations
            }, f)

        return conversations
    ```
  </Accordion>
</AccordionGroup>

### Timestamp handling

Conversation timestamps use microseconds since Unix epoch. Here's how to work with them:

```python theme={null}
from datetime import datetime

# Convert from microseconds to datetime
answered_at_us = 1731628800000000
answered_at_datetime = datetime.fromtimestamp(answered_at_us / 1_000_000)
print(answered_at_datetime)  # 2024-11-14 12:00:00

# Convert datetime to microseconds
dt = datetime.now()
timestamp_us = int(dt.timestamp() * 1_000_000)
```

```javascript theme={null}
// Convert from microseconds to JavaScript Date
const answeredAtUs = 1731628800000000;
const answeredAtDate = new Date(answeredAtUs / 1000); // Divide by 1000 for milliseconds
console.log(answeredAtDate.toISOString());

// Convert Date to microseconds
const now = new Date();
const timestampUs = now.getTime() * 1000;
```

### Error handling and retry logic

<AccordionGroup>
  <Accordion title="Implement exponential backoff">
    Retry failed requests with increasing delays:

    ```python theme={null}
    import time
    from typing import Optional

    def fetch_with_retry(page: int, max_retries: int = 3) -> Optional[dict]:
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    'https://api.wolfia.com/v1/insights/conversations',
                    headers={'X-API-Key': API_KEY},
                    params={'page': page, 'page_size': 100},
                    timeout=30
                )

                if response.status_code == 200:
                    return response.json()

                if response.status_code >= 500:
                    # Server error - retry
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt  # 1s, 2s, 4s
                        time.sleep(wait_time)
                        continue
                else:
                    # Client error - don't retry
                    print(f"Client error: {response.status_code} - {response.text}")
                    return None

            except requests.exceptions.Timeout:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    time.sleep(wait_time)
                    continue
            except Exception as e:
                print(f"Error: {str(e)}")
                return None

        return None
    ```
  </Accordion>

  <Accordion title="Log errors for debugging">
    Maintain detailed logs of API interactions:

    ```python theme={null}
    import logging

    logging.basicConfig(
        filename='wolfia_insights.log',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    def fetch_conversations_with_logging():
        logging.info("Starting conversation fetch")

        try:
            response = requests.get(
                'https://api.wolfia.com/v1/insights/conversations',
                headers={'X-API-Key': API_KEY},
                params={'page': 1, 'page_size': 100}
            )

            if response.status_code == 200:
                logging.info(f"Successfully fetched page 1")
                return response.json()
            else:
                logging.error(
                    f"API error: status={response.status_code}, "
                    f"response={response.text}"
                )

        except Exception as e:
            logging.exception(f"Exception during fetch: {str(e)}")

        return None
    ```
  </Accordion>
</AccordionGroup>

### Data privacy and compliance

When working with conversation data:

* **PII handling:** Conversation data contains personally identifiable information (user emails, names). Handle according to your privacy policies.
* **Data retention:** Implement appropriate data retention policies for exported conversation data.
* **Access control:** Restrict access to conversation insights to authorized personnel only.
* **Audit logging:** Log all API access and data exports for compliance auditing.

## Use cases

### Analytics and reporting

Build custom dashboards to track:

* Conversation volume trends over time
* Most active users and teams
* Source type distribution (web UI vs Slack vs agents)
* Peak usage hours and days

### Quality assurance

Monitor AI performance by:

* Reviewing conversation quality across different source types
* Identifying common user questions
* Analyzing response patterns
* Tracking user satisfaction

### Compliance and auditing

Export conversation history for:

* Regulatory compliance requirements
* Security audits
* Legal discovery
* Data governance

### Integration monitoring

Track integration health:

* Monitor Slack app usage
* Verify agent conversation flows
* Analyze cross-channel performance
* Identify integration issues

## Rate limits and quotas

The conversation insights API follows standard Wolfia API rate limits:

* **Default rate limit:** 100 requests per minute per API key
* **Burst allowance:** Short bursts above the limit are allowed
* **Quota reset:** Rate limit window resets every minute

If you hit rate limits:

* Implement exponential backoff
* Spread requests over time
* Cache results when appropriate
* Contact support for higher limits if needed

## Getting help

If you encounter issues with the conversation insights API:

* **Check authentication:** Verify your API key is active in [API settings](https://wolfia.com/settings/api)
* **Review pagination:** Ensure page and page\_size parameters are valid
* **Validate permissions:** Confirm you have admin role for API access
* **Check timestamps:** Verify you're handling microsecond timestamps correctly
* **Contact support:** Email [support@wolfia.com](mailto:support@wolfia.com) with your API key ID (never share the full key)

## Next steps

<CardGroup cols={2}>
  <Card title="API overview" icon="code" href="/how-to/api-overview">
    Learn about API authentication, rate limits, and best practices
  </Card>

  <Card title="Invite users" icon="user-plus" href="/how-to/api-invite-users">
    Automate user onboarding with programmatic invitations
  </Card>
</CardGroup>
