12 KiB
Руководство по асинхронному заполнению данных Lyngsat
Обзор
Система заполнения данных Lyngsat теперь работает асинхронно с использованием Celery. Это позволяет:
- Не блокировать веб-интерфейс во время долгих операций
- Отслеживать прогресс выполнения задачи в реальном времени
- Просматривать детальные логи обработки
- Получать уведомления о завершении задачи
Архитектура
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Django │─────▶│ Celery │─────▶│ Redis │
│ Web App │ │ Worker │ │ Broker │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
│ ▼
│ ┌─────────────┐
└─────────────▶│ PostgreSQL │
│ Database │
└─────────────┘
Установка и настройка
1. Установка зависимостей
pip install -r requirements.txt
Новые зависимости:
celery>=5.4.0- асинхронная обработка задачdjango-celery-results>=2.5.1- хранение результатов в БД
2. Применение миграций
cd dbapp
python manage.py migrate
Это создаст таблицы для хранения результатов Celery.
3. Запуск Redis
Redis используется как брокер сообщений для Celery.
Вариант 1: Docker Compose (рекомендуется)
docker-compose up -d redis
Вариант 2: Локальная установка
# Ubuntu/Debian
sudo apt-get install redis-server
sudo systemctl start redis
# macOS
brew install redis
brew services start redis
# Проверка
redis-cli ping
# Должно вернуть: PONG
4. Запуск FlareSolver
FlareSolver необходим для обхода защиты Cloudflare.
docker-compose up -d flaresolverr
Или отдельно:
docker run -d -p 8191:8191 --name flaresolverr ghcr.io/flaresolverr/flaresolverr:latest
5. Запуск Celery Worker
Вариант 1: Используя скрипт
cd dbapp
./start_celery_worker.sh
Вариант 2: Напрямую
cd dbapp
celery -A dbapp worker --loglevel=info
Вариант 3: В фоновом режиме (Linux/macOS)
cd dbapp
celery -A dbapp worker --loglevel=info --logfile=logs/celery_worker.log --detach
Использование
1. Запуск задачи через веб-интерфейс
- Откройте страницу действий:
http://localhost:8000/actions/ - Нажмите "Заполнить данные Lyngsat"
- Выберите спутники и регионы
- Нажмите "Заполнить данные"
- Вы будете перенаправлены на страницу отслеживания прогресса
2. Отслеживание прогресса
На странице статуса задачи вы увидите:
- Прогресс-бар с процентом выполнения
- Текущий статус (например, "Обработка Astra 4A...")
- Состояние задачи (PENDING, PROGRESS, SUCCESS, FAILURE)
- Результаты после завершения:
- Количество обработанных спутников
- Количество обработанных источников
- Количество созданных записей
- Количество обновленных записей
- Список ошибок (если есть)
Страница автоматически обновляется каждые 2 секунды.
3. Просмотр логов
Логи Celery worker содержат детальную информацию о процессе:
# Просмотр логов в реальном времени
tail -f dbapp/logs/celery_worker.log
# Поиск по логам
grep "Task" dbapp/logs/celery_worker.log
grep "ERROR" dbapp/logs/celery_worker.log
Формат логов:
[2024-01-15 10:30:45: INFO/MainProcess][lyngsatapp.fill_lyngsat_data_async(abc123)] [Task abc123] Начало обработки данных Lyngsat
[2024-01-15 10:30:45: INFO/MainProcess][lyngsatapp.fill_lyngsat_data_async(abc123)] [Task abc123] Спутники: Astra 4A, Hotbird 13G
[2024-01-15 10:30:46: INFO/MainProcess][lyngsatapp.fill_lyngsat_data_async(abc123)] [Task abc123] Получено данных по 2 спутникам
[2024-01-15 10:31:00: INFO/MainProcess][lyngsatapp.fill_lyngsat_data_async(abc123)] [Task abc123] Обработка спутника 1/2: Astra 4A
Технические детали
Структура задачи
Файл: dbapp/lyngsatapp/tasks.py
@shared_task(bind=True, name='lyngsatapp.fill_lyngsat_data_async')
def fill_lyngsat_data_task(self, target_sats, regions=None):
# Логирование начала
# Обновление прогресса
# Вызов функции заполнения
# Сохранение результата в кеш
# Обработка ошибок
Обновление прогресса
Функция fill_lyngsat_data теперь принимает callback update_progress:
def update_progress(current, total, status):
self.update_state(
state='PROGRESS',
meta={
'current': current,
'total': total,
'status': status
}
)
API для проверки статуса
Endpoint: /api/lyngsat-task-status/<task_id>/
Ответ:
{
"task_id": "abc123",
"state": "PROGRESS",
"status": "Обработка Astra 4A...",
"current": 1,
"total": 2,
"percent": 50
}
Логирование
Используется стандартный модуль logging Python:
import logging
logger = logging.getLogger(__name__)
logger.info(f"[Task {task_id}] Начало обработки")
logger.debug(f"[Task {task_id}] Детальная информация")
logger.warning(f"[Task {task_id}] Предупреждение")
logger.error(f"[Task {task_id}] Ошибка", exc_info=True)
Настройки Celery
Файл: dbapp/dbapp/settings/base.py
# Брокер сообщений
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Хранение результатов
CELERY_RESULT_BACKEND = 'django-db'
# Таймауты
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 минут
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 25 минут
# Отслеживание прогресса
CELERY_TASK_TRACK_STARTED = True
Мониторинг и отладка
Flower - веб-интерфейс для мониторинга Celery
Установка:
pip install flower
Запуск:
celery -A dbapp flower
Откройте: http://localhost:5555
Проверка статуса задачи через Django shell
python manage.py shell
from celery.result import AsyncResult
task_id = 'abc123'
task = AsyncResult(task_id)
print(f"State: {task.state}")
print(f"Info: {task.info}")
print(f"Result: {task.result}")
Очистка старых результатов
# Удалить результаты старше 1 дня
python manage.py celery_results_cleanup --days=1
Решение проблем
Проблема: Worker не запускается
Решение:
- Проверьте, что Redis запущен:
redis-cli ping - Проверьте настройки в
.env:CELERY_BROKER_URL - Проверьте логи:
tail -f logs/celery_worker.log
Проблема: Задача зависла в состоянии PENDING
Решение:
- Проверьте, что worker запущен:
ps aux | grep celery - Перезапустите worker
- Проверьте соединение с Redis
Проблема: Задача завершается с ошибкой
Решение:
- Проверьте логи worker
- Проверьте, что FlareSolver запущен:
curl http://localhost:8191/v1 - Проверьте, что спутники существуют в базе данных
Проблема: Прогресс не обновляется
Решение:
- Откройте консоль браузера (F12) и проверьте ошибки
- Проверьте, что API endpoint доступен:
/api/lyngsat-task-status/<task_id>/ - Очистите кеш браузера
Производственное развертывание
Systemd сервис для Celery Worker
Создайте файл /etc/systemd/system/celery-worker.service:
[Unit]
Description=Celery Worker for Django Lyngsat
After=network.target redis.service
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/path/to/dbapp
Environment="PATH=/path/to/venv/bin"
ExecStart=/path/to/venv/bin/celery -A dbapp worker --loglevel=info --logfile=/var/log/celery/worker.log --detach
ExecStop=/path/to/venv/bin/celery -A dbapp control shutdown
Restart=always
[Install]
WantedBy=multi-user.target
Запуск:
sudo systemctl daemon-reload
sudo systemctl enable celery-worker
sudo systemctl start celery-worker
sudo systemctl status celery-worker
Supervisor (альтернатива)
Установка:
sudo apt-get install supervisor
Конфигурация /etc/supervisor/conf.d/celery.conf:
[program:celery-worker]
command=/path/to/venv/bin/celery -A dbapp worker --loglevel=info
directory=/path/to/dbapp
user=www-data
autostart=true
autorestart=true
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker_error.log
Запуск:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start celery-worker
Дополнительные возможности
Периодические задачи (Celery Beat)
Для автоматического обновления данных по расписанию:
- Установите
django-celery-beat:
pip install django-celery-beat
- Добавьте в
INSTALLED_APPS:
INSTALLED_APPS = [
...
'django_celery_beat',
]
- Примените миграции:
python manage.py migrate django_celery_beat
-
Создайте периодическую задачу через админ-панель Django
-
Запустите beat scheduler:
celery -A dbapp beat --loglevel=info
Уведомления по email
Добавьте в задачу отправку email при завершении:
from django.core.mail import send_mail
@shared_task(bind=True)
def fill_lyngsat_data_task(self, target_sats, regions=None):
# ... обработка ...
# Отправка email
send_mail(
'Задача Lyngsat завершена',
f'Обработано {stats["total_satellites"]} спутников',
'noreply@example.com',
['admin@example.com'],
)
Заключение
Асинхронная обработка данных Lyngsat обеспечивает:
- ✅ Неблокирующий веб-интерфейс
- ✅ Отслеживание прогресса в реальном времени
- ✅ Детальное логирование
- ✅ Масштабируемость (можно запустить несколько workers)
- ✅ Надежность (автоматический retry при ошибках)
Для получения дополнительной помощи обратитесь к документации: