Skip to content

Publishing from Python

Send notifications to Notifer topics using Python with the requests library or the official SDK (coming soon).

Quick Start

Using requests Library

The simplest way to publish messages:

import requests

# Always pass a timeout: without one, requests can wait forever on a server
# that never answers.
response = requests.post(
    "https://app.notifer.io/my-topic",
    data="Your message here",
    timeout=10,
)
# Fail loudly on a 4xx/5xx reply instead of parsing an error body as JSON.
response.raise_for_status()

print(response.json())

Output:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "topic": "my-topic",
  "message": "Your message here",
  "priority": 3,
  "created_at": "2025-11-22T10:30:00Z"
}

Installation

Install requests

pip install requests

Or inside a virtual environment:

python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install requests

Official SDK (Coming Soon)

pip install notifer

The official SDK will simplify authentication, retries, and error handling.

Basic Publishing

Simple Message

import requests

def send_notification(topic, message, timeout=10):
    """Send a simple notification to a topic.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Notification body (``str`` or ``bytes``).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response.
    """
    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        timeout=timeout,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
result = send_notification("server-alerts", "Server is down!")
print(f"Message ID: {result['id']}")

With Title and Priority

import requests

def send_alert(topic, title, message, priority=3, timeout=10):
    """Send an alert with a title and a priority.

    Args:
        topic: Topic name; it is used as the URL path.
        title: Sent in the ``X-Title`` header.
        message: Notification body (``str`` or ``bytes``).
        priority: Sent (as text) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response.
    """
    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers={
            "X-Title": title,
            "X-Priority": str(priority),
        },
        timeout=timeout,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
send_alert(
    topic="production-alerts",
    title="Database Error",
    message="Connection timeout on prod-db-01",
    priority=5
)

With Tags

import requests

def send_tagged_message(topic, message, tags, priority=3, timeout=10):
    """Send a message with tags.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Notification body (``str`` or ``bytes``).
        tags: Iterable of tag strings; joined with commas into ``X-Tags``.
        priority: Sent (as text) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response.
    """
    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers={
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags),
        },
        timeout=timeout,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
send_tagged_message(
    topic="monitoring",
    message="CPU usage at 95%",
    tags=["warning", "cpu", "prod-web-01"],
    priority=4
)

Advanced Features

Markdown Formatting

import requests

def send_markdown_message(topic, message, title=None, priority=3, timeout=10):
    """Send a message flagged as Markdown.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Markdown body (``str`` or ``bytes``).
        title: Optional; sent in ``X-Title`` when given.
        priority: Sent (as text) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response.
    """
    headers = {
        "X-Markdown": "true",
        "X-Priority": str(priority)
    }

    if title:
        headers["X-Title"] = title

    # Markdown bodies often contain emoji. Encode explicitly, because some
    # requests/urllib3 versions encode a str body as Latin-1 and raise
    # UnicodeEncodeError on such characters.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers=headers,
        timeout=timeout,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
markdown_message = """
## Deployment Summary

**Status:** ✅ Success
**Version:** v2.1.0
**Duration:** 3m 45s

### Changes
- Added user authentication
- Fixed payment bug
- Updated dependencies

[View Release Notes](https://github.com/example/repo/releases/v2.1.0)
"""

send_markdown_message(
    topic="deployments",
    message=markdown_message,
    title="Deploy v2.1.0",
    priority=3
)

Private Topics with Authentication

import requests

class NotiferClient:
    """Client for authenticated publishing.

    Stores the service base URL and an ``Authorization`` header that is
    added to every request made through :meth:`publish`.
    """

    def __init__(self, api_key=None, jwt_token=None, timeout=10):
        """Create a client.

        Args:
            api_key: Credential sent as a Bearer token. Used in preference
                to ``jwt_token`` when both are given.
            jwt_token: Credential sent as a Bearer token when no API key
                is given.
            timeout: Seconds to wait for the server on each publish.
        """
        self.base_url = "https://app.notifer.io"
        self.timeout = timeout
        self.headers = {}

        # Both credential kinds use the same Bearer scheme, so one branch
        # is enough; api_key keeps its precedence.
        token = api_key or jwt_token
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def publish(self, topic, message, title=None, priority=3, tags=None, markdown=False):
        """Publish a message to a private topic.

        Args:
            topic: Topic name; appended to the base URL.
            message: Notification body (``str`` or ``bytes``).
            title: Optional; sent in ``X-Title`` when given.
            priority: Sent (as text) in ``X-Priority``.
            tags: Optional iterable of tag strings; joined with commas.
            markdown: When true, ``X-Markdown: true`` is sent.

        Returns:
            The response body decoded from JSON.

        Raises:
            requests.exceptions.RequestException: On connection errors,
                timeouts, or a 4xx/5xx response.
        """
        # Work on a copy so per-call headers never leak into the client.
        headers = self.headers.copy()
        headers["X-Priority"] = str(priority)

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        # Encode explicitly: some requests/urllib3 versions encode a str
        # body as Latin-1 and fail on characters such as emoji.
        body = message.encode("utf-8") if isinstance(message, str) else message
        response = requests.post(
            f"{self.base_url}/{topic}",
            data=body,
            headers=headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

# Usage: create the client once, then publish with keyword arguments.
client = NotiferClient(api_key="your-api-key-here")

critical_alert = {
    "topic": "private-alerts",
    "title": "Critical Issue",
    "message": "Database connection lost",
    "priority": 5,
    "tags": ["critical", "database"],
}
client.publish(**critical_alert)

Real-World Examples

Server Monitoring Script

import requests
import psutil
import time

NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80  # percent
MEMORY_THRESHOLD = 85  # percent


def send_alert(title, message, priority=4):
    """Send a monitoring alert (best effort).

    Delivery problems are printed and never raised, so a notification
    outage cannot stop the monitoring loop.

    Args:
        title: Sent in the ``X-Title`` header.
        message: Alert body (``str`` or ``bytes``).
        priority: Sent (as text) in the ``X-Priority`` header.
    """
    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on non-Latin-1 characters.
    body = message.encode("utf-8") if isinstance(message, str) else message
    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=body,
            headers={
                "X-Title": title,
                "X-Priority": str(priority),
                "X-Tags": "monitoring,server"
            },
            timeout=5
        )
        # Without this check a 4xx/5xx reply would be dropped silently and
        # a rejected alert would look like a delivered one.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send alert: {e}")

def check_system():
    """Sample CPU and memory usage and alert on each reading above its threshold."""
    cpu_now = psutil.cpu_percent(interval=1)
    mem_now = psutil.virtual_memory().percent

    # (label, current reading, limit): one row per monitored resource.
    readings = (
        ("CPU", cpu_now, CPU_THRESHOLD),
        ("Memory", mem_now, MEMORY_THRESHOLD),
    )

    for label, value, limit in readings:
        if value > limit:
            send_alert(
                title=f"High {label} Usage: {value}%",
                message=f"{label} usage is at {value}% (threshold: {limit}%)",
                priority=4,
            )

# Run monitoring loop (only when executed as a script, so importing this
# module for its functions does not block forever)
if __name__ == "__main__":
    try:
        while True:
            check_system()
            time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the monitor; exit without a traceback.
        pass

Application Error Logging

import requests
import traceback
import sys

NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"


def log_error(error, context=None):
    """Log an application error to Notifer (best effort).

    Builds a Markdown report (type, message, context, stack trace) and
    posts it to ``NOTIFER_TOPIC``. A failure to deliver the report is
    written to stderr and never raised.

    Args:
        error: The exception instance to report.
        context: Optional description of what the application was doing.
    """
    error_type = type(error).__name__
    error_message = str(error)
    # Format from the exception object itself, so the trace is correct even
    # when this function is called outside the ``except`` block.
    stack_trace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )

    message = f"""
## {error_type}

**Error:** {error_message}

**Context:** {context or 'None'}

**Stack Trace:**
```python
{stack_trace}
```
"""

    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            # Encode explicitly: some requests/urllib3 versions encode a str
            # body as Latin-1 and fail on non-Latin-1 characters.
            data=message.encode("utf-8"),
            headers={
                "Authorization": f"Bearer {NOTIFER_API_KEY}",
                "X-Title": f"Error: {error_type}",
                "X-Priority": "5",
                "X-Tags": f"error,{error_type.lower()}",
                "X-Markdown": "true"
            },
            timeout=5
        )
        response.raise_for_status()
    except Exception as exc:
        # Don't let notification failure crash the app. ``Exception`` (not a
        # bare ``except``) still lets KeyboardInterrupt/SystemExit through.
        print(f"Failed to log error to Notifer: {exc}", file=sys.stderr)


