Przejdź do treści

Publikowanie z Python

Wysyłaj powiadomienia do tematów Notifer używając Python z biblioteką requests.

Szybki start

Używanie biblioteki requests

Najprostszy sposób publikowania wiadomości:

import requests

response = requests.post(
    "https://app.notifer.io/my-topic",
    data="Twoja wiadomość tutaj"
)

print(response.json())

Wynik:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "topic": "my-topic",
  "message": "Twoja wiadomość tutaj",
  "priority": 3,
  "created_at": "2025-11-22T10:30:00Z"
}

Instalacja

Instalacja requests

pip install requests

Lub ze środowiskiem wirtualnym:

python -m venv venv
source venv/bin/activate  # W Windows: venv\Scripts\activate
pip install requests

Podstawowe publikowanie

Prosta wiadomość

import requests

def send_notification(topic, message):
    """Wyślij proste powiadomienie do tematu"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message
    )
    response.raise_for_status()
    return response.json()

# Użycie
result = send_notification("server-alerts", "Serwer nie działa!")
print(f"Message ID: {result['id']}")

Z tytułem i priorytetem

import requests

def send_alert(topic, title, message, priority=3):
    """Wyślij alert z tytułem i priorytetem"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Title": title,
            "X-Priority": str(priority)
        }
    )
    response.raise_for_status()
    return response.json()

# Użycie
send_alert(
    topic="production-alerts",
    title="Błąd bazy danych",
    message="Przekroczono limit czasu połączenia na prod-db-01",
    priority=5
)

Z tagami

import requests

def send_tagged_message(topic, message, tags, priority=3):
    """Wyślij wiadomość z tagami"""
    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers={
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags)
        }
    )
    response.raise_for_status()
    return response.json()

# Użycie
send_tagged_message(
    topic="monitoring",
    message="Użycie CPU na poziomie 95%",
    tags=["warning", "cpu", "prod-web-01"],
    priority=4
)

Funkcje zaawansowane

Formatowanie Markdown

import requests

def send_markdown_message(topic, message, title=None, priority=3):
    """Wyślij wiadomość z formatowaniem Markdown"""
    headers = {
        "X-Markdown": "true",
        "X-Priority": str(priority)
    }

    if title:
        headers["X-Title"] = title

    response = requests.post(
        f"https://app.notifer.io/{topic}",
        data=message,
        headers=headers
    )
    response.raise_for_status()
    return response.json()

# Użycie
markdown_message = """
## Podsumowanie wdrożenia

**Status:** ✅ Sukces
**Wersja:** v2.1.0
**Czas trwania:** 3m 45s

### Zmiany
- Dodano uwierzytelnianie użytkownika
- Naprawiono błąd płatności
- Zaktualizowano zależności

[Zobacz informacje o wydaniu](https://github.com/example/repo/releases/v2.1.0)
"""

send_markdown_message(
    topic="deployments",
    message=markdown_message,
    title="Deploy v2.1.0",
    priority=3
)

Prywatne tematy z uwierzytelnianiem

import requests

class NotiferClient:
    """Klient dla uwierzytelnionego publikowania"""

    def __init__(self, api_key=None, jwt_token=None):
        self.base_url = "https://app.notifer.io"
        self.headers = {}

        if api_key:
            self.headers["Authorization"] = f"Bearer {api_key}"
        elif jwt_token:
            self.headers["Authorization"] = f"Bearer {jwt_token}"

    def publish(self, topic, message, title=None, priority=3, tags=None, markdown=False):
        """Publikuj wiadomość do prywatnego tematu"""
        headers = self.headers.copy()
        headers["X-Priority"] = str(priority)

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        response = requests.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers
        )
        response.raise_for_status()
        return response.json()

# Użycie
client = NotiferClient(api_key="your-api-key-here")

client.publish(
    topic="private-alerts",
    title="Krytyczny problem",
    message="Utracono połączenie z bazą danych",
    priority=5,
    tags=["critical", "database"]
)

Przykłady z rzeczywistości

Skrypt monitorowania serwera

import requests
import psutil
import time

NOTIFER_TOPIC = "server-monitoring"
CPU_THRESHOLD = 80
MEMORY_THRESHOLD = 85

def send_alert(title, message, priority=4):
    """Wyślij alert monitorowania"""
    try:
        requests.post(
            f"https://app.notifer.io/{NOTIFER_TOPIC}",
            data=message,
            headers={
                "X-Title": title,
                "X-Priority": str(priority),
                "X-Tags": "monitoring,server"
            },
            timeout=5
        )
    except requests.exceptions.RequestException as e:
        print(f"Nie udało się wysłać alertu: {e}")

def check_system():
    """Sprawdź zasoby systemowe"""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_percent = psutil.virtual_memory().percent

    if cpu_percent > CPU_THRESHOLD:
        send_alert(
            title=f"Wysokie użycie CPU: {cpu_percent}%",
            message=f"Użycie CPU wynosi {cpu_percent}% (próg: {CPU_THRESHOLD}%)",
            priority=4
        )

    if memory_percent > MEMORY_THRESHOLD:
        send_alert(
            title=f"Wysokie użycie pamięci: {memory_percent}%",
            message=f"Użycie pamięci wynosi {memory_percent}% (próg: {MEMORY_THRESHOLD}%)",
            priority=4
        )

# Uruchom pętlę monitorowania
while True:
    check_system()
    time.sleep(60)  # Sprawdzaj co minutę

Logowanie błędów aplikacji

import requests
import traceback
import sys

NOTIFER_TOPIC = "application-errors"
NOTIFER_API_KEY = "your-api-key"

def log_error(error, context=None):
    """Zapisz błąd aplikacji w Notifer"""
    error_type = type(error).__name__
    error_message = str(error)
    stack_trace = traceback.format_exc()

    message = f"""
## {error_type}

**Błąd:** {error_message}

**Kontekst:** {context or 'None'}

**Stack Trace:**
```python
{stack_trace}
"""

try:
    requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=message,
        headers={
            "Authorization": f"Bearer {NOTIFER_API_KEY}",
            "X-Title": f"Error: {error_type}",
            "X-Priority": "5",
            "X-Tags": f"error,{error_type.lower()}",
            "X-Markdown": "true"
        },
        timeout=5
    )
except:
    # Nie pozwól, aby błąd powiadomienia zepsuł aplikację
    print(f"Nie udało się zapisać błędu w Notifer", file=sys.stderr)

Użycie w obsłudze wyjątków

try: # Kod aplikacji result = 1 / 0 except Exception as e: log_error(e, context="division operation") raise

### Integracja pipeline CI/CD

```python
import requests
import os
import sys
from datetime import datetime

class CIPipeline:
    """Powiadamiacz pipeline CI/CD"""

    def __init__(self, topic="ci-pipeline"):
        self.topic = topic
        self.base_url = "https://app.notifer.io"
        self.build_start = datetime.now()

    def notify(self, title, message, priority=3, tags=None, markdown=True):
        """Wyślij powiadomienie CI"""
        headers = {
            "X-Title": title,
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags or []),
            "X-Markdown": str(markdown).lower()
        }

        try:
            response = requests.post(
                f"{self.base_url}/{self.topic}",
                data=message,
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Ostrzeżenie: Nie udało się wysłać powiadomienia: {e}", file=sys.stderr)

    def build_started(self, branch, commit):
        """Powiadom o rozpoczęciu budowania"""
        message = f"""
**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Rozpoczęto:** {self.build_start.strftime('%Y-%m-%d %H:%M:%S')}
"""
        self.notify(
            title=f"Rozpoczęto budowanie: {branch}",
            message=message,
            priority=2,
            tags=["build", "started", branch]
        )

    def build_success(self, branch, commit, tests_passed):
        """Powiadom o sukcesie budowania"""
        duration = (datetime.now() - self.build_start).total_seconds()

        message = f"""
## ✅ Budowanie zakończone sukcesem

**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Czas trwania:** {duration:.1f}s
**Testy:** {tests_passed} zaliczonych

[Zobacz logi budowania](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Sukces budowania: {branch}",
            message=message,
            priority=3,
            tags=["build", "success", branch]
        )

    def build_failed(self, branch, commit, error):
        """Powiadom o niepowodzeniu budowania"""
        duration = (datetime.now() - self.build_start).total_seconds()

        message = f"""
## ❌ Budowanie nie powiodło się

**Gałąź:** `{branch}`
**Commit:** `{commit[:7]}`
**Czas trwania:** {duration:.1f}s

**Błąd:**
{error}
[Zobacz logi budowania](https://ci.example.com/builds/latest)
"""
        self.notify(
            title=f"Niepowodzenie budowania: {branch}",
            message=message,
            priority=5,
            tags=["build", "failed", branch]
        )

# Użycie w skrypcie CI
if __name__ == "__main__":
    pipeline = CIPipeline()

    branch = os.environ.get("GIT_BRANCH", "unknown")
    commit = os.environ.get("GIT_COMMIT", "unknown")

    pipeline.build_started(branch, commit)

    try:
        # Uruchom testy, budowanie itp.
        # ... twoje kroki CI ...

        pipeline.build_success(branch, commit, tests_passed=42)
        sys.exit(0)
    except Exception as e:
        pipeline.build_failed(branch, commit, str(e))
        sys.exit(1)

Powiadomienia o statusie kopii zapasowej

import requests
import subprocess
import os
from datetime import datetime

BACKUP_TOPIC = "database-backups"

def notify_backup_status(success, size_mb=None, duration_seconds=None, error=None):
    """Powiadom o statusie ukończenia kopii zapasowej"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    if success:
        message = f"""
## ✅ Kopia zapasowa zakończona pomyślnie

**Typ:** Pełna kopia zapasowa bazy danych
**Rozmiar:** {size_mb:.2f} MB
**Czas trwania:** {duration_seconds:.1f}s
**Lokalizacja:** `s3://backups/db-{datetime.now().strftime('%Y-%m-%d')}.sql.gz`
**Ukończono:** {timestamp}

**Szczegóły:**
- Kompresja: gzip
- Weryfikacja: Zaliczona ✅
"""
        priority = 2
        tags = ["backup", "success"]
    else:
        message = f"""
## ❌ Kopia zapasowa nie powiodła się

**Błąd:** {error}
**Czas:** {timestamp}

**Wymagane działania:**
1. Sprawdź logi kopii zapasowej
2. Zweryfikuj miejsce na dysku
3. Ponów kopię zapasową ręcznie
"""
        priority = 5
        tags = ["backup", "failed", "critical"]

    requests.post(
        f"https://app.notifer.io/{BACKUP_TOPIC}",
        data=message,
        headers={
            "X-Title": "Kopia zapasowa ukończona" if success else "Kopia zapasowa nie powiodła się",
            "X-Priority": str(priority),
            "X-Tags": ",".join(tags),
            "X-Markdown": "true"
        }
    )

# Użycie w skrypcie kopii zapasowej
def run_backup():
    """Uruchom kopię zapasową bazy danych"""
    start_time = datetime.now()

    try:
        # Uruchom polecenie kopii zapasowej
        result = subprocess.run(
            ["pg_dump", "-h", "localhost", "-U", "postgres", "mydb"],
            capture_output=True,
            check=True
        )

        # Oblicz metryki
        duration = (datetime.now() - start_time).total_seconds()
        size_mb = len(result.stdout) / 1024 / 1024

        notify_backup_status(
            success=True,
            size_mb=size_mb,
            duration_seconds=duration
        )

    except subprocess.CalledProcessError as e:
        notify_backup_status(
            success=False,
            error=str(e)
        )
        raise

run_backup()

Generowanie zaplanowanych raportów

import requests
from datetime import datetime, timedelta

def generate_daily_report():
    """Wygeneruj i wyślij raport statystyk dziennych"""

    # Pobierz metryki aplikacji
    # To są przykładowe dane - zastąp własnymi metrykami
    metrics = {
        "users_today": 1234,
        "users_change": "+5.2%",
        "orders_today": 456,
        "orders_change": "+12.1%",
        "revenue_today": 12345.67,
        "revenue_change": "+8.7%",
        "errors_today": 23,
        "errors_change": "-15.3%"
    }

    date_str = datetime.now().strftime('%Y-%m-%d')

    message = f"""
## 📊 Raport dzienny - {date_str}

### Statystyki aplikacji

| Metryka | Wartość | Zmiana |
|--------|-------|--------|
| Użytkownicy | {metrics['users_today']} | {metrics['users_change']} ↗️ |
| Zamówienia | {metrics['orders_today']} | {metrics['orders_change']} ↗️ |
| Przychód | ${metrics['revenue_today']:.2f} | {metrics['revenue_change']} ↗️ |
| Błędy | {metrics['errors_today']} | {metrics['errors_change']} ↘️ |

---

### Najpopularniejsze strony
1. Strona główna - 5 432 wyświetleń
2. Produkty - 3 210 wyświetleń
3. Kasa - 1 890 wyświetleń

### Problemy
- ⚠️ Wolna odpowiedź API na kasie (średnio 2,1s)
- ✅ Kopia zapasowa bazy danych ukończona
- ✅ Certyfikat SSL odnowiony

[Zobacz pełny pulpit](https://analytics.example.com)
"""

    requests.post(
        "https://app.notifer.io/daily-reports",
        data=message,
        headers={
            "X-Title": f"Raport dzienny - {date_str}",
            "X-Priority": "2",
            "X-Tags": "report,daily,analytics",
            "X-Markdown": "true"
        }
    )

# Uruchom jako zadanie cron: 0 9 * * * /usr/bin/python3 /path/to/report.py
if __name__ == "__main__":
    generate_daily_report()

Obsługa błędów

Podstawowa obsługa błędów

import requests

def safe_publish(topic, message, **kwargs):
    """Publikuj z obsługą błędów"""
    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout:
        print("Błąd: Przekroczono limit czasu żądania")
        return None

    except requests.exceptions.ConnectionError:
        print("Błąd: Połączenie nie powiodło się")
        return None

    except requests.exceptions.HTTPError as e:
        print(f"Błąd HTTP: {e.response.status_code} - {e.response.text}")
        return None

    except Exception as e:
        print(f"Nieoczekiwany błąd: {e}")
        return None

Ponawianie z wykładniczym backoff

import requests
import time

def publish_with_retry(topic, message, max_retries=3, **kwargs):
    """Publikuj z ponowieniem z wykładniczym backoff"""

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=5
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                # Ostatnia próba się nie powiodła
                print(f"Niepowodzenie po {max_retries} próbach: {e}")
                raise

            # Oblicz opóźnienie backoff: 2^attempt sekund
            delay = 2 ** attempt
            print(f"Próba {attempt + 1} nie powiodła się, ponowienie za {delay}s...")
            time.sleep(delay)

Dobre praktyki

1. Używaj zmiennych środowiskowych

import os
import requests

NOTIFER_TOPIC = os.getenv("NOTIFER_TOPIC", "default-topic")
NOTIFER_API_KEY = os.getenv("NOTIFER_API_KEY")

def publish(message, **kwargs):
    """Publikuj z konfiguracją opartą na środowisku"""
    headers = kwargs.get("headers", {})

    if NOTIFER_API_KEY:
        headers["Authorization"] = f"Bearer {NOTIFER_API_KEY}"

    response = requests.post(
        f"https://app.notifer.io/{NOTIFER_TOPIC}",
        data=message,
        headers=headers
    )
    response.raise_for_status()
    return response.json()

Plik środowiskowy (.env):

NOTIFER_TOPIC=production-alerts
NOTIFER_API_KEY=your-api-key-here

2. Utwórz klasę klienta wielokrotnego użytku

import requests
from typing import Optional, List

class Notifer:
    """Klient Notifer wielokrotnego użytku"""

    def __init__(self, api_key: Optional[str] = None, base_url: str = "https://app.notifer.io"):
        self.base_url = base_url
        self.api_key = api_key
        self.session = requests.Session()

        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def publish(
        self,
        topic: str,
        message: str,
        title: Optional[str] = None,
        priority: int = 3,
        tags: Optional[List[str]] = None,
        markdown: bool = False
    ) -> dict:
        """Publikuj wiadomość do tematu"""

        headers = {
            "X-Priority": str(priority)
        }

        if title:
            headers["X-Title"] = title

        if tags:
            headers["X-Tags"] = ",".join(tags)

        if markdown:
            headers["X-Markdown"] = "true"

        response = self.session.post(
            f"{self.base_url}/{topic}",
            data=message,
            headers=headers,
            timeout=5
        )
        response.raise_for_status()
        return response.json()

# Użycie
client = Notifer(api_key="your-key")

client.publish(
    topic="alerts",
    title="Alert testowy",
    message="To jest test",
    priority=4,
    tags=["test", "demo"]
)

3. Integracja logowania

import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def publish(topic, message, **kwargs):
    """Publikuj z logowaniem"""
    logger.info(f"Publikowanie do tematu '{topic}'")

    try:
        response = requests.post(
            f"https://app.notifer.io/{topic}",
            data=message,
            headers=kwargs.get("headers", {}),
            timeout=5
        )
        response.raise_for_status()

        result = response.json()
        logger.info(f"Opublikowano pomyślnie: {result['id']}")
        return result

    except requests.exceptions.RequestException as e:
        logger.error(f"Nie udało się opublikować: {e}")
        raise

4. Publikowanie asynchroniczne (asyncio)

import aiohttp
import asyncio

async def publish_async(topic, message, **kwargs):
    """Asynchroniczne publikowanie z aiohttp"""

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"https://app.notifer.io/{topic}",
                data=message,
                headers=kwargs.get("headers", {}),
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                response.raise_for_status()
                return await response.json()

        except aiohttp.ClientError as e:
            print(f"Błąd: {e}")
            return None

# Użycie
async def main():
    result = await publish_async(
        topic="async-topic",
        message="Wiadomość asynchroniczna"
    )
    print(result)

asyncio.run(main())

Testowanie

Testy jednostkowe

import unittest
from unittest.mock import patch, Mock
import requests

class TestNotiferPublishing(unittest.TestCase):
    """Testy jednostkowe dla publikowania Notifer"""

    @patch('requests.post')
    def test_publish_success(self, mock_post):
        """Test pomyślnego publikowania"""

        # Mock odpowiedzi
        mock_response = Mock()
        mock_response.json.return_value = {
            "id": "test-id",
            "topic": "test-topic",
            "message": "Test message"
        }
        mock_post.return_value = mock_response

        # Wywołaj funkcję
        result = publish("test-topic", "Test message")

        # Asercje
        self.assertEqual(result["id"], "test-id")
        mock_post.assert_called_once()

    @patch('requests.post')
    def test_publish_failure(self, mock_post):
        """Test niepowodzenia publikowania"""

        # Mock błędu
        mock_post.side_effect = requests.exceptions.ConnectionError()

        # Wywołanie powinno obsłużyć błąd
        result = safe_publish("test-topic", "Test message")

        # Asercje
        self.assertIsNone(result)

if __name__ == "__main__":
    unittest.main()

Następne kroki


Wskazówka: Używaj zmiennych środowiskowych dla kluczy API i twórz klasę klienta wielokrotnego użytku dla czystszego kodu!