Skip to main content

Publishing from Python

Send notifications to Notifer topics using Python with the requests library.

Quick Start

Using requests Library

The simplest way to publish messages:

import requests

response = requests.post(
"https://app.notifer.io/my-topic",
data="Your message here"
)

print(response.json())

Output:

{
"id": "550e8400-e29b-41d4-a716-446655440000",
"topic": "my-topic",
"message": "Your message here",
"title": null,
"timestamp": "2025-11-22T10:30:00Z",
"priority": 3,
"tags": []
}

Installation

Install requests

pip install requests

Or with virtual environment:

python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install requests

Basic Publishing

Simple Message

import requests

def send_notification(topic, message):
"""Send simple notification to topic"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message
)
response.raise_for_status()
return response.json()

# Usage
result = send_notification("server-alerts", "Server is down!")
print(f"Message ID: {result['id']}")

With Title and Priority

import requests

def send_alert(topic, title, message, priority=3):
"""Send alert with title and priority"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers={
"X-Title": title,
"X-Priority": str(priority)
}
)
response.raise_for_status()
return response.json()

# Usage
send_alert(
topic="production-alerts",
title="Database Error",
message="Connection timeout on prod-db-01",
priority=1
)

With Tags

import requests

def send_tagged_message(topic, message, tags, priority=3):
"""Send message with tags"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers={
"X-Priority": str(priority),
"X-Tags": ",".join(tags)
}
)
response.raise_for_status()
return response.json()

# Usage
send_tagged_message(
topic="monitoring",
message="CPU usage at 95%",
tags=["warning", "cpu", "prod-web-01"],
priority=2
)

Advanced Features

Markdown Formatting

import requests

def send_markdown_message(topic, message, title=None, priority=3):
"""Send message with Markdown formatting"""
headers = {
"X-Priority": str(priority)
}

if title:
headers["X-Title"] = title

response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

# Usage
markdown_message = """
## Deployment Summary

**Status:** ✅ Success
**Version:** v2.1.0
**Duration:** 3m 45s

### Changes
- Added user authentication
- Fixed payment bug
- Updated dependencies

[View Release Notes](https://github.com/example/repo/releases/v2.1.0)
"""

send_markdown_message(
topic="deployments",
message=markdown_message,
title="Deploy v2.1.0",
priority=3
)

Private Topics with Authentication

import requests

class NotiferClient:
"""Client for authenticated publishing"""

def __init__(self, api_key=None, jwt_token=None):
self.base_url = "https://app.notifer.io"
self.headers = {}

if api_key:
self.headers["Authorization"] = f"Bearer {api_key}"
elif jwt_token:
self.headers["Authorization"] = f"Bearer {jwt_token}"

def publish(self, topic, message, title=None, priority=3, tags=None):
"""Publish message to private topic"""
headers = self.headers.copy()
headers["X-Priority"] = str(priority)

if title:
headers["X-Title"] = title

if tags:
headers["X-Tags"] = ",".join(tags)

response = requests.post(
f"{self.base_url}/{topic}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

# Usage
client = NotiferClient(api_key="your-api-key-here")

client.publish(
topic="private-alerts",
title="Critical Issue",
message="Database connection lost",
priority=1,
tags=["critical", "database"]
)

Real-World Examples

Server Monitoring Script

import requests
import psutil
import time

NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 85

def send_alert(title, message, priority=2):
"""Send monitoring alert"""
try:
requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers={
"X-Title": title,
"X-Priority": str(priority),
"X-Tags": "monitoring,server"
},
timeout=5
)
except requests.exceptions.RequestException as e:
print(f"Failed to send alert: {e}")

def check_system():
"""Check system resources"""
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent

if cpu_percent > CPU_THRESHOLD:
send_alert(
title=f"High CPU Usage: {cpu_percent}%",
message=f"CPU usage is at {cpu_percent}% (threshold: {CPU_THRESHOLD}%)",
priority=2
)

if memory_percent > MEMORY_THRESHOLD:
send_alert(
title=f"High Memory Usage: {memory_percent}%",
message=f"Memory usage is at {memory_percent}% (threshold: {MEMORY_THRESHOLD}%)",
priority=2
)

# Run monitoring loop
while True:
check_system()
time.sleep(60) # Check every minute

Application Error Logging

import requests
import traceback
import sys

NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"

def log_error(error, context=None):
"""Log application error to Notifer"""
error_type = type(error).__name__
error_message = str(error)
stack_trace = traceback.format_exc()

message = f"""
## {error_type}

**Error:** {error_message}

**Context:** {context or 'None'}

**Stack Trace:**
```
{stack_trace}
```
"""

try:
requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers={
"Authorization": f"Bearer {NOTIFER_API_KEY}",
"X-Title": f"Error: {error_type}",
"X-Priority": "1",
"X-Tags": f"error,{error_type.lower()}"
},
timeout=5
)
except:
# Don't let notification failure crash the app
print(f"Failed to log error to Notifer", file=sys.stderr)

# Usage in exception handler
try:
# Your application code
result = 1 / 0
except Exception as e:
log_error(e, context="division operation")
raise

### CI/CD Pipeline Integration

```python
import requests
import os
import sys
from datetime import datetime

class CIPipeline:
"""CI/CD pipeline notifier"""

def __init__(self, topic="ci-pipeline"):
self.topic = topic
self.base_url = "https://app.notifer.io"
self.build_start = datetime.now()

def notify(self, title, message, priority=3, tags=None):
"""Send CI notification"""
headers = {
"X-Title": title,
"X-Priority": str(priority),
"X-Tags": ",".join(tags or [])
}

try:
response = requests.post(
f"{self.base_url}/{self.topic}",
data=message,
headers=headers,
timeout=10
)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Warning: Failed to send notification: {e}", file=sys.stderr)

def build_started(self, branch, commit):
"""Notify build started"""
message = f"""
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Started:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
self.notify(
title=f"Build Started: {branch}",
message=message,
priority=2,
tags=["build", "started", branch]
)

def build_success(self, branch, commit, tests_passed):
"""Notify build succeeded"""
duration = (datetime.now() - self.build_start).total_seconds()

message = f"""
## ✅ Build Successful

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Tests:** {tests_passed} passed

[View Build Logs](https://ci.example.com/builds/latest)
"""
self.notify(
title=f"Build Success: {branch}",
message=message,
priority=3,
tags=["build", "success", branch]
)

def build_failed(self, branch, commit, error):
"""Notify build failed"""
duration = (datetime.now() - self.build_start).total_seconds()

message = f"""
## ❌ Build Failed

**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s

**Error:**

{error}


[View Build Logs](https://ci.example.com/builds/latest)
"""
self.notify(
title=f"Build Failed: {branch}",
message=message,
priority=1,
tags=["build", "failed", branch]
)

# Usage in CI script
if __name__ == "__main__":
pipeline = CIPipeline()

branch = os.environ.get("GIT_BRANCH", "unknown")
commit = os.environ.get("GIT_COMMIT", "unknown")

pipeline.build_started(branch, commit)

try:
# Run tests, build, etc.
# ... your CI steps ...

pipeline.build_success(branch, commit, tests_passed=42)
sys.exit(0)
except Exception as e:
pipeline.build_failed(branch, commit, str(e))
sys.exit(1)

Backup Status Notifications

import requests
import subprocess
import os
from datetime import datetime

BACKUP_TOPIC = "database-backups"

def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
"""Notify backup completion status"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

if success:
message = f"""
## ✅ Backup Completed Successfully

**Type:** Full database backup
**Size:** {size_mb:.2f} MB
**Duration:** {duration_seconds:.1f}s
**Location:** `s3://backups/db-{datetime.now().strftime('%Y-%m-%d')}.sql.gz`
**Completed:** {timestamp}

**Details:**
- Compression: gzip
- Verification: Passed ✅
"""
priority = 2
tags = ["backup", "success"]
else:
message = f"""
## ❌ Backup Failed

**Error:** {error}
**Time:** {timestamp}

**Action Required:**
1. Check backup logs
2. Verify disk space
3. Retry backup manually
"""
priority = 1
tags = ["backup", "failed", "critical"]

requests.post(
f"https://app.notifer.io/{BACKUP_TOPIC}",
data=message,
headers={
"X-Title": "Backup Completed" if success else "Backup Failed",
"X-Priority": str(priority),
"X-Tags": ",".join(tags)
}
)

# Usage in backup script
def run_backup():
"""Run database backup"""
start_time = datetime.now()

try:
# Run backup command
result = subprocess.run(
["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
capture_output=True,
check=True
)

# Calculate metrics
duration = (datetime.now() - start_time).total_seconds()
size_mb = len(result.stdout) / 1024 / 1024

notify_backup_status(
success=True,
size_mb=size_mb,
duration_seconds=duration
)

except subprocess.CalledProcessError as e:
notify_backup_status(
success=False,
error=str(e)
)
raise

run_backup()

Scheduled Report Generation

import requests
from datetime import datetime, timedelta

def generate_daily_report():
"""Generate and send daily statistics report"""

# Fetch your application metrics
# This is example data - replace with your actual metrics
metrics = {
"users_today": 1234,
"users_change": "+5.2%",
"orders_today": 456,
"orders_change": "+12.1%",
"revenue_today": 12345.67,
"revenue_change": "+8.7%",
"errors_today": 23,
"errors_change": "-15.3%"
}

date_str = datetime.now().strftime('%Y-%m-%d')

message = f"""
## 📊 Daily Report - {date_str}

### Application Stats

| Metric | Value | Change |
|--------|-------|--------|
| Users | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Orders | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Revenue | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Errors | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |

---

### Top Performing Pages
1. Homepage - 5,432 views
2. Products - 3,210 views
3. Checkout - 1,890 views

### Issues
- ⚠️ Slow API response on checkout (avg 2.1s)
- ✅ Database backup completed
- ✅ SSL certificate renewed

[View Full Dashboard](https://analytics.example.com)
"""

requests.post(
"https://app.notifer.io/daily-reports",
data=message,
headers={
"X-Title": f"Daily Report - {date_str}",
"X-Priority": "2",
"X-Tags": "report,daily,analytics"
}
)

# Run as cron job: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
generate_daily_report()

Error Handling

Basic Error Handling

import requests

def safe_publish(topic, message, **kwargs):
"""Publish with error handling"""
try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()
return response.json()

except requests.exceptions.Timeout:
print("Error: Request timeout")
return None

except requests.exceptions.ConnectionError:
print("Error: Connection failed")
return None

except requests.exceptions.HTTPError as e:
print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
return None

except Exception as e:
print(f"Unexpected error: {e}")
return None

Retry with Exponential Backoff

import requests
import time

def publish_with_retry(topic, message, max_retries=3, **kwargs):
"""Publish with exponential backoff retry"""

for attempt in range(max_retries):
try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()
return response.json()

except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
# Last attempt failed
print(f"Failed after {max_retries} attempts: {e}")
raise

# Calculate backoff delay: 2^attempt seconds
delay = 2 ** attempt
print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
time.sleep(delay)

Best Practices

1. Use Environment Variables

import os
import requests

NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")

def publish(message, **kwargs):
"""Publish with environment-based config"""
headers = kwargs.get("headers", {})

if NOTIFER_API_KEY:
headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

response = requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

Environment file (.env):

NOTIFER_TOPIC=production-alerts
NOTIFER_API_KEY=your-api-key-here

2. Create Reusable Client Class

import requests
from typing import Optional, List

class Notifer:
"""Reusable Notifer client"""

def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
self.base_url = base_url
self.api_key = api_key
self.session = requests.Session()

if api_key:
self.session.headers["Authorization"] = f"Bearer {api_key}"

def publish(
self,
topic: str,
message: str,
title: Optional[str] = None,
priority: int = 3,
tags: Optional[List[str]] = None
) -> dict:
"""Publish message to topic"""

headers = {
"X-Priority": str(priority)
}

if title:
headers["X-Title"] = title

if tags:
headers["X-Tags"] = ",".join(tags)

response = self.session.post(
f"{self.base_url}/{topic}",
data=message,
headers=headers,
timeout=5
)
response.raise_for_status()
return response.json()

# Usage
client = Notifer(api_key="your-key")

client.publish(
topic="alerts",
title="Test Alert",
message="This is a test",
priority=2,
tags=["test", "demo"]
)

3. Logging Integration

import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def publish(topic, message, **kwargs):
"""Publish with logging"""
logger.info(f"Publishing to topic '{topic}'")

try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()

result = response.json()
logger.info(f"Published successfully: {result['id']}")
return result

except requests.exceptions.RequestException as e:
logger.error(f"Failed to publish: {e}")
raise

4. Async Publishing (asyncio)

import aiohttp
import asyncio

async def publish_async(topic, message, **kwargs):
"""Async publish with aiohttp"""

async with aiohttp.ClientSession() as session:
try:
async with session.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=aiohttp.ClientTimeout(total=5)
) as response:
response.raise_for_status()
return await response.json()

except aiohttp.ClientError as e:
print(f"Error: {e}")
return None

# Usage
async def main():
result = await publish_async(
topic="async-topic",
message="Async message"
)
print(result)

asyncio.run(main())

Testing

Unit Tests

import unittest
from unittest.mock import patch, Mock
import requests

class TestNotiferPublishing(unittest.TestCase):
"""Unit tests for Notifer publishing"""

@patch('requests.post')
def test_publish_success(self, mock_post):
"""Test successful publish"""

# Mock response
mock_response = Mock()
mock_response.json.return_value = {
"id": "test-id",
"topic": "test-topic",
"message": "Test message"
}
mock_post.return_value = mock_response

# Call function
result = publish("test-topic", "Test message")

# Assertions
self.assertEqual(result["id"], "test-id")
mock_post.assert_called_once()

@patch('requests.post')
def test_publish_failure(self, mock_post):
"""Test publish failure"""

# Mock error
mock_post.side_effect = requests.exceptions.ConnectionError()

# Call should handle error
result = safe_publish("test-topic", "Test message")

# Assertions
self.assertIsNone(result)

if __name__ == "__main__":
unittest.main()

Next Steps


Pro Tip: Use environment variables for API keys and create a reusable client class for cleaner code! 🐍