# Usage in exception handler
try:
    # Your application code
    result = 1 / 0
except Exception as e:
    log_error(e, context="division operation")
    raise

CI/CD Pipeline Integration

import requests
import os
import sys
from datetime import datetime

class CIPipeline:
    """CI/CD pipeline notifier.

    Sends build lifecycle notifications (started / success / failed) to a
    single topic. Build duration is measured from the moment the instance
    is created.
    """

    def __init__(self, topic="ci-pipeline"):
        """Remember the target topic and record the build start time."""
        self.topic = topic
        self.base_url = "https://app.notifer.io"
        self.build_start = datetime.now()

    def notify(self, title, message, priority=3, tags=None, markdown=True):
        """Send a CI notification (best effort).

        A delivery failure is reported on stderr and never raised, so a
        notification outage cannot fail the build.

        Args:
            title: Sent in the ``X-Title`` header.
            message: Notification body (``str`` or ``bytes``).
            priority: Sent (as text) in ``X-Priority``.
            tags: Optional list of tag strings; joined with commas.
            markdown: Sent as ``"true"``/``"false"`` in ``X-Markdown``.
        """
        headers = {
            "X-Title": title,
            "X-Priority": str(priority),
            "X-Markdown": str(markdown).lower()
        }
        # Only send X-Tags when there is something to send, rather than an
        # empty header value.
        if tags:
            headers["X-Tags"] = ",".join(tags)

        # The build messages contain emoji. Encode explicitly, because some
        # requests/urllib3 versions encode a str body as Latin-1 and raise
        # UnicodeEncodeError on such characters.
        body = message.encode("utf-8") if isinstance(message, str) else message

        try:
            response = requests.post(
                f"{self.base_url}/{self.topic}",
                data=body,
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Warning: Failed to send notification: {e}", file=sys.stderr)

    def build_started(self, branch, commit):
        """Notify that a build started for ``branch`` at ``commit``."""
        message = f"""
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Started:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.notify(
            title=f"Build Started: {branch}",
            message=message,
            priority=2,
            tags=["build", "started", branch]
        )

    def build_success(self, branch, commit, tests_passed):
        """Notify that the build succeeded, with duration and test count."""
        duration = (datetime.now() - self.build_start).total_seconds()

        message = f"""
## ✅ Build Successful

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Tests:** {tests_passed} passed

[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Success: {branch}",
            message=message,
            priority=3,
            tags=["build", "success", branch]
        )

    def build_failed(self, branch, commit, error):
        """Notify that the build failed, including the error text."""
        duration = (datetime.now() - self.build_start).total_seconds()

        # The error goes in a fenced block followed by a blank line, so it is
        # shown verbatim and does not run into the link paragraph.
        message = f"""
## ❌ Build Failed

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s

**Error:**
```
{error}
```

[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Failed: {branch}",
            message=message,
            priority=5,
            tags=["build", "failed", branch]
        )

# Usage in CI script
if __name__ == "__main__":
    pipeline = CIPipeline()

    # Fall back to "unknown" when the CI system does not export these.
    branch = os.getenv("GIT_BRANCH", "unknown")
    commit = os.getenv("GIT_COMMIT", "unknown")

    pipeline.build_started(branch, commit)

    try:
        # Run tests, build, etc.
        # ... your CI steps ...

        pipeline.build_success(branch, commit, tests_passed=42)
    except Exception as failure:
        pipeline.build_failed(branch, commit, str(failure))
        exit_code = 1
    else:
        exit_code = 0

    # The process exit status tells the CI runner whether the build passed.
    sys.exit(exit_code)

Backup Status Notifications

import requests
import subprocess
import os
from datetime import datetime

BACKUP_TOPIC = "database-backups"


def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
    """Notify backup completion status (best effort).

    A delivery failure is reported on stderr and never raised, so a
    notification outage cannot mask the real backup outcome.

    Args:
        success: True for a completed backup, False for a failed one.
        size_mb: Backup size in megabytes; shown as "unknown" when omitted.
        duration_seconds: Backup duration; shown as "unknown" when omitted.
        error: Error description included in the failure message.
    """
    import sys  # local import: this snippet's import block has no ``sys``

    # One clock reading, so the timestamp and the dated file name cannot
    # disagree when the call straddles midnight.
    now = datetime.now()
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

    if success:
        # The metrics default to None; formatting None with ``:.2f`` raises.
        size_text = "unknown" if size_mb is None else f"{size_mb:.2f} MB"
        duration_text = (
            "unknown" if duration_seconds is None else f"{duration_seconds:.1f}s"
        )
        message = f"""
## ✅ Backup Completed Successfully

**Type:** Full database backup
**Size:** {size_text}
**Duration:** {duration_text}
**Location:** `s3://backups/db-{now:%Y-%m-%d}.sql.gz`
**Completed:** {timestamp}

**Details:**
- Compression: gzip
- Verification: Passed ✅
"""
        priority = 2
        tags = ["backup", "success"]
    else:
        message = f"""
## ❌ Backup Failed

**Error:** {error}
**Time:** {timestamp}

**Action Required:**
1. Check backup logs
2. Verify disk space
3. Retry backup manually
"""
        priority = 5
        tags = ["backup", "failed", "critical"]

    try:
        response = requests.post(
            f"https://app.notifer.io/{BACKUP_TOPIC}",
            # The messages contain emoji. Encode explicitly, because some
            # requests/urllib3 versions encode a str body as Latin-1.
            data=message.encode("utf-8"),
            headers={
                "X-Title": "Backup Completed" if success else "Backup Failed",
                "X-Priority": str(priority),
                "X-Tags": ",".join(tags),
                "X-Markdown": "true"
            },
            timeout=10,  # never block the backup job on an unresponsive server
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Warning: Failed to send backup notification: {e}", file=sys.stderr)

# Usage in backup script
def run_backup():
    """Run the database dump and report the outcome.

    Raises:
        subprocess.CalledProcessError: ``pg_dump`` exited with a non-zero
            status (re-raised after the failure notification).
        OSError: ``pg_dump`` could not be started, e.g. it is not installed
            (re-raised after the failure notification).
    """
    start_time = datetime.now()

    try:
        # Run backup command
        # NOTE(review): the dump is only captured in memory here; nothing in
        # this snippet writes or uploads it. Confirm where it should go.
        result = subprocess.run(
            ["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
            capture_output=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        # str(e) carries only the command and exit status; pg_dump's own
        # explanation is on stderr, so include it in the notification.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        detail = f"{e}\n{stderr_text}" if stderr_text else str(e)
        notify_backup_status(
            success=False,
            error=detail
        )
        raise
    except OSError as e:
        # A missing or non-executable pg_dump is a backup failure too.
        notify_backup_status(
            success=False,
            error=str(e)
        )
        raise

    # Calculate metrics
    duration = (datetime.now() - start_time).total_seconds()
    size_mb = len(result.stdout) / 1024 / 1024

    notify_backup_status(
        success=True,
        size_mb=size_mb,
        duration_seconds=duration
    )


run_backup()

Scheduled Report Generation

import requests
from datetime import datetime, timedelta

def generate_daily_report():
    """Generate and send the daily statistics report.

    Builds a Markdown report from the metrics below and posts it to the
    ``daily-reports`` topic.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response, so a scheduled run fails
            visibly instead of silently sending nothing.
    """

    # Fetch your application metrics
    # This is example data - replace with your actual metrics
    metrics = {
        "users_today": 1234,
        "users_change": "+5.2%",
        "orders_today": 456,
        "orders_change": "+12.1%",
        "revenue_today": 12345.67,
        "revenue_change": "+8.7%",
        "errors_today": 23,
        "errors_change": "-15.3%"
    }

    date_str = datetime.now().strftime('%Y-%m-%d')

    message = f"""
## 📊 Daily Report - {date_str}

### Application Stats

| Metric | Value | Change |
|--------|-------|--------|
| Users | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Orders | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Revenue | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Errors | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |

---

### Top Performing Pages
1. Homepage - 5,432 views
2. Products - 3,210 views
3. Checkout - 1,890 views

### Issues
- ⚠️ Slow API response on checkout (avg 2.1s)
- ✅ Database backup completed
- ✅ SSL certificate renewed

[View Full Dashboard](https://analytics.example.com)
"""

    response = requests.post(
        "https://app.notifer.io/daily-reports",
        # The report is full of emoji and arrows. Encode explicitly, because
        # some requests/urllib3 versions encode a str body as Latin-1 and
        # raise UnicodeEncodeError on such characters.
        data=message.encode("utf-8"),
        headers={
            "X-Title": f"Daily Report - {date_str}",
            "X-Priority": "2",
            "X-Tags": "report,daily,analytics",
            "X-Markdown": "true"
        },
        timeout=10,  # a cron job must not hang on an unresponsive server
    )
    response.raise_for_status()


# Run as cron job: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
    generate_daily_report()

Error Handling

Basic Error Handling

import requests

def safe_publish(topic, message, **kwargs):
    """Publish a message; print the problem and return None on any failure.

    Only the ``headers`` keyword argument is read from ``kwargs``.
    """
    try:
        reply = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5,
        )
        reply.raise_for_status()
        payload = reply.json()
    # Handler order matters: the specific request failures are tried before
    # the catch-all.
    except requests.exceptions.Timeout:
        problem = "Error: Request timeout"
    except requests.exceptions.ConnectionError:
        problem = "Error: Connection failed"
    except requests.exceptions.HTTPError as http_err:
        problem = f"HTTP Error: {http_err.response.status_code} - {http_err.response.text}"
    except Exception as other:
        problem = f"Unexpected error: {other}"
    else:
        return payload

    print(problem)
    return None

Retry with Exponential Backoff

import requests
import time

def publish_with_retry(topic, message, max_retries=3, **kwargs):
    """Publish with exponential backoff retry.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Notification body (``str`` or ``bytes``).
        max_retries: Total number of attempts; must be at least 1.
        **kwargs: Only ``headers`` is read.

    Returns:
        The response body decoded from JSON.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        requests.exceptions.RequestException: The error of the last attempt,
            or of the first attempt that failed with a non-retryable status.
    """
    # With zero attempts the loop below would never run and the function
    # would return None as if nothing had gone wrong.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"https://app.notifer.io/{topic}",
                data=body,
                headers=kwargs.get("headers", {}),
                timeout=5
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            # A 4xx reply (other than 429 Too Many Requests) means the request
            # itself is wrong; sending it again cannot succeed.
            status = getattr(getattr(e, "response", None), "status_code", None)
            permanent = status is not None and 400 <= status < 500 and status != 429

            if permanent or attempt == max_retries - 1:
                # Give up: report how many attempts were actually made
                print(f"Failed after {attempt + 1} attempts: {e}")
                raise

            # Calculate backoff delay: 2^attempt seconds
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

Best Practices

1. Use Environment Variables

import os
import requests

NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")


def publish(message, **kwargs):
    """Publish with environment-based config.

    The topic and the optional API key come from the ``NOTIFER_TOPIC`` and
    ``NOTIFER_API_KEY`` environment variables read at import time.

    Args:
        message: Notification body (``str`` or ``bytes``).
        **kwargs: ``headers`` (extra request headers) and ``timeout``
            (seconds, default 10) are read.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: On connection errors,
            timeouts, or a 4xx/5xx response.
    """
    # Copy before adding Authorization: writing into the caller's dict would
    # leak the API key into an object the caller still owns and may reuse.
    headers = dict(kwargs.get("headers") or {})

    if NOTIFER_API_KEY:
        headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=body,
        headers=headers,
        timeout=kwargs.get("timeout", 10),
    )
    response.raise_for_status()
    return response.json()

Environment file (.env):

NOTIFER_TOPIC=production-alerts
NOTIFER_API_KEY=your-api-key-here

2. Create Reusable Client Class

import requests
from typing import Optional, List

class Notifer:
    """Reusable Notifer client.

    Wraps a ``requests.Session`` so repeated publishes share one session
    and, when an API key is given, one ``Authorization`` header. The client
    can be used as a context manager to close the session when done.
    """

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
        """Create the session and attach the Bearer token if a key is given."""
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()

        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def __enter__(self) -> "Notifer":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying session."""
        self.session.close()

    def publish(
        self,
        topic: str,
        message: str,
        title: Optional[str] = None,
        priority: int = 3,
        tags: Optional[List[str]] = None,
        markdown: bool = False
    ) -> dict:
        """Publish a message to a topic.

        Args:
            topic: Topic name; appended to the base URL.
            message: Notification body.
            title: Optional; sent in ``X-Title`` when given.
            priority: Sent (as text) in ``X-Priority``.
            tags: Optional tag strings; joined with commas into ``X-Tags``.
            markdown: When true, ``X-Markdown: true`` is sent.

        Returns:
            The response body decoded from JSON.

        Raises:
            requests.exceptions.RequestException: On connection errors,
                timeouts, or a 4xx/5xx response.
        """

        headers = {
            "X-Priority": str(priority)
        }

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        # Encode explicitly: some requests/urllib3 versions encode a str
        # body as Latin-1 and fail on characters such as emoji.
        body = message.encode("utf-8") if isinstance(message, str) else message
        response = self.session.post(
            f"{self.base_url}/{topic}",
            data=body,
            headers=headers,
            timeout=5
        )
        response.raise_for_status()
        return response.json()


# Usage
client = Notifer(api_key="your-key")

client.publish(
    topic="alerts",
    title="Test Alert",
    message="This is a test",
    priority=4,
    tags=["test", "demo"]
)

3. Logging Integration

import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def publish(topic, message, **kwargs):
    """Publish with logging.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Notification body (``str`` or ``bytes``).
        **kwargs: Only ``headers`` is read.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: Logged, then re-raised.
    """
    # %-style arguments let logging skip the string formatting entirely
    # when the level is disabled.
    logger.info("Publishing to topic '%s'", topic)

    # Encode explicitly: some requests/urllib3 versions encode a str body as
    # Latin-1 and fail on characters such as emoji.
    body = message.encode("utf-8") if isinstance(message, str) else message

    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=body,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()

        result = response.json()
        logger.info("Published successfully: %s", result['id'])
        return result

    except requests.exceptions.RequestException as e:
        logger.error("Failed to publish: %s", e)
        raise

4. Async Publishing (asyncio)

import aiohttp
import asyncio

async def publish_async(topic, message, **kwargs):
    """Async publish with aiohttp.

    Args:
        topic: Topic name; it is used as the URL path.
        message: Notification body.
        **kwargs: Only ``headers`` is read.

    Returns:
        The response body decoded from JSON, or None when the request
        failed or timed out (the problem is printed).
    """

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()

        except asyncio.TimeoutError:
            # The total timeout is raised as asyncio.TimeoutError, which is
            # not an aiohttp.ClientError and would otherwise escape.
            print("Error: Request timeout")
            return None

        except aiohttp.ClientError as e:
            print(f"Error: {e}")
            return None


# Usage
async def main():
    result = await publish_async(
        topic="async-topic",
        message="Async message"
    )
    print(result)

asyncio.run(main())

Testing

Unit Tests

import unittest
from unittest.mock import patch, Mock
import requests

class TestNotiferPublishing(unittest.TestCase):
    """Unit tests for Notifer publishing.

    ``publish`` and ``safe_publish`` (the functions under test) must be
    importable into this module's namespace.
    """

    @patch('requests.post')
    def test_publish_success(self, mock_post):
        """A mocked reply is handed back to the caller as parsed JSON."""
        expected_payload = {
            "id": "test-id",
            "topic": "test-topic",
            "message": "Test message"
        }
        # Configure the fake response in one step: .json() yields the payload.
        mock_post.return_value = Mock(**{"json.return_value": expected_payload})

        outcome = publish("test-topic", "Test message")

        self.assertEqual(outcome["id"], "test-id")
        mock_post.assert_called_once()

    @patch('requests.post')
    def test_publish_failure(self, mock_post):
        """A connection error is absorbed and reported as None."""
        mock_post.side_effect = requests.exceptions.ConnectionError()

        outcome = safe_publish("test-topic", "Test message")

        self.assertIsNone(outcome)


if __name__ == "__main__":
    unittest.main()

Next Steps


Pro Tip: Use environment variables for API keys and create a reusable client class for cleaner code! 🐍