Files
dbstorage/dbapp/lyngsatapp/tasks.py

202 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery tasks for Lyngsat data processing.
"""
import logging
from celery import shared_task
from django.core.cache import cache
from .utils import fill_lyngsat_data
from .async_utils import fill_lyngsat_data_async, clear_lyngsat_cache
logger = logging.getLogger(__name__)
@shared_task(bind=True, name='lyngsatapp.fill_lyngsat_data_async')
def fill_lyngsat_data_task(self, target_sats, regions=None, force_refresh=False, use_cache=True):
"""
Асинхронная задача для заполнения данных Lyngsat с кешированием.
Обрабатывает спутники по одному.
Args:
target_sats: Список названий спутников для обработки
regions: Список регионов для парсинга (по умолчанию все)
force_refresh: Принудительно обновить кеш
use_cache: Использовать ли кеширование
Returns:
dict: Статистика обработки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Начало обработки данных Lyngsat")
logger.info(f"[Task {task_id}] Спутники: {', '.join(target_sats)}")
logger.info(f"[Task {task_id}] Регионы: {', '.join(regions) if regions else 'все'}")
logger.info(f"[Task {task_id}] Кеширование: {use_cache}, Принудительное обновление: {force_refresh}")
# Обновляем статус задачи
self.update_state(
state='PROGRESS',
meta={
'current': 0,
'total': len(target_sats),
'status': 'Инициализация...',
'details': {}
}
)
try:
# Вызываем асинхронную функцию заполнения данных
stats = fill_lyngsat_data_async(
target_sats=target_sats,
regions=regions,
task_id=task_id,
force_refresh=force_refresh,
use_cache=use_cache,
update_progress=lambda current, total, status, details: self.update_state(
state='PROGRESS',
meta={
'current': current,
'total': total,
'status': status,
'details': details
}
)
)
logger.info(f"[Task {task_id}] Обработка завершена успешно")
logger.info(f"[Task {task_id}] Статистика: {stats}")
# Сохраняем результат в кеш для отображения на странице
cache.set(f'lyngsat_task_{task_id}', stats, timeout=3600)
return stats
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при обработке: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
self.update_state(
state='FAILURE',
meta={
'error': error_message,
'status': 'Ошибка при обработке',
'details': {},
'exc_type': type(e).__name__,
'exc_message': str(e)
}
)
# Возвращаем словарь с ошибкой вместо raise для корректной сериализации
return {
'error': error_message,
'status': 'FAILURE',
'total_satellites': 0,
'processed_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [error_message]
}
@shared_task(bind=True, name='lyngsatapp.fill_lyngsat_data_sync')
def fill_lyngsat_data_task_sync(self, target_sats, regions=None):
"""
Синхронная задача для заполнения данных Lyngsat (старая версия без кеширования).
Используется для обратной совместимости.
Args:
target_sats: Список названий спутников для обработки
regions: Список регионов для парсинга (по умолчанию все)
Returns:
dict: Статистика обработки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Начало синхронной обработки данных Lyngsat")
logger.info(f"[Task {task_id}] Спутники: {', '.join(target_sats)}")
logger.info(f"[Task {task_id}] Регионы: {', '.join(regions) if regions else 'все'}")
# Обновляем статус задачи
self.update_state(
state='PROGRESS',
meta={
'current': 0,
'total': len(target_sats),
'status': 'Инициализация...'
}
)
try:
# Вызываем старую функцию заполнения данных
stats = fill_lyngsat_data(
target_sats=target_sats,
regions=regions,
task_id=task_id,
update_progress=lambda current, total, status: self.update_state(
state='PROGRESS',
meta={
'current': current,
'total': total,
'status': status
}
)
)
logger.info(f"[Task {task_id}] Обработка завершена успешно")
logger.info(f"[Task {task_id}] Статистика: {stats}")
# Сохраняем результат в кеш для отображения на странице
cache.set(f'lyngsat_task_{task_id}', stats, timeout=3600)
return stats
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при обработке: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
self.update_state(
state='FAILURE',
meta={
'error': error_message,
'status': 'Ошибка при обработке',
'exc_type': type(e).__name__,
'exc_message': str(e)
}
)
# Возвращаем словарь с ошибкой вместо raise для корректной сериализации
return {
'error': error_message,
'status': 'FAILURE',
'total_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [error_message]
}
@shared_task(bind=True, name='lyngsatapp.clear_cache')
def clear_cache_task(self, cache_type='all'):
"""
Задача для очистки кеша LyngSat.
Args:
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
Returns:
dict: Статистика очистки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Запуск задачи очистки кеша: {cache_type}")
try:
stats = clear_lyngsat_cache(cache_type)
logger.info(f"[Task {task_id}] Кеш очищен успешно: {stats}")
return stats
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при очистке кеша: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
return {
'error': error_message,
'status': 'FAILURE',
'cleared': 0,
'errors': [error_message]
}