Przejdź do głównej zawartości

Publikowanie z Python

Wysyłaj powiadomienia do tematów Notifer używając Python z biblioteką requests.

Szybki start

Używanie biblioteki requests

Najprostszy sposób publikowania wiadomości:

import requests

response = requests.post(
"https://app.notifer.io/my-topic",
data="Twoja wiadomość tutaj"
)

print(response.json())

Wynik:

{
"id": "550e8400-e29b-41d4-a716-446655440000",
"topic": "my-topic",
"message": "Twoja wiadomość tutaj",
"title": null,
"timestamp": "2025-11-22T10:30:00Z",
"priority": 3,
"tags": []
}

Instalacja

Instalacja requests

pip install requests

Lub ze środowiskiem wirtualnym:

python -m venv venv
source venv/bin/activate # W Windows: venv\Scripts\activate
pip install requests

Podstawowe publikowanie

Prosta wiadomość

import requests

def send_notification(topic, message):
"""Wyślij proste powiadomienie do tematu"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message
)
response.raise_for_status()
return response.json()

# Użycie
result = send_notification("server-alerts", "Serwer nie działa!")
print(f"Message ID: {result['id']}")

Z tytułem i priorytetem

import requests

def send_alert(topic, title, message, priority=3):
"""Wyślij alert z tytułem i priorytetem"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers={
"X-Title": title,
"X-Priority": str(priority)
}
)
response.raise_for_status()
return response.json()

# Użycie
send_alert(
topic="production-alerts",
title="Błąd bazy danych",
message="Przekroczono limit czasu połączenia na prod-db-01",
priority=1
)

Z tagami

import requests

def send_tagged_message(topic, message, tags, priority=3):
"""Wyślij wiadomość z tagami"""
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers={
"X-Priority": str(priority),
"X-Tags": ",".join(tags)
}
)
response.raise_for_status()
return response.json()

# Użycie
send_tagged_message(
topic="monitoring",
message="Użycie CPU na poziomie 95%",
tags=["warning", "cpu", "prod-web-01"],
priority=2
)

Funkcje zaawansowane

Formatowanie Markdown

import requests

def send_markdown_message(topic, message, title=None, priority=3):
"""Wyślij wiadomość z formatowaniem Markdown"""
headers = {
"X-Priority": str(priority)
}

if title:
headers["X-Title"] = title

response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

# Użycie
markdown_message = """
## Podsumowanie wdrożenia

**Status:** ✅ Sukces
**Wersja:** v2.1.0
**Czas trwania:** 3m 45s

### Zmiany
- Dodano uwierzytelnianie użytkownika
- Naprawiono błąd płatności
- Zaktualizowano zależności

[Zobacz informacje o wydaniu](https://github.com/example/repo/releases/v2.1.0)
"""

send_markdown_message(
topic="deployments",
message=markdown_message,
title="Deploy v2.1.0",
priority=3
)

Prywatne tematy z uwierzytelnianiem

import requests

class NotiferClient:
"""Klient dla uwierzytelnionego publikowania"""

def __init__(self, api_key=None, jwt_token=None):
self.base_url = "https://app.notifer.io"
self.headers = {}

if api_key:
self.headers["Authorization"] = f"Bearer {api_key}"
elif jwt_token:
self.headers["Authorization"] = f"Bearer {jwt_token}"

def publish(self, topic, message, title=None, priority=3, tags=None):
"""Publikuj wiadomość do prywatnego tematu"""
headers = self.headers.copy()
headers["X-Priority"] = str(priority)

if title:
headers["X-Title"] = title

if tags:
headers["X-Tags"] = ",".join(tags)

response = requests.post(
f"{self.base_url}/{topic}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

# Użycie
client = NotiferClient(api_key="your-api-key-here")

client.publish(
topic="private-alerts",
title="Krytyczny problem",
message="Utracono połączenie z bazą danych",
priority=1,
tags=["critical", "database"]
)

Przykłady z rzeczywistości

Skrypt monitorowania serwera

import requests
import psutil
import time

NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 85

def send_alert(title, message, priority=2):
"""Wyślij alert monitorowania"""
try:
requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers={
"X-Title": title,
"X-Priority": str(priority),
"X-Tags": "monitoring,server"
},
timeout=5
)
except requests.exceptions.RequestException as e:
print(f"Nie udało się wysłać alertu: {e}")

def check_system():
"""Sprawdź zasoby systemowe"""
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent

if cpu_percent > CPU_THRESHOLD:
send_alert(
title=f"Wysokie użycie CPU: {cpu_percent}%",
message=f"Użycie CPU wynosi {cpu_percent}% (próg: {CPU_THRESHOLD}%)",
priority=2
)

if memory_percent > MEMORY_THRESHOLD:
send_alert(
title=f"Wysokie użycie pamięci: {memory_percent}%",
message=f"Użycie pamięci wynosi {memory_percent}% (próg: {MEMORY_THRESHOLD}%)",
priority=2
)

# Uruchom pętlę monitorowania
while True:
check_system()
time.sleep(60) # Sprawdzaj co minutę

Logowanie błędów aplikacji

import requests
import traceback
import sys

NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"

def log_error(error, context=None):
"""Zapisz błąd aplikacji w Notifer"""
error_type = type(error).__name__
error_message = str(error)
stack_trace = traceback.format_exc()

message = f"""
## {error_type}

**Błąd:** {error_message}

**Kontekst:** {context or 'None'}

**Stack Trace:**
```python
{stack_trace}

"""

try:
requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers={
"Authorization": f"Bearer {NOTIFER_API_KEY}",
"X-Title": f"Error: {error_type}",
"X-Priority": "1",
"X-Tags": f"error,{error_type.lower()}"
},
timeout=5
)
except:
# Nie pozwól, aby błąd powiadomienia zepsuł aplikację
print(f"Nie udało się zapisać błędu w Notifer", file=sys.stderr)

Użycie w obsłudze wyjątków

try: # Kod aplikacji result = 1 / 0 except Exception as e: log_error(e, context="division operation") raise


### Integracja pipeline CI/CD

```python
import requests
import os
import sys
from datetime import datetime

class CIPipeline:
"""Powiadamiacz pipeline CI/CD"""

def __init__(self, topic="ci-pipeline"):
self.topic = topic
self.base_url = "https://app.notifer.io"
self.build_start = datetime.now()

def notify(self, title, message, priority=3, tags=None):
"""Wyślij powiadomienie CI"""
headers = {
"X-Title": title,
"X-Priority": str(priority),
"X-Tags": ",".join(tags or [])
}

try:
response = requests.post(
f"{self.base_url}/{self.topic}",
data=message,
headers=headers,
timeout=10
)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Ostrzeżenie: Nie udało się wysłać powiadomienia: {e}", file=sys.stderr)

def build_started(self, branch, commit):
"""Powiadom o rozpoczęciu budowania"""
message = f"""
**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Rozpoczęto:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
self.notify(
title=f"Rozpoczęto budowanie: {branch}",
message=message,
priority=2,
tags=["build", "started", branch]
)

def build_success(self, branch, commit, tests_passed):
"""Powiadom o sukcesie budowania"""
duration = (datetime.now() - self.build_start).total_seconds()

message = f"""
## ✅ Budowanie zakończone sukcesem

**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Czas trwania:** {duration:.1f}s
**Testy:** {tests_passed} zaliczonych

[Zobacz logi budowania](https://ci.example.com/builds/latest)
"""
self.notify(
title=f"Sukces budowania: {branch}",
message=message,
priority=3,
tags=["build", "success", branch]
)

def build_failed(self, branch, commit, error):
"""Powiadom o niepowodzeniu budowania"""
duration = (datetime.now() - self.build_start).total_seconds()

message = f"""
## ❌ Budowanie nie powiodło się

**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Czas trwania:** {duration:.1f}s

**Błąd:**

{error}


[Zobacz logi budowania](https://ci.example.com/builds/latest)
"""
self.notify(
title=f"Niepowodzenie budowania: {branch}",
message=message,
priority=1,
tags=["build", "failed", branch]
)

# Użycie w skrypcie CI
if __name__ == "__main__":
pipeline = CIPipeline()

branch = os.environ.get("GIT_BRANCH", "unknown")
commit = os.environ.get("GIT_COMMIT", "unknown")

pipeline.build_started(branch, commit)

try:
# Uruchom testy, budowanie itp.
# ... twoje kroki CI ...

pipeline.build_success(branch, commit, tests_passed=42)
sys.exit(0)
except Exception as e:
pipeline.build_failed(branch, commit, str(e))
sys.exit(1)

Powiadomienia o statusie kopii zapasowej

import requests
import subprocess
import os
from datetime import datetime

BACKUP_TOPIC = "database-backups"

def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
"""Powiadom o statusie ukończenia kopii zapasowej"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

if success:
message = f"""
## ✅ Kopia zapasowa zakończona pomyślnie

**Typ:** Pełna kopia zapasowa bazy danych
**Rozmiar:** {size_mb:.2f} MB
**Czas trwania:** {duration_seconds:.1f}s
**Lokalizacja:** `s3://backups/db-{datetime.now().strftime('%Y-%m-%d')}.sql.gz`
**Ukończono:** {timestamp}

