Publishing from Python¶
Send notifications to Notifer topics using Python with the requests library.
Quick Start¶
Using requests Library¶
The simplest way to publish messages:
import requests
# Publish a plain-text message to the "my-topic" topic and print the
# JSON description of the stored message returned by the server.
response = requests.post(
    "https://app.notifer.io/my-topic",
    # Send bytes so the body is UTF-8 no matter how the HTTP stack would
    # encode a str body by default.
    data="Your message here".encode("utf-8"),
    # requests waits indefinitely unless a timeout is given.
    timeout=5,
)
# Fail loudly on 4xx/5xx instead of printing an error payload.
response.raise_for_status()
print(response.json())
Output:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"topic": "my-topic",
"message": "Your message here",
"priority": 3,
"created_at": "2025-11-22T10:30:00Z"
}
Installation¶
Install requests¶
pip install requests
Or with virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install requests
Basic Publishing¶
Simple Message¶
import requests
def send_notification(topic, message):
    """Send a simple notification to a topic.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        timeout=5,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
result = send_notification("server-alerts", "Server is down!")
print(f"Message ID: {result['id']}")
With Title and Priority¶
import requests
def send_alert(topic, title, message, priority=3):
    """Send an alert with a title and priority.

    Args:
        topic: Name of the topic to publish to.
        title: Value sent in the ``X-Title`` header.
        message: Message body (str or bytes).
        priority: Value sent, as a string, in the ``X-Priority`` header.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers={
            "X-Title": title,
            "X-Priority": str(priority),
        },
        timeout=5,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
send_alert(
    topic="production-alerts",
    title="Database Error",
    message="Connection timeout on prod-db-01",
    priority=5
)
With Tags¶
import requests
def send_tagged_message(topic, message, tags, priority=3):
    """Send a message with tags.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).
        tags: Iterable of tag strings, joined with commas into the
            ``X-Tags`` header. A single string is treated as one tag.
        priority: Value sent, as a string, in the ``X-Priority`` header.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    # ",".join("cpu") would yield "c,p,u"; wrap a bare string in a list.
    if isinstance(tags, str):
        tags = [tags]
    headers = {"X-Priority": str(priority)}
    if tags:
        headers["X-Tags"] = ",".join(tags)
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers=headers,
        timeout=5,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
send_tagged_message(
    topic="monitoring",
    message="CPU usage at 95%",
    tags=["warning", "cpu", "prod-web-01"],
    priority=4
)
Advanced Features¶
Markdown Formatting¶
import requests
def send_markdown_message(topic, message, title=None, priority=3):
    """Send a message flagged as Markdown.

    Args:
        topic: Name of the topic to publish to.
        message: Markdown message body (str or bytes).
        title: Optional value for the ``X-Title`` header.
        priority: Value sent, as a string, in the ``X-Priority`` header.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    headers = {
        "X-Markdown": "true",
        "X-Priority": str(priority),
    }
    if title:
        headers["X-Title"] = title
    # Markdown bodies often contain emoji; a str body can be rejected or
    # mis-encoded where the HTTP layer encodes str as Latin-1, so send
    # UTF-8 bytes.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=body,
        headers=headers,
        timeout=5,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()


# Usage
markdown_message = """
## Deployment Summary
**Status:** ✅ Success
**Version:** v2.1.0
**Duration:** 3m 45s
### Changes
- Added user authentication
- Fixed payment bug
- Updated dependencies
[View Release Notes](https://github.com/example/repo/releases/v2.1.0)
"""
send_markdown_message(
    topic="deployments",
    message=markdown_message,
    title="Deploy v2.1.0",
    priority=3
)
Private Topics with Authentication¶
import requests
class NotiferClient:
    """Client for authenticated publishing.

    Either credential is sent as ``Authorization: Bearer <value>``;
    ``api_key`` takes precedence when both are supplied.
    """

    def __init__(self, api_key=None, jwt_token=None):
        """Store the base URL and build the default auth header.

        Args:
            api_key: API key used as the bearer credential.
            jwt_token: JWT used as the bearer credential when no API key
                is given.
        """
        self.base_url = "https://app.notifer.io"
        self.headers = {}
        credential = api_key or jwt_token
        if credential:
            self.headers["Authorization"] = f"Bearer {credential}"

    def publish(self, topic, message, title=None, priority=3, tags=None, markdown=False):
        """Publish a message to a private topic.

        Args:
            topic: Name of the topic to publish to.
            message: Message body (str or bytes).
            title: Optional value for the ``X-Title`` header.
            priority: Value sent, as a string, in ``X-Priority``.
            tags: Optional iterable of tags, comma-joined into ``X-Tags``.
            markdown: When true, sends ``X-Markdown: true``.

        Returns:
            The decoded JSON response describing the published message.

        Raises:
            requests.exceptions.RequestException: On connection failure,
                timeout, or a 4xx/5xx response.
        """
        # Copy so per-call headers never leak into the shared defaults.
        headers = self.headers.copy()
        headers["X-Priority"] = str(priority)
        if title:
            headers["X-Title"] = title
        if tags:
            headers["X-Tags"] = ",".join(tags)
        if markdown:
            headers["X-Markdown"] = "true"
        # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
        body = message.encode("utf-8") if isinstance(message, str) else message
        response = requests.post(
            f"{self.base_url}/{topic}",
            data=body,
            headers=headers,
            timeout=5,  # never block forever on an unresponsive server
        )
        response.raise_for_status()
        return response.json()


