Publishing from Python¶
Send notifications to Notifer topics using Python with the requests library or the official SDK (coming soon).
Quick Start¶
Using requests Library¶
The simplest way to publish messages:
import requests
# Publish a plain-text message by POSTing the body to the topic URL.
response = requests.post("https://app.notifer.io/my-topic", data="Your message here")
# The reply body is JSON; print it decoded.
print(response.json())
Output:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"topic": "my-topic",
"message": "Your message here",
"priority": 3,
"created_at": "2025-11-22T10:30:00Z"
}
Installation¶
Install requests¶
pip install requests
Or with a virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install requests
Official SDK (Coming Soon)¶
The official SDK will simplify authentication, retries, and error handling.
Basic Publishing¶
Simple Message¶
import requests
def send_notification(topic, message, timeout=5):
    """Send a simple notification to a topic.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within ``timeout``.
    """
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Usage: publish to the "server-alerts" topic, then print the "id" field of the JSON reply.
result = send_notification("server-alerts", "Server is down!")
print(f"Message ID: {result['id']}")
With Title and Priority¶
import requests
def send_alert(topic, title, message, priority=3, timeout=5):
    """Send an alert with a title and a priority.

    Args:
        topic: Topic name; used as the path of the request URL.
        title: Sent in the ``X-Title`` header.
        message: Message text, sent as the raw POST body.
        priority: Sent (as a string) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within ``timeout``.
    """
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Title": title,
            # Header values must be strings, so the numeric priority is converted.
            "X-Priority": str(priority),
        },
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Usage: send a priority-5 alert with a title to the "production-alerts" topic.
send_alert(
    topic="production-alerts",
    title="Database Error",
    message="Connection timeout on prod-db-01",
    priority=5
)
With Tags¶
import requests
def send_tagged_message(topic, message, tags, priority=3, timeout=5):
    """Send a message with tags.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        tags: Iterable of tag strings; joined with commas into ``X-Tags``.
        priority: Sent (as a string) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within ``timeout``.
    """
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags),
        },
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Usage: send a priority-4 message carrying three tags to the "monitoring" topic.
send_tagged_message(
    topic="monitoring",
    message="CPU usage at 95%",
    tags=["warning", "cpu", "prod-web-01"],
    priority=4
)
Advanced Features¶
Markdown Formatting¶
import requests
def send_markdown_message(topic, message, title=None, priority=3, timeout=5):
    """Send a message flagged as Markdown.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Markdown text, sent as the raw POST body.
        title: Optional title; ``X-Title`` is only sent when this is truthy.
        priority: Sent (as a string) in the ``X-Priority`` header.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within ``timeout``.
    """
    headers = {
        "X-Markdown": "true",
        "X-Priority": str(priority),
    }
    if title:
        headers["X-Title"] = title

    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers=headers,
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Usage: build a Markdown body (headings, bold text, a list and a link) and publish it
# with a title to the "deployments" topic.
markdown_message = """
## Deployment Summary
**Status:** ✅ Success
**Version:** v2.1.0
**Duration:** 3m 45s
### Changes
- Added user authentication
- Fixed payment bug
- Updated dependencies
[View Release Notes](https://github.com/example/repo/releases/v2.1.0)
"""
send_markdown_message(
    topic="deployments",
    message=markdown_message,
    title="Deploy v2.1.0",
    priority=3
)
Private Topics with Authentication¶
import requests
class NotiferClient:
    """Client for authenticated publishing.

    Whichever credential is supplied is sent as ``Authorization: Bearer <token>``
    on every request. When both are given, the API key takes precedence.
    """

    def __init__(self, api_key=None, jwt_token=None, timeout=5):
        """Store the base URL, the request timeout and the auth header.

        Args:
            api_key: API key to authenticate with.
            jwt_token: JWT to authenticate with; used only when ``api_key`` is falsy.
            timeout: Seconds each request may wait for the server.
        """
        self.base_url = "https://app.notifer.io"
        self.timeout = timeout
        self.headers = {}
        # Both credential kinds use the same Bearer scheme, so one branch suffices.
        token = api_key or jwt_token
        if token:
            self.headers["Authorization"] = f"Bearer {token}"

    def publish(self, topic, message, title=None, priority=3, tags=None, markdown=False):
        """Publish a message to a (private) topic.

        Args:
            topic: Topic name; used as the path of the request URL.
            message: Message text, sent as the raw POST body.
            title: Optional; sent as ``X-Title`` when truthy.
            priority: Sent (as a string) in the ``X-Priority`` header.
            tags: Optional iterable of strings; joined with commas into ``X-Tags``.
            markdown: When true, ``X-Markdown: true`` is sent.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
            requests.exceptions.Timeout: If no reply arrives within the timeout.
        """
        # Work on a copy so per-message headers never leak into later calls.
        headers = self.headers.copy()
        headers["X-Priority"] = str(priority)
        if title:
            headers["X-Title"] = title
        if tags:
            headers["X-Tags"] = ",".join(tags)
        if markdown:
            headers["X-Markdown"] = "true"

        response = requests.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers,
            # Without a timeout, requests waits indefinitely on an unresponsive server.
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()
# Usage: create a client that authenticates with an API key, then publish a
# titled, tagged, priority-5 message to the "private-alerts" topic.
client = NotiferClient(api_key="your-api-key-here")
client.publish(
    topic="private-alerts",
    title="Critical Issue",
    message="Database connection lost",
    priority=5,
    tags=["critical", "database"]
)
Real-World Examples¶
Server Monitoring Script¶
import requests
import psutil
import time
NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 85
def send_alert(title, message, priority=4):
    """Send a monitoring alert to ``NOTIFER_TOPIC`` on a best-effort basis.

    Any requests failure, including a 4xx/5xx reply, is printed instead of
    raised, so alerting problems never propagate to the caller.

    Args:
        title: Sent in the ``X-Title`` header.
        message: Message text, sent as the raw POST body.
        priority: Sent (as a string) in the ``X-Priority`` header.
    """
    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=message,
            headers={
                "X-Title": title,
                "X-Priority": str(priority),
                "X-Tags": "monitoring,server",
            },
            timeout=5,
        )
        # Treat HTTP-level rejections as failures too; otherwise a 4xx/5xx
        # reply would be dropped without any trace.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send alert: {e}")