**Szczegóły:**
- Kompresja: gzip
- Weryfikacja: Zaliczona ✅
"""
priority = 2
tags = ["backup", "success"]
else:
message = f"""
## ❌ Kopia zapasowa nie powiodła się

**Błąd:** {error}
**Czas:** {timestamp}

**Wymagane działania:**
1. Sprawdź logi kopii zapasowej
2. Zweryfikuj miejsce na dysku
3. Ponów kopię zapasową ręcznie
"""
priority = 1
tags = ["backup", "failed", "critical"]

requests.post(
f"https://app.notifer.io/{BACKUP_TOPIC}",
data=message,
headers={
"X-Title": "Kopia zapasowa ukończona" if success else "Kopia zapasowa nie powiodła się",
"X-Priority": str(priority),
"X-Tags": ",".join(tags)
}
)

# Użycie w skrypcie kopii zapasowej
def run_backup():
"""Uruchom kopię zapasową bazy danych"""
start_time = datetime.now()

try:
# Uruchom polecenie kopii zapasowej
result = subprocess.run(
["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
capture_output=True,
check=True
)

# Oblicz metryki
duration = (datetime.now() - start_time).total_seconds()
size_mb = len(result.stdout) / 1024 / 1024

notify_backup_status(
success=True,
size_mb=size_mb,
duration_seconds=duration
)

except subprocess.CalledProcessError as e:
notify_backup_status(
success=False,
error=str(e)
)
raise

run_backup()

Generowanie zaplanowanych raportów

import requests
from datetime import datetime, timedelta

def generate_daily_report():
"""Wygeneruj i wyślij raport statystyk dziennych"""

# Pobierz metryki aplikacji
# To są przykładowe dane - zastąp własnymi metrykami
metrics = {
"users_today": 1234,
"users_change": "+5.2%",
"orders_today": 456,
"orders_change": "+12.1%",
"revenue_today": 12345.67,
"revenue_change": "+8.7%",
"errors_today": 23,
"errors_change": "-15.3%"
}

date_str = datetime.now().strftime('%Y-%m-%d')

message = f"""
## 📊 Raport dzienny - {date_str}

### Statystyki aplikacji

| Metryka | Wartość | Zmiana |
|--------|-------|--------|
| Użytkownicy | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Zamówienia | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Przychód | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Błędy | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |

---

### Najpopularniejsze strony
1. Strona główna - 5 432 wyświetleń
2. Produkty - 3 210 wyświetleń
3. Kasa - 1 890 wyświetleń

### Problemy
- ⚠️ Wolna odpowiedź API na kasie (średnio 2,1s)
- ✅ Kopia zapasowa bazy danych ukończona
- ✅ Certyfikat SSL odnowiony