# Usage
client = NotiferClient(api_key="your-api-key-here")
client.publish(
    topic="private-alerts",
    title="Critical Issue",
    message="Database connection lost",
    priority=5,
    tags=["critical", "database"]
)
Real-World Examples¶
Server Monitoring Script¶
import requests
import psutil
import time
NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80      # percent
MEMORY_THRESHOLD = 85   # percent


def send_alert(title, message, priority=4):
    """Send a monitoring alert, best-effort.

    Delivery problems are printed rather than raised so that a
    notification outage cannot stop the monitoring loop.

    Args:
        title: Value sent in the ``X-Title`` header.
        message: Message body (str or bytes).
        priority: Value sent, as a string, in the ``X-Priority`` header.
    """
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=body,
            headers={
                "X-Title": title,
                "X-Priority": str(priority),
                "X-Tags": "monitoring,server"
            },
            timeout=5
        )
        # Without this, a 4xx/5xx reply would pass as a delivered alert.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send alert: {e}")
def check_system():
    """Sample CPU and memory usage and alert on each reading above its threshold."""
    # cpu_percent(interval=1) blocks for one second while it samples.
    cpu_now = psutil.cpu_percent(interval=1)
    mem_now = psutil.virtual_memory().percent
    readings = (
        ("CPU", cpu_now, CPU_THRESHOLD),
        ("Memory", mem_now, MEMORY_THRESHOLD),
    )
    for label, value, limit in readings:
        if value > limit:
            send_alert(
                title=f"High {label} Usage: {value}%",
                message=f"{label} usage is at {value}% (threshold: {limit}%)",
                priority=4
            )
# Run monitoring loop: poll forever, pausing one minute between checks.
CHECK_INTERVAL_SECONDS = 60
while True:
    check_system()
    time.sleep(CHECK_INTERVAL_SECONDS)
Application Error Logging¶
import requests
import traceback
import sys
NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"


def log_error(error, context=None):
    """Log an application error to Notifer, best-effort.

    Builds a Markdown report (type, message, context, stack trace) and
    posts it at priority 5. Delivery failures are printed to stderr and
    never raised, so reporting cannot crash the application.

    Args:
        error: The exception instance to report.
        context: Optional description of what the application was doing.
    """
    error_type = type(error).__name__
    error_message = str(error)
    # Format from the exception object itself so the trace is right even
    # when this is called outside the ``except`` block that caught it.
    stack_trace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )
    message = f"""
## {error_type}
**Error:** {error_message}
**Context:** {context or 'None'}
**Stack Trace:**
```python
{stack_trace}
```
"""
    try:
        response = requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            # Send UTF-8 bytes: traces and messages may contain
            # non-Latin-1 characters.
            data=message.encode("utf-8"),
            headers={
                "Authorization": f"Bearer {NOTIFER_API_KEY}",
                "X-Title": f"Error: {error_type}",
                "X-Priority": "5",
                "X-Tags": f"error,{error_type.lower()}",
                "X-Markdown": "true"
            },
            timeout=5
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Don't let notification failure crash the app
        print(f"Failed to log error to Notifer: {e}", file=sys.stderr)


# Usage in exception handler
try:
    # Your application code
    result = 1 / 0
except Exception as e:
    log_error(e, context="division operation")
    raise