def check_system():
    """Sample CPU and memory usage and alert on each reading above its threshold.

    CPU usage is read with ``interval=1``. A reading above ``CPU_THRESHOLD`` or
    ``MEMORY_THRESHOLD`` triggers a priority-4 ``send_alert``. Returns nothing.
    """
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent

    cpu_over_limit = cpu_percent > CPU_THRESHOLD
    memory_over_limit = memory_percent > MEMORY_THRESHOLD

    if cpu_over_limit:
        send_alert(
            title=f"High CPU Usage: {cpu_percent}%",
            message=f"CPU usage is at {cpu_percent}% (threshold: {CPU_THRESHOLD}%)",
            priority=4,
        )

    if memory_over_limit:
        send_alert(
            title=f"High Memory Usage: {memory_percent}%",
            message=f"Memory usage is at {memory_percent}% (threshold: {MEMORY_THRESHOLD}%)",
            priority=4,
        )
# Monitoring loop: check the system, sleep 60 seconds, repeat forever.
while True:
    check_system()
    time.sleep(60)  # one check per minute
Application Error Logging¶
import requests
import traceback
import sys
NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"
def log_error(error, context=None):
    """Log an application error to Notifer as a Markdown message.

    Best-effort: a failure to deliver the notification is reported on stderr
    and never raised, so error logging cannot crash the application.

    Args:
        error: The exception instance to report.
        context: Optional free-text description of what was being done.
    """
    error_type = type(error).__name__
    error_message = str(error)
    # Format from the exception object itself so the trace is correct even
    # when this function is called outside an ``except`` block.
    stack_trace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )

    message = f"""
## {error_type}
**Error:** {error_message}
**Context:** {context or 'None'}
**Stack Trace:**
```python
{stack_trace}
```
"""

    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=message,
            headers={
                "Authorization": f"Bearer {NOTIFER_API_KEY}",
                "X-Title": f"Error: {error_type}",
                "X-Priority": "5",
                "X-Tags": f"error,{error_type.lower()}",
                "X-Markdown": "true",
            },
            timeout=5,
        )
        response.raise_for_status()
    except Exception as exc:
        # Don't let notification failure crash the app. ``Exception`` (not a
        # bare ``except``) keeps KeyboardInterrupt/SystemExit working.
        print(f"Failed to log error to Notifer: {exc}", file=sys.stderr)