[Zobacz pełny pulpit](https://analytics.example.com)
"""

requests.post(
"https://app.notifer.io/daily-reports",
data=message,
headers={
"X-Title": f"Raport dzienny - {date_str}",
"X-Priority": "2",
"X-Tags": "report,daily,analytics"
}
)

# Uruchom jako zadanie cron: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
generate_daily_report()

Obsługa błędów

Podstawowa obsługa błędów

import requests

def safe_publish(topic, message, **kwargs):
"""Publikuj z obsługą błędów"""
try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()
return response.json()

except requests.exceptions.Timeout:
print("Błąd: Przekroczono limit czasu żądania")
return None

except requests.exceptions.ConnectionError:
print("Błąd: Połączenie nie powiodło się")
return None

except requests.exceptions.HTTPError as e:
print(f"Błąd HTTP: {e.response.status_code} - {e.response.text}")
return None

except Exception as e:
print(f"Nieoczekiwany błąd: {e}")
return None

Ponawianie z wykładniczym backoff

import requests
import time

def publish_with_retry(topic, message, max_retries=3, **kwargs):
"""Publikuj z ponowieniem z wykładniczym backoff"""

for attempt in range(max_retries):
try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()
return response.json()

except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
# Ostatnia próba się nie powiodła
print(f"Niepowodzenie po {max_retries} próbach: {e}")
raise

# Oblicz opóźnienie backoff: 2^attempt sekund
delay = 2 ** attempt
print(f"Próba {attempt + 1} nie powiodła się, ponowienie za {delay}s...")
time.sleep(delay)

Dobre praktyki

1. Używaj zmiennych środowiskowych

import os
import requests

NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")

def publish(message, **kwargs):
"""Publikuj z konfiguracją opartą na środowisku"""
headers = kwargs.get("headers", {})

if NOTIFER_API_KEY:
headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

response = requests.post(
f"https://app.notifer.io/{NOTIFER_TOPIC}",
data=message,
headers=headers
)
response.raise_for_status()
return response.json()

Plik środowiskowy (.env):

NOTIFER_TOPIC=production-alerts
NOTIFER_API_KEY=your-api-key-here

2. Utwórz klasę klienta wielokrotnego użytku

import requests
from typing import Optional, List

class Notifer:
"""Klient Notifer wielokrotnego użytku"""

def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
self.base_url = base_url
self.api_key = api_key
self.session = requests.Session()

if api_key:
self.session.headers["Authorization"] = f"Bearer {api_key}"

def publish(
self,
topic: str,
message: str,
title: Optional[str] = None,
priority: int = 3,
tags: Optional[List[str]] = None
) -> dict:
"""Publikuj wiadomość do tematu"""

headers = {
"X-Priority": str(priority)
}

if title:
headers["X-Title"] = title

if tags:
headers["X-Tags"] = ",".join(tags)

response = self.session.post(
f"{self.base_url}/{topic}",
data=message,
headers=headers,
timeout=5
)
response.raise_for_status()
return response.json()

# Użycie
client = Notifer(api_key="your-key")

client.publish(
topic="alerts",
title="Alert testowy",
message="To jest test",
priority=2,
tags=["test", "demo"]
)

3. Integracja logowania

import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def publish(topic, message, **kwargs):
"""Publikuj z logowaniem"""
logger.info(f"Publikowanie do tematu '{topic}'")

try:
response = requests.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=5
)
response.raise_for_status()

result = response.json()
logger.info(f"Opublikowano pomyślnie: {result['id']}")
return result

except requests.exceptions.RequestException as e:
logger.error(f"Nie udało się opublikować: {e}")
raise

4. Publikowanie asynchroniczne (asyncio)

import aiohttp
import asyncio

async def publish_async(topic, message, **kwargs):
"""Asynchroniczne publikowanie z aiohttp"""

async with aiohttp.ClientSession() as session:
try:
async with session.post(
f"https://app.notifer.io/{topic}",
data=message,
headers=kwargs.get("headers", {}),
timeout=aiohttp.ClientTimeout(total=5)
) as response:
response.raise_for_status()
return await response.json()

except aiohttp.ClientError as e:
print(f"Błąd: {e}")
return None

# Użycie
async def main():
result = await publish_async(
topic="async-topic",
message="Wiadomość asynchroniczna"
)
print(result)

asyncio.run(main())

Testowanie

Testy jednostkowe

import unittest
from unittest.mock import patch, Mock
import requests

class TestNotiferPublishing(unittest.TestCase):
"""Testy jednostkowe dla publikowania Notifer"""

@patch('requests.post')
def test_publish_success(self, mock_post):
"""Test pomyślnego publikowania"""

# Mock odpowiedzi
mock_response = Mock()
mock_response.json.return_value = {
"id": "test-id",
"topic": "test-topic",
"message": "Test message"
}
mock_post.return_value = mock_response

# Wywołaj funkcję
result = publish("test-topic", "Test message")

# Asercje
self.assertEqual(result["id"], "test-id")
mock_post.assert_called_once()

@patch('requests.post')
def test_publish_failure(self, mock_post):
"""Test niepowodzenia publikowania"""

# Mock błędu
mock_post.side_effect = requests.exceptions.ConnectionError()

# Wywołanie powinno obsłużyć błąd
result = safe_publish("test-topic", "Test message")

# Asercje
self.assertIsNone(result)

if __name__ == "__main__":
unittest.main()

Następne kroki


Wskazówka: Używaj zmiennych środowiskowych dla kluczy API i twórz klasę klienta wielokrotnego użytku dla czystszego kodu!