Skip to content

Publishing from Python

Send notifications to Notifer topics using Python with the requests library.

Quick Start

Using requests Library

The simplest way to publish messages:

import requests

response = requests.post(
    "https://app.notifer.io/my-topic",
    data="Your message here"
)

print(response.json())

Output:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "topic": "my-topic",
  "message": "Your message here",
  "priority": 3,
  "created_at": "2025-11-22T10:30:00Z"
}

Installation

Install requests

pip install requests

Or with virtual environment:

python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install requests

Basic Publishing

Simple Message

import requests

def send_notification(topic, message):
    """Send simple notification to topic"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message
    )
    response.raise_for_status()
    return response.json()

# Usage
result = send_notification("server-alerts", "Server is down!")
print(f"Message ID: {result['id']}")

With Title and Priority

import requests

def send_alert(topic, title, message, priority=3):
    """Send alert with title and priority"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Title": title,
            "X-Priority": str(priority)
        }
    )
    response.raise_for_status()
    return response.json()

# Usage
send_alert(
    topic="production-alerts",
    title="Database Error",
    message="Connection timeout on prod-db-01",
    priority=5
)

With Tags

import requests

def send_tagged_message(topic, message, tags, priority=3):
    """Send message with tags"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags)
        }
    )
    response.raise_for_status()
    return response.json()

# Usage
send_tagged_message(
    topic="monitoring",
    message="CPU usage at 95%",
    tags=["warning", "cpu", "prod-web-01"],
    priority=4
)

Advanced Features

Markdown Formatting

import requests

def send_markdown_message(topic, message, title=None, priority=3):
    """Send message with Markdown formatting"""
    headers = {
        "X-Markdown": "true",
        "X-Priority": str(priority)
    }

    if title:
        headers["X-Title"] = title

    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers=headers
    )
    response.raise_for_status()
    return response.json()

# Usage
markdown_message = """
## Deployment Summary

**Status:** ✅ Success
**Version:** v2.1.0
**Duration:** 3m 45s

### Changes
- Added user authentication
- Fixed payment bug
- Updated dependencies

[View Release Notes](https://github.com/example/repo/releases/v2.1.0)
"""

send_markdown_message(
    topic="deployments",
    message=markdown_message,
    title="Deploy v2.1.0",
    priority=3
)

Private Topics with Authentication

import requests

class NotiferClient:
    """Client for authenticated publishing"""

    def __init__(self, api_key=None, jwt_token=None):
        self.base_url = "https://app.notifer.io"
        self.headers = {}

        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"
        elif jwt_token:
            self.headers["Authorization"] = f"Bearer {jwt_token}"

    def publish(self, topic, message, title=None, priority=3, tags=None, markdown=False):
        """Publish message to private topic"""
        headers = self.headers.copy()
        headers["X-Priority"] = str(priority)

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        response = requests.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers
        )
        response.raise_for_status()
        return response.json()

# Usage
client = NotiferClient(api_key="your-api-key-here")

client.publish(
    topic="private-alerts",
    title="Critical Issue",
    message="Database connection lost",
    priority=5,
    tags=["critical", "database"]
)

Real-World Examples

Server Monitoring Script

import requests
import psutil
import time

NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 85

def send_alert(title, message, priority=4):
    """Send monitoring alert"""
    try:
        requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=message,
            headers={
                "X-Title": title,
                "X-Priority": str(priority),
                "X-Tags": "monitoring,server"
            },
            timeout=5
        )
    except requests.exceptions.RequestException as e:
        print(f"Failed to send alert: {e}")

def check_system():
    """Check system resources"""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent

    if cpu_percent > CPU_THRESHOLD:
        send_alert(
            title=f"High CPU Usage: {cpu_percent}%",
            message=f"CPU usage is at {cpu_percent}% (threshold: {CPU_THRESHOLD}%)",
            priority=4
        )

    if memory_percent > MEMORY_THRESHOLD:
        send_alert(
            title=f"High Memory Usage: {memory_percent}%",
            message=f"Memory usage is at {memory_percent}% (threshold: {MEMORY_THRESHOLD}%)",
            priority=4
        )

# Run monitoring loop
while True:
    check_system()
    time.sleep(60)  # Check every minute

Application Error Logging

import requests
import traceback
import sys

NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"

def log_error(error, context=None):
    """Log application error to Notifer"""
    error_type = type(error).__name__
    error_message = str(error)
    stack_trace = traceback.format_exc()

    message = f"""
## {error_type}

**Error:** {error_message}

**Context:** {context or 'None'}

**Stack Trace:**
```python
{stack_trace}
"""

try:
    requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=message,
        headers={
            "Authorization": f"Bearer {NOTIFER_API_KEY}",
            "X-Title": f"Error: {error_type}",
            "X-Priority": "5",
            "X-Tags": f"error,{error_type.lower()}",
            "X-Markdown": "true"
        },
        timeout=5
    )
except:
    # Don't let notification failure crash the app
    print(f"Failed to log error to Notifer", file=sys.stderr)

Usage in exception handler

try: # Your application code result = 1 / 0 except Exception as e: log_error(e, context="division operation") raise

### CI/CD Pipeline Integration

```python
import requests
import os
import sys
from datetime import datetime

class CIPipeline:
    """CI/CD pipeline notifier"""

    def __init__(self, topic="ci-pipeline"):
        self.topic = topic
        self.base_url = "https://app.notifer.io"
        self.build_start = datetime.now()

    def notify(self, title, message, priority=3, tags=None, markdown=True):
        """Send CI notification"""
        headers = {
            "X-Title": title,
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags or []),
            "X-Markdown": str(markdown).lower()
        }

        try:
            response = requests.post(
                f"{self.base_url}/{self.topic}",
                data=message,
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Warning: Failed to send notification: {e}", file=sys.stderr)

    def build_started(self, branch, commit):
        """Notify build started"""
        message = f"""
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Started:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.notify(
            title=f"Build Started: {branch}",
            message=message,
            priority=2,
            tags=["build", "started", branch]
        )

    def build_success(self, branch, commit, tests_passed):
        """Notify build succeeded"""
        duration = (datetime.now() - self.build_start).total_seconds()

        message = f"""
## ✅ Build Successful

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Tests:** {tests_passed} passed

[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Success: {branch}",
            message=message,
            priority=3,
            tags=["build", "success", branch]
        )

    def build_failed(self, branch, commit, error):
        """Notify build failed"""
        duration = (datetime.now() - self.build_start).total_seconds()

        message = f"""
## ❌ Build Failed

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s

**Error:**
{error}
[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Failed: {branch}",
            message=message,
            priority=5,
            tags=["build", "failed", branch]
        )

# Usage in CI script
if __name__ == "__main__":
    pipeline = CIPipeline()

    branch = os.environ.get("GIT_BRANCH", "unknown")
    commit = os.environ.get("GIT_COMMIT", "unknown")

    pipeline.build_started(branch, commit)

    try:
        # Run tests, build, etc.
        # ... your CI steps ...

        pipeline.build_success(branch, commit, tests_passed=42)
        sys.exit(0)
    except Exception as e:
        pipeline.build_failed(branch, commit, str(e))
        sys.exit(1)

Backup Status Notifications

import requests
import subprocess
import os
from datetime import datetime

BACKUP_TOPIC = "database-backups"

def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
    """Notify backup completion status"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    if success:
        message = f"""
## ✅ Backup Completed Successfully

**Type:** Full database backup
**Size:** {size_mb:.2f} MB
**Duration:** {duration_seconds:.1f}s
**Location:** `s3://backups/db-{datetime.now().strftime('%Y-%m-%d')}.sql.gz`
**Completed:** {timestamp}

**Details:**
- Compression: gzip
- Verification: Passed ✅
"""
        priority = 2
        tags = ["backup", "success"]
    else:
        message = f"""
## ❌ Backup Failed

**Error:** {error}
**Time:** {timestamp}

**Action Required:**
1. Check backup logs
2. Verify disk space
3. Retry backup manually
"""
        priority = 5
        tags = ["backup", "failed", "critical"]

    requests.post(
        f"https://app.notifer.io/{BACKUP_TOPIC}",
        data=message,
        headers={
            "X-Title": "Backup Completed" if success else "Backup Failed",
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags),
            "X-Markdown": "true"
        }
    )

# Usage in backup script
def run_backup():
    """Run database backup"""
    start_time = datetime.now()

    try:
        # Run backup command
        result = subprocess.run(
            ["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
            capture_output=True,
            check=True
        )

        # Calculate metrics
        duration = (datetime.now() - start_time).total_seconds()
        size_mb = len(result.stdout) / 1024 / 1024

        notify_backup_status(
            success=True,
            size_mb=size_mb,
            duration_seconds=duration
        )

    except subprocess.CalledProcessError as e:
        notify_backup_status(
            success=False,
            error=str(e)
        )
        raise

run_backup()

Scheduled Report Generation

import requests
from datetime import datetime, timedelta

def generate_daily_report():
    """Generate and send daily statistics report"""

    # Fetch your application metrics
    # This is example data - replace with your actual metrics
    metrics = {
        "users_today": 1234,
        "users_change": "+5.2%",
        "orders_today": 456,
        "orders_change": "+12.1%",
        "revenue_today": 12345.67,
        "revenue_change": "+8.7%",
        "errors_today": 23,
        "errors_change": "-15.3%"
    }

    date_str = datetime.now().strftime('%Y-%m-%d')

    message = f"""
## 📊 Daily Report - {date_str}

### Application Stats

| Metric | Value | Change |
|--------|-------|--------|
| Users | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Orders | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Revenue | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Errors | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |

---

### Top Performing Pages
1. Homepage - 5,432 views
2. Products - 3,210 views
3. Checkout - 1,890 views

### Issues
- ⚠️ Slow API response on checkout (avg 2.1s)
- ✅ Database backup completed
- ✅ SSL certificate renewed

[View Full Dashboard](https://analytics.example.com)
"""

    requests.post(
        "https://app.notifer.io/daily-reports",
        data=message,
        headers={
            "X-Title": f"Daily Report - {date_str}",
            "X-Priority": "2",
            "X-Tags": "report,daily,analytics",
            "X-Markdown": "true"
        }
    )

# Run as cron job: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
    generate_daily_report()

Error Handling

Basic Error Handling

import requests

def safe_publish(topic, message, **kwargs):
    """Publish with error handling"""
    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout:
        print("Error: Request timeout")
        return None

    except requests.exceptions.ConnectionError:
        print("Error: Connection failed")
        return None

    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
        return None

    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

Retry with Exponential Backoff

import requests
import time

def publish_with_retry(topic, message, max_retries=3, **kwargs):
    """Publish with exponential backoff retry"""

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=5
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                print(f"Failed after {max_retries} attempts: {e}")
                raise

            # Calculate backoff delay: 2^attempt seconds
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

Best Practices

1. Use Environment Variables

import os
import requests

NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")

def publish(message, **kwargs):
    """Publish with environment-based config"""
    headers = kwargs.get("headers", {})

    if NOTIFER_API_KEY:
        headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

    response = requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=message,
        headers=headers
    )
    response.raise_for_status()
    return response.json()

Environment file (.env):

NOTIFER_TOPIC=production-alerts
NOTIFER_API_KEY=your-api-key-here

2. Create Reusable Client Class

import requests
from typing import Optional, List

class Notifer:
    """Reusable Notifer client"""

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()

        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def publish(
        self,
        topic: str,
        message: str,
        title: Optional[str] = None,
        priority: int = 3,
        tags: Optional[List[str]] = None,
        markdown: bool = False
    ) -> dict:
        """Publish message to topic"""

        headers = {
            "X-Priority": str(priority)
        }

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        response = self.session.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers,
            timeout=5
        )
        response.raise_for_status()
        return response.json()

# Usage
client = Notifer(api_key="your-key")

client.publish(
    topic="alerts",
    title="Test Alert",
    message="This is a test",
    priority=4,
    tags=["test", "demo"]
)

3. Logging Integration

import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def publish(topic, message, **kwargs):
    """Publish with logging"""
    logger.info(f"Publishing to topic '{topic}'")

    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()

        result = response.json()
        logger.info(f"Published successfully: {result['id']}")
        return result

    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to publish: {e}")
        raise

4. Async Publishing (asyncio)

import aiohttp
import asyncio

async def publish_async(topic, message, **kwargs):
    """Async publish with aiohttp"""

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()

        except aiohttp.ClientError as e:
            print(f"Error: {e}")
            return None

# Usage
async def main():
    result = await publish_async(
        topic="async-topic",
        message="Async message"
    )
    print(result)

asyncio.run(main())

Testing

Unit Tests

import unittest
from unittest.mock import patch, Mock
import requests

class TestNotiferPublishing(unittest.TestCase):
    """Unit tests for Notifer publishing"""

    @patch('requests.post')
    def test_publish_success(self, mock_post):
        """Test successful publish"""

        # Mock response
        mock_response = Mock()
        mock_response.json.return_value = {
            "id": "test-id",
            "topic": "test-topic",
            "message": "Test message"
        }
        mock_post.return_value = mock_response

        # Call function
        result = publish("test-topic", "Test message")

        # Assertions
        self.assertEqual(result["id"], "test-id")
        mock_post.assert_called_once()

    @patch('requests.post')
    def test_publish_failure(self, mock_post):
        """Test publish failure"""

        # Mock error
        mock_post.side_effect = requests.exceptions.ConnectionError()

        # Call should handle error
        result = safe_publish("test-topic", "Test message")

        # Assertions
        self.assertIsNone(result)

if __name__ == "__main__":
    unittest.main()

Next Steps


Pro Tip: Use environment variables for API keys and create a reusable client class for cleaner code! 🐍