CI/CD Pipeline Integration¶
import requests
import os
import sys
from datetime import datetime
class CIPipeline:
    """CI/CD pipeline notifier.

    Records the build start time on construction and publishes
    start / success / failure notifications to a single topic.
    """

    def __init__(self, topic="ci-pipeline"):
        """Remember the target topic and timestamp the start of the build.

        Args:
            topic: Name of the topic that receives all notifications.
        """
        self.topic = topic
        self.base_url = "https://app.notifer.io"
        self.build_start = datetime.now()

    def notify(self, title, message, priority=3, tags=None, markdown=True):
        """Send a CI notification, best-effort.

        Delivery failures are reported on stderr and never raised, so a
        notification outage cannot fail the build.

        Args:
            title: Value sent in the ``X-Title`` header.
            message: Message body (str or bytes).
            priority: Value sent, as a string, in ``X-Priority``.
            tags: Optional iterable of tags, comma-joined into ``X-Tags``.
            markdown: Sent as ``"true"``/``"false"`` in ``X-Markdown``.
        """
        headers = {
            "X-Title": title,
            "X-Priority": str(priority),
            "X-Markdown": str(markdown).lower()
        }
        # Only send X-Tags when there is something to send.
        if tags:
            headers["X-Tags"] = ",".join(tags)
        # The messages contain emoji; send UTF-8 bytes rather than relying
        # on the HTTP layer's default str encoding.
        body = message.encode("utf-8") if isinstance(message, str) else message
        try:
            response = requests.post(
                f"{self.base_url}/{self.topic}",
                data=body,
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Warning: Failed to send notification: {e}", file=sys.stderr)

    def build_started(self, branch, commit):
        """Notify that the build started.

        Args:
            branch: Branch name being built.
            commit: Commit hash; only the first 7 characters are shown.
        """
        message = f"""
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Started:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.notify(
            title=f"Build Started: {branch}",
            message=message,
            priority=2,
            tags=["build", "started", branch]
        )

    def build_success(self, branch, commit, tests_passed):
        """Notify that the build succeeded.

        Args:
            branch: Branch name being built.
            commit: Commit hash; only the first 7 characters are shown.
            tests_passed: Number of passing tests to report.
        """
        duration = (datetime.now() - self.build_start).total_seconds()
        message = f"""
## ✅ Build Successful
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Tests:** {tests_passed} passed
[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Success: {branch}",
            message=message,
            priority=3,
            tags=["build", "success", branch]
        )

    def build_failed(self, branch, commit, error):
        """Notify that the build failed, including the error text.

        Args:
            branch: Branch name being built.
            commit: Commit hash; only the first 7 characters are shown.
            error: Error description, rendered in a fenced code block.
        """
        duration = (datetime.now() - self.build_start).total_seconds()
        message = f"""
## ❌ Build Failed
**Branch:** `{branch}`
**Commit:** `{commit[:7]}`
**Duration:** {duration:.1f}s
**Error:**
```
{error}
```
[View Build Logs](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Build Failed: {branch}",
            message=message,
            priority=5,
            tags=["build", "failed", branch]
        )
# Usage in CI script
if __name__ == "__main__":
    pipeline = CIPipeline()
    # Both values fall back to "unknown" when the CI system does not
    # export them.
    branch = os.environ.get("GIT_BRANCH", "unknown")
    commit = os.environ.get("GIT_COMMIT", "unknown")
    pipeline.build_started(branch, commit)
    try:
        # Run tests, build, etc.
        # ... your CI steps ...
        pipeline.build_success(branch, commit, tests_passed=42)
    except Exception as e:
        pipeline.build_failed(branch, commit, str(e))
        sys.exit(1)
    else:
        sys.exit(0)
Backup Status Notifications¶
import requests
import subprocess
import os
from datetime import datetime
BACKUP_TOPIC = "database-backups"


def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
    """Notify backup completion status, best-effort.

    Delivery failures are printed and never raised, so a notification
    outage can neither fail a good backup nor mask a backup error.

    Args:
        success: True for a completed backup, False for a failed one.
        size_mb: Backup size in megabytes (success report only).
        duration_seconds: Backup duration in seconds (success report only).
        error: Error description (failure report only).
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if success:
        # Both metrics default to None; applying a float format spec to
        # None raises TypeError, so fall back to a placeholder.
        size_text = f"{size_mb:.2f} MB" if size_mb is not None else "unknown"
        duration_text = (
            f"{duration_seconds:.1f}s" if duration_seconds is not None else "unknown"
        )
        message = f"""
## ✅ Backup Completed Successfully
**Type:** Full database backup
**Size:** {size_text}
**Duration:** {duration_text}
**Location:** `s3://backups/db-{datetime.now().strftime('%Y-%m-%d')}.sql.gz`
**Completed:** {timestamp}
**Details:**
- Compression: gzip
- Verification: Passed ✅
"""
        priority = 2
        tags = ["backup", "success"]
    else:
        message = f"""
## ❌ Backup Failed
**Error:** {error}
**Time:** {timestamp}
**Action Required:**
1. Check backup logs
2. Verify disk space
3. Retry backup manually
"""
        priority = 5
        tags = ["backup", "failed", "critical"]
    try:
        response = requests.post(
            f"https://app.notifer.io/{BACKUP_TOPIC}",
            # The body contains emoji; send UTF-8 bytes explicitly.
            data=message.encode("utf-8"),
            headers={
                "X-Title": "Backup Completed" if success else "Backup Failed",
                "X-Priority": str(priority),
                "X-Tags": ",".join(tags),
                "X-Markdown": "true"
            },
            timeout=10,  # never block the backup job on a hung request
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send backup notification: {e}")
# Usage in backup script
def run_backup():
    """Run the database backup and report the outcome.

    Runs ``pg_dump`` with its output captured in memory, then sends a
    success notification with the dump size and duration, or a failure
    notification before re-raising.

    Raises:
        subprocess.CalledProcessError: ``pg_dump`` exited non-zero.
        OSError: ``pg_dump`` could not be started (e.g. not installed).
    """
    start_time = datetime.now()
    try:
        # Run backup command
        result = subprocess.run(
            ["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
            capture_output=True,
            check=True
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError carries the captured stderr, which usually
        # holds the real reason; OSError has no such attribute.
        stderr = getattr(e, "stderr", None)
        detail = stderr.decode("utf-8", errors="replace").strip() if stderr else ""
        notify_backup_status(
            success=False,
            error=f"{e}: {detail}" if detail else str(e)
        )
        raise
    # Calculate metrics (size is that of the captured, uncompressed dump)
    duration = (datetime.now() - start_time).total_seconds()
    size_mb = len(result.stdout) / 1024 / 1024
    notify_backup_status(
        success=True,
        size_mb=size_mb,
        duration_seconds=duration
    )


run_backup()
Scheduled Report Generation¶
import requests
from datetime import datetime, timedelta
def generate_daily_report():
    """Generate and send the daily statistics report.

    Builds a Markdown report from the (example) metrics and posts it to
    the ``daily-reports`` topic at priority 2.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    # Fetch your application metrics
    # This is example data - replace with your actual metrics
    metrics = {
        "users_today": 1234,
        "users_change": "+5.2%",
        "orders_today": 456,
        "orders_change": "+12.1%",
        "revenue_today": 12345.67,
        "revenue_change": "+8.7%",
        "errors_today": 23,
        "errors_change": "-15.3%"
    }

    def trend(change):
        # Derive the arrow from the sign so it stays correct once real
        # metrics replace the example data.
        return "↘️" if change.startswith("-") else "↗️"

    date_str = datetime.now().strftime('%Y-%m-%d')
    message = f"""
## 📊 Daily Report - {date_str}
### Application Stats
| Metric | Value | Change |
|--------|-------|--------|
| Users | {metrics['users_today']} | {metrics['users_change']} {trend(metrics['users_change'])} |
| Orders | {metrics['orders_today']} | {metrics['orders_change']} {trend(metrics['orders_change'])} |
| Revenue | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} {trend(metrics['revenue_change'])} |
| Errors | {metrics['errors_today']} | {metrics['errors_change']} {trend(metrics['errors_change'])} |
---
### Top Performing Pages
1. Homepage - 5,432 views
2. Products - 3,210 views
3. Checkout - 1,890 views
### Issues
- ⚠️ Slow API response on checkout (avg 2.1s)
- ✅ Database backup completed
- ✅ SSL certificate renewed
[View Full Dashboard](https://analytics.example.com)
"""
    response = requests.post(
        "https://app.notifer.io/daily-reports",
        # The report is full of emoji; send UTF-8 bytes explicitly.
        data=message.encode("utf-8"),
        headers={
            "X-Title": f"Daily Report - {date_str}",
            "X-Priority": "2",
            "X-Tags": "report,daily,analytics",
            "X-Markdown": "true"
        },
        timeout=10,  # a cron job must not hang forever
    )
    # Let cron see a non-zero exit when the report was not accepted.
    response.raise_for_status()


# Run as cron job: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
    generate_daily_report()
Error Handling¶
Basic Error Handling¶
import requests
def safe_publish(topic, message, **kwargs):
    """Publish with error handling.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).
        **kwargs: ``headers`` may supply a dict of extra HTTP headers.

    Returns:
        The decoded JSON response, or None when the request failed or the
        reply was not valid JSON.
    """
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=body,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        print("Error: Request timeout")
        return None
    except requests.exceptions.ConnectionError:
        print("Error: Connection failed")
        return None
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error: {e.response.status_code} - {e.response.text}")
        return None
    except (requests.exceptions.RequestException, ValueError) as e:
        # Any other request-level failure, or a body that is not JSON.
        # Programming errors (TypeError, NameError, ...) are deliberately
        # not swallowed here.
        print(f"Unexpected error: {e}")
        return None
Retry with Exponential Backoff¶
import requests
import time
def publish_with_retry(topic, message, max_retries=3, **kwargs):
    """Publish with exponential backoff retry.

    Transient failures (connection errors, timeouts, 5xx and 429
    responses) are retried after 1s, 2s, 4s, ... Other 4xx responses are
    raised at once because repeating the same request cannot succeed.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).
        max_retries: Total number of attempts; must be at least 1.
        **kwargs: ``headers`` may supply a dict of extra HTTP headers.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        requests.exceptions.RequestException: When the final attempt
            fails, or on a non-retryable client error.
    """
    # With zero attempts the loop body would never run and the function
    # would return None without sending anything.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"https://app.notifer.io/{topic}",
                data=body,
                headers=kwargs.get("headers", {}),
                timeout=5
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            status = getattr(getattr(e, "response", None), "status_code", None)
            non_retryable = (
                status is not None and 400 <= status < 500 and status != 429
            )
            if non_retryable:
                print(f"Not retrying after HTTP {status}: {e}")
                raise
            if attempt == max_retries - 1:
                # Last attempt failed
                print(f"Failed after {max_retries} attempts: {e}")
                raise
            # Calculate backoff delay: 2^attempt seconds
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)
Best Practices¶
1. Use Environment Variables¶
import os
import requests
NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")


def publish(message, **kwargs):
    """Publish with environment-based config.

    The topic comes from ``NOTIFER_TOPIC`` (default ``"default-topic"``).
    When ``NOTIFER_API_KEY`` is set it is sent as a bearer token.

    Args:
        message: Message body (str or bytes).
        **kwargs: ``headers`` may supply a dict of extra HTTP headers.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response.
    """
    # Copy so the Authorization header is not written into the caller's dict.
    headers = dict(kwargs.get("headers") or {})
    if NOTIFER_API_KEY:
        headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    response = requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=body,
        headers=headers,
        timeout=5,  # never block forever on an unresponsive server
    )
    response.raise_for_status()
    return response.json()
Environment file (.env):
NOTIFER_TOPIC=my-topic
NOTIFER_API_KEY=your-api-key-here
2. Create Reusable Client Class¶
import requests
from typing import Optional, List
class Notifer:
    """Reusable Notifer client.

    Holds one ``requests.Session`` so connections and the Authorization
    header are reused across calls. Call ``close()`` or use the client as
    a context manager to release the session.
    """

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
        """Create the session and attach the bearer token when given.

        Args:
            api_key: Optional API key sent as ``Authorization: Bearer``.
            base_url: Root URL of the Notifer server.
        """
        # Drop a trailing slash so "<base>/<topic>" never contains "//".
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def __enter__(self) -> "Notifer":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def publish(
        self,
        topic: str,
        message: str,
        title: Optional[str] = None,
        priority: int = 3,
        tags: Optional[List[str]] = None,
        markdown: bool = False
    ) -> dict:
        """Publish a message to a topic.

        Args:
            topic: Name of the topic to publish to.
            message: Message body.
            title: Optional value for the ``X-Title`` header.
            priority: Value sent, as a string, in ``X-Priority``.
            tags: Optional tags, comma-joined into ``X-Tags``.
            markdown: When true, sends ``X-Markdown: true``.

        Returns:
            The decoded JSON response describing the published message.

        Raises:
            requests.exceptions.RequestException: On connection failure,
                timeout, or a 4xx/5xx response.
        """
        headers = {
            "X-Priority": str(priority)
        }
        if title:
            headers["X-Title"] = title
        if tags:
            headers["X-Tags"] = ",".join(tags)
        if markdown:
            headers["X-Markdown"] = "true"
        # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
        body = message.encode("utf-8") if isinstance(message, str) else message
        response = self.session.post(
            f"{self.base_url}/{topic}",
            data=body,
            headers=headers,
            timeout=5
        )
        response.raise_for_status()
        return response.json()


# Usage
client = Notifer(api_key="your-key")
client.publish(
    topic="alerts",
    title="Test Alert",
    message="This is a test",
    priority=4,
    tags=["test", "demo"]
)
3. Logging Integration¶
import logging
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def publish(topic, message, **kwargs):
    """Publish with logging.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).
        **kwargs: ``headers`` may supply a dict of extra HTTP headers.

    Returns:
        The decoded JSON response describing the published message.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a 4xx/5xx response (logged, then re-raised).
    """
    # %-style arguments are only formatted if the record is emitted.
    logger.info("Publishing to topic '%s'", topic)
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=body,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()
        result = response.json()
    except requests.exceptions.RequestException as e:
        logger.error("Failed to publish: %s", e)
        raise
    # .get() so a response without "id" cannot turn a successful publish
    # into a KeyError raised from a log statement.
    logger.info("Published successfully: %s", result.get("id"))
    return result
4. Async Publishing (asyncio)¶
import aiohttp
import asyncio
async def publish_async(topic, message, **kwargs):
    """Async publish with aiohttp.

    Args:
        topic: Name of the topic to publish to.
        message: Message body (str or bytes).
        **kwargs: ``headers`` may supply a dict of extra HTTP headers.

    Returns:
        The decoded JSON response, or None when the request failed or
        timed out.
    """
    # Encode explicitly so non-Latin-1 characters are sent as UTF-8.
    body = message.encode("utf-8") if isinstance(message, str) else message
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://app.notifer.io/{topic}",
                data=body,
                headers=kwargs.get("headers", {}),
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # An expired total timeout raises asyncio.TimeoutError, which is
        # not an aiohttp.ClientError, so it must be caught explicitly.
        print(f"Error: {e!r}")
        return None


# Usage
async def main():
    """Publish one message and print the result."""
    result = await publish_async(
        topic="async-topic",
        message="Async message"
    )
    print(result)


asyncio.run(main())
Testing¶
Unit Tests¶
import unittest
from unittest.mock import patch, Mock
import requests
# NOTE: `publish(topic, message)` and `safe_publish(topic, message)` must be
# imported from (or defined in) the module under test before these tests run.
class TestNotiferPublishing(unittest.TestCase):
    """Unit tests for Notifer publishing."""

    @patch('requests.post')
    def test_publish_success(self, mock_post):
        """A successful publish returns the decoded JSON payload."""
        # Mock response
        mock_response = Mock()
        mock_response.json.return_value = {
            "id": "test-id",
            "topic": "test-topic",
            "message": "Test message"
        }
        mock_post.return_value = mock_response
        # Call function
        result = publish("test-topic", "Test message")
        # Assertions
        self.assertEqual(result["id"], "test-id")
        mock_post.assert_called_once()
        # The request must target the requested topic and check the status.
        self.assertIn("test-topic", mock_post.call_args[0][0])
        mock_response.raise_for_status.assert_called_once()

    @patch('requests.post')
    def test_publish_failure(self, mock_post):
        """A connection error is handled and reported as None."""
        # Mock error
        mock_post.side_effect = requests.exceptions.ConnectionError()
        # Call should handle error
        result = safe_publish("test-topic", "Test message")
        # Assertions
        self.assertIsNone(result)
        mock_post.assert_called_once()


if __name__ == "__main__":
    unittest.main()
Next Steps¶
- JavaScript Publishing - Publish from JavaScript/Node.js
- HTTP Publishing - Publish via cURL and HTTP
- API Reference - Complete API documentation
- SSE Guide - Subscribe to topics in Python
Pro Tip: Use environment variables for API keys and create a reusable client class for cleaner code! 🐍