# Usage in exception handler
try:
    # Your application code
    result = 1 / 0
except Exception as e:
    log_error(e, context="division operation")
    raise
CI/CD Pipeline Integration¶
import requests
import os
import sys
from datetime import datetime
class CIPipeline:
    """CI/CD pipeline notifier.

    Posts build lifecycle events (started / success / failed) to one topic.
    Durations are measured from the moment the instance is created.
    """

    def __init__(self, topic="ci-pipeline"):
        """Remember the target topic and record the build start time."""
        self.topic = topic
        self.base_url = "https://app.notifer.io"
        # Reference point for the durations reported by build_success/build_failed.
        self.build_start = datetime.now()

    def notify(self, title, message, priority=3, tags=None, markdown=True):
        """Send a CI notification on a best-effort basis.

        Request failures (including 4xx/5xx replies) are printed to stderr as
        a warning instead of being raised.

        Args:
            title: Sent in the ``X-Title`` header.
            message: Message text, sent as the raw POST body.
            priority: Sent (as a string) in the ``X-Priority`` header.
            tags: Optional list of strings; joined with commas into ``X-Tags``.
            markdown: Sent lower-cased (``"true"``/``"false"``) in ``X-Markdown``.
        """
        headers = {
            "X-Title": title,
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags or []),
            "X-Markdown": str(markdown).lower(),
        }
        try:
            response = requests.post(
                f"{self.base_url}/{self.topic}",
                data=message,
                headers=headers,
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Warning: Failed to send notification: {e}", file=sys.stderr)

    def build_started(self, branch, commit):
        """Notify that a build started (priority 2).

        Args:
            branch: Branch name; shown in the message and added as a tag.
            commit: Commit hash; only its first 7 characters are shown.
        """
        message = f"""
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Started:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.notify(
            title=f"Build Started: {branch}",
            message=message,
            priority=2,
            tags=["build", "started", branch],
        )

    def build_success(self, branch, commit, tests_passed):
        """Notify that the build succeeded (priority 3).

        Args:
            branch: Branch name; shown in the message and added as a tag.
            commit: Commit hash; only its first 7 characters are shown.
            tests_passed: Number of passing tests to report.
        """
        duration = (datetime.now() - self.build_start).total_seconds()
        message = f"""
## ✅ Build Successful
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Tests:** {tests_passed} passed
[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Success: {branch}",
            message=message,
            priority=3,
            tags=["build", "success", branch],
        )

    def build_failed(self, branch, commit, error):
        """Notify that the build failed (priority 5).

        Args:
            branch: Branch name; shown in the message and added as a tag.
            commit: Commit hash; only its first 7 characters are shown.
            error: Error text; rendered in a fenced block under "Error:".
        """
        duration = (datetime.now() - self.build_start).total_seconds()
        # The error text goes inside a code fence so it is shown verbatim.
        message = f"""
## ❌ Build Failed
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Error:**
```
{error}
```
[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Failed: {branch}",
            message=message,
            priority=5,
            tags=["build", "failed", branch],
        )
# CI script entry point: announce the build, run the steps, report the outcome
# and exit with 0 on success or 1 on failure.
if __name__ == "__main__":
    pipeline = CIPipeline()
    branch = os.getenv("GIT_BRANCH", "unknown")
    commit = os.getenv("GIT_COMMIT", "unknown")
    pipeline.build_started(branch, commit)

    try:
        # Run tests, build, etc.
        # ... your CI steps ...
        pipeline.build_success(branch, commit, tests_passed=42)
    except Exception as e:
        pipeline.build_failed(branch, commit, str(e))
        sys.exit(1)
    sys.exit(0)
Backup Status Notifications¶
import requests
import subprocess
import os
from datetime import datetime
BACKUP_TOPIC = "database-backups"
def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
    """Publish the outcome of a backup run to ``BACKUP_TOPIC``.

    Args:
        success: True for a completed backup, False for a failed one.
        size_mb: Backup size in megabytes; shown as "unknown" when None.
        duration_seconds: Run time in seconds; shown as "unknown" when None.
        error: Error description included in the failure message.

    Raises:
        requests.exceptions.RequestException: If the request cannot be sent
            or times out.
    """
    # Take one timestamp so the reported time and the file date always agree.
    now = datetime.now()
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

    if success:
        # The metrics are optional; formatting None with ":.2f" would raise TypeError.
        size_text = f"{size_mb:.2f} MB" if size_mb is not None else "unknown"
        duration_text = (
            f"{duration_seconds:.1f}s" if duration_seconds is not None else "unknown"
        )
        message = f"""
## ✅ Backup Completed Successfully
**Type:** Full database backup
**Size:** {size_text}
**Duration:** {duration_text}
**Location:** `s3://backups/db-{now.strftime('%Y-%m-%d')}.sql.gz`
**Completed:** {timestamp}
**Details:**
- Compression: gzip
- Verification: Passed ✅
"""
        priority = 2
        tags = ["backup", "success"]
    else:
        message = f"""
## ❌ Backup Failed
**Error:** {error}
**Time:** {timestamp}
**Action Required:**
1. Check backup logs
2. Verify disk space
3. Retry backup manually
"""
        priority = 5
        tags = ["backup", "failed", "critical"]

    requests.post(
        f"https://app.notifer.io/{BACKUP_TOPIC}",
        data=message,
        headers={
            "X-Title": "Backup Completed" if success else "Backup Failed",
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags),
            "X-Markdown": "true",
        },
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=10,
    )
# Usage in backup script
def run_backup():
    """Run ``pg_dump`` and report the result through ``notify_backup_status``.

    On success the captured dump size and the elapsed time are reported. On a
    non-zero exit status a failure notification is sent and the error is
    re-raised.

    Raises:
        subprocess.CalledProcessError: If ``pg_dump`` exits with a non-zero status.
    """
    start_time = datetime.now()
    try:
        # Run backup command; stdout/stderr are captured as bytes.
        result = subprocess.run(
            ["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
            capture_output=True,
            check=True,
        )

        # Calculate metrics
        duration = (datetime.now() - start_time).total_seconds()
        size_mb = len(result.stdout) / 1024 / 1024

        notify_backup_status(
            success=True,
            size_mb=size_mb,
            duration_seconds=duration,
        )
    except subprocess.CalledProcessError as e:
        # str(e) only names the command and exit status; the actual reason is
        # in the captured stderr, so include it in the alert.
        stderr_text = e.stderr.decode(errors="replace").strip() if e.stderr else ""
        error_text = f"{e}\n{stderr_text}" if stderr_text else str(e)
        notify_backup_status(
            success=False,
            error=error_text,
        )
        raise
# Top-level call: the backup runs as soon as this script is executed.
run_backup()
Scheduled Report Generation¶
import requests
from datetime import datetime, timedelta
def generate_daily_report():
    """Generate and send the daily statistics report.

    Builds a Markdown report from the metrics below and publishes it to the
    "daily-reports" topic.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within the timeout.
    """
    # Fetch your application metrics
    # This is example data - replace with your actual metrics
    metrics = {
        "users_today": 1234,
        "users_change": "+5.2%",
        "orders_today": 456,
        "orders_change": "+12.1%",
        "revenue_today": 12345.67,
        "revenue_change": "+8.7%",
        "errors_today": 23,
        "errors_change": "-15.3%",
    }
    date_str = datetime.now().strftime('%Y-%m-%d')

    message = f"""
## 📊 Daily Report - {date_str}
### Application Stats
| Metric | Value | Change |
|--------|-------|--------|
| Users | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Orders | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Revenue | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Errors | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |
---
### Top Performing Pages
1. Homepage - 5,432 views
2. Products - 3,210 views
3. Checkout - 1,890 views
### Issues
- ⚠️ Slow API response on checkout (avg 2.1s)
- ✅ Database backup completed
- ✅ SSL certificate renewed
[View Full Dashboard](https://analytics.example.com)
"""

    response = requests.post(
        "https://app.notifer.io/daily-reports",
        data=message,
        headers={
            "X-Title": f"Daily Report - {date_str}",
            "X-Priority": "2",
            "X-Tags": "report,daily,analytics",
            "X-Markdown": "true",
        },
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=10,
    )
    # Fail loudly so a scheduled run does not silently lose the report.
    response.raise_for_status()
# Example cron entry (every day at 09:00): 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
    generate_daily_report()
Error Handling¶
Basic Error Handling¶
import requests
def safe_publish(topic, message, **kwargs):
    """Publish a message, printing any failure instead of raising.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        **kwargs: Only ``headers`` (extra request headers) is read.

    Returns:
        The decoded JSON reply, or ``None`` when anything went wrong.
    """
    try:
        reply = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5,
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.Timeout:
        print("Error: Request timeout")
    except requests.exceptions.ConnectionError:
        print("Error: Connection failed")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    # Every handled failure ends up here.
    return None
Retry with Exponential Backoff¶
import requests
import time
def publish_with_retry(topic, message, max_retries=3, **kwargs):
    """Publish a message, retrying transient failures with exponential backoff.

    The delay before retry ``n`` (0-based) is ``2 ** n`` seconds. Replies with
    a 4xx status other than 429 are treated as permanent and not retried.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        max_retries: Total number of attempts; must be at least 1.
        **kwargs: Only ``headers`` (extra request headers) is read.

    Returns:
        The decoded JSON body of the successful response.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        requests.exceptions.RequestException: The failure of the last attempt
            that was made.
    """
    if max_retries < 1:
        # range(0) would skip the loop and return None without ever sending.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=5,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            # Compare against None explicitly: a Response with an error
            # status is falsy.
            error_response = getattr(e, "response", None)
            status = error_response.status_code if error_response is not None else None
            # A client error will not change on retry (429 is rate limiting).
            permanent = status is not None and 400 <= status < 500 and status != 429

            if permanent or attempt == max_retries - 1:
                print(f"Failed after {attempt + 1} attempts: {e}")
                raise

            # Calculate backoff delay: 2^attempt seconds
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)
Best Practices¶
1. Use Environment Variables¶
import os
import requests
NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")
def publish(message, *, timeout=5, **kwargs):
    """Publish a message using environment-based configuration.

    The topic comes from ``NOTIFER_TOPIC``. When ``NOTIFER_API_KEY`` is set it
    is sent as a Bearer token.

    Args:
        message: Message text, sent as the raw POST body.
        timeout: Seconds to wait for the server before giving up.
        **kwargs: Only ``headers`` (extra request headers) is read.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server replies with a 4xx/5xx status.
        requests.exceptions.Timeout: If no reply arrives within ``timeout``.
    """
    # Copy so adding the Authorization header never modifies the caller's dict.
    headers = dict(kwargs.get("headers") or {})
    if NOTIFER_API_KEY:
        headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

    response = requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=message,
        headers=headers,
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
Environment file (.env):
NOTIFER_TOPIC=my-topic
NOTIFER_API_KEY=your-api-key-here
2. Create Reusable Client Class¶
import requests
from typing import Optional, List
class Notifer:
    """Reusable Notifer client built on a single ``requests.Session``."""

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
        """Create the session and, when a key is given, attach a Bearer header to it."""
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def publish(
        self,
        topic: str,
        message: str,
        title: Optional[str] = None,
        priority: int = 3,
        tags: Optional[List[str]] = None,
        markdown: bool = False
    ) -> dict:
        """Publish ``message`` to ``topic`` and return the decoded JSON reply.

        ``X-Priority`` is always sent. ``X-Title``, ``X-Tags`` (comma-joined)
        and ``X-Markdown`` are sent only when the matching argument is truthy.
        Raises ``requests.exceptions.HTTPError`` on a 4xx/5xx reply.
        """
        optional_headers = {
            "X-Title": title if title else None,
            "X-Tags": ",".join(tags) if tags else None,
            "X-Markdown": "true" if markdown else None,
        }
        headers = {"X-Priority": str(priority)}
        # Keep only the optional headers that were actually requested.
        headers.update(
            {name: value for name, value in optional_headers.items() if value is not None}
        )

        reply = self.session.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers,
            timeout=5,
        )
        reply.raise_for_status()
        return reply.json()
# Usage: create one client with an API key and publish a titled, tagged,
# priority-4 message to the "alerts" topic.
client = Notifer(api_key="your-key")
client.publish(
    topic="alerts",
    title="Test Alert",
    message="This is a test",
    priority=4,
    tags=["test", "demo"]
)
3. Logging Integration¶
import logging
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def publish(topic, message, **kwargs):
    """Publish a message, logging the attempt and its outcome.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        **kwargs: Only ``headers`` (extra request headers) is read.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: Logged at error level, then re-raised.
    """
    logger.info(f"Publishing to topic '{topic}'")
    extra_headers = kwargs.get("headers", {})

    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=extra_headers,
            timeout=5,
        )
        response.raise_for_status()
        result = response.json()
        logger.info(f"Published successfully: {result['id']}")
        return result
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to publish: {e}")
        raise
4. Async Publishing (asyncio)¶
import aiohttp
import asyncio
async def publish_async(topic, message, **kwargs):
    """Publish a message asynchronously with aiohttp.

    Args:
        topic: Topic name; used as the path of the request URL.
        message: Message text, sent as the raw POST body.
        **kwargs: Only ``headers`` (extra request headers) is read.

    Returns:
        The decoded JSON reply, or ``None`` when the request failed or timed out.
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                response.raise_for_status()
                return await response.json()
        # The total timeout surfaces as asyncio.TimeoutError, which is not an
        # aiohttp.ClientError, so it has to be caught explicitly.
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # A timeout error has an empty message; fall back to its type name.
            print(f"Error: {str(e) or type(e).__name__}")
            return None
# Usage: await one publish inside a coroutine and drive it with asyncio.run().
async def main():
    result = await publish_async(topic="async-topic", message="Async message")
    print(result)


asyncio.run(main())
Testing¶
Unit Tests¶
import unittest
from unittest.mock import patch, Mock
import requests
class TestNotiferPublishing(unittest.TestCase):
    """Unit tests for Notifer publishing, with ``requests.post`` patched out."""

    @patch('requests.post')
    def test_publish_success(self, mock_post):
        """publish() returns the JSON payload of the (mocked) response."""
        # Arrange: a fake response whose .json() yields a known payload.
        fake_response = Mock()
        fake_response.json.return_value = {
            "id": "test-id",
            "topic": "test-topic",
            "message": "Test message",
        }
        mock_post.return_value = fake_response

        # Act
        result = publish("test-topic", "Test message")

        # Assert
        self.assertEqual(result["id"], "test-id")
        mock_post.assert_called_once()

    @patch('requests.post')
    def test_publish_failure(self, mock_post):
        """safe_publish() returns None when the connection fails."""
        # Arrange: every POST raises a connection error.
        mock_post.side_effect = requests.exceptions.ConnectionError()

        # Act: the error must be handled, not propagated.
        result = safe_publish("test-topic", "Test message")

        # Assert
        self.assertIsNone(result)
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
Next Steps¶
- JavaScript Publishing - Publish from JavaScript/Node.js
- HTTP Publishing - Publish via cURL and HTTP
- API Reference - Complete API documentation
- SSE Guide - Subscribe to topics in Python
Pro Tip: Use environment variables for API keys and create a reusable client class for cleaner code! 🐍