288 lines
12 KiB
Python
288 lines
12 KiB
Python
"""
|
||
Утилиты для асинхронной обработки данных LyngSat с кешированием.
|
||
"""
|
||
import logging
|
||
from typing import Callable, Optional
|
||
from .async_parser import AsyncLyngSatParser
|
||
from .models import LyngSat
|
||
from mainapp.models import Polarization, Standard, Modulation, Satellite
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def process_single_satellite(
|
||
parser: AsyncLyngSatParser,
|
||
satellite_info: dict,
|
||
force_refresh: bool = False
|
||
) -> dict:
|
||
"""
|
||
Обрабатывает один спутник и сохраняет данные в БД.
|
||
|
||
Args:
|
||
parser: Экземпляр парсера
|
||
satellite_info: Информация о спутнике (name, url, update_date)
|
||
force_refresh: Принудительно обновить кеш
|
||
|
||
Returns:
|
||
dict: Статистика обработки спутника
|
||
"""
|
||
sat_name = satellite_info["name"]
|
||
sat_url = satellite_info["url"]
|
||
|
||
stats = {
|
||
"satellite_name": sat_name,
|
||
"sources_found": 0,
|
||
"created": 0,
|
||
"updated": 0,
|
||
"errors": []
|
||
}
|
||
|
||
logger.info(f"Обработка спутника: {sat_name}")
|
||
|
||
# Получаем данные спутника (из кеша или с сайта)
|
||
satellite_data = parser.fetch_satellite_data(sat_name, sat_url, force_refresh)
|
||
|
||
if not satellite_data:
|
||
error_msg = f"Не удалось получить данные для спутника {sat_name}"
|
||
logger.error(error_msg)
|
||
stats["errors"].append(error_msg)
|
||
return stats
|
||
|
||
sources = satellite_data.get("sources", [])
|
||
stats["sources_found"] = len(sources)
|
||
|
||
logger.info(f"Найдено {len(sources)} источников для {sat_name}")
|
||
|
||
# Находим спутник в базе
|
||
try:
|
||
sat_obj = Satellite.objects.get(name__icontains=sat_name)
|
||
logger.debug(f"Спутник {sat_name} найден в базе (ID: {sat_obj.id})")
|
||
except Satellite.DoesNotExist:
|
||
error_msg = f"Спутник '{sat_name}' не найден в базе данных"
|
||
logger.warning(error_msg)
|
||
stats["errors"].append(error_msg)
|
||
return stats
|
||
except Satellite.MultipleObjectsReturned:
|
||
error_msg = f"Найдено несколько спутников с именем '{sat_name}'"
|
||
logger.warning(error_msg)
|
||
stats["errors"].append(error_msg)
|
||
return stats
|
||
|
||
# Обрабатываем каждый источник
|
||
for source_idx, source in enumerate(sources, 1):
|
||
try:
|
||
# Парсим частоту
|
||
try:
|
||
freq = float(source['freq'])
|
||
except (ValueError, TypeError):
|
||
freq = -1.0
|
||
logger.debug(f"Некорректная частота для {sat_name}: {source.get('freq')}")
|
||
|
||
last_update = source['last_update']
|
||
fec = source['metadata'].get('fec')
|
||
modulation_name = source['metadata'].get('modulation')
|
||
standard_name = source['metadata'].get('standard')
|
||
symbol_velocity = source['metadata'].get('symbol_rate')
|
||
polarization_name = source['pol']
|
||
channel_info = source['provider_name']
|
||
|
||
# Создаем или получаем связанные объекты
|
||
pol_obj, _ = Polarization.objects.get_or_create(
|
||
name=polarization_name if polarization_name else "-"
|
||
)
|
||
|
||
mod_obj, _ = Modulation.objects.get_or_create(
|
||
name=modulation_name if modulation_name else "-"
|
||
)
|
||
|
||
standard_obj, _ = Standard.objects.get_or_create(
|
||
name=standard_name if standard_name else "-"
|
||
)
|
||
|
||
# Создаем или обновляем запись Lyngsat
|
||
lyng_obj, created = LyngSat.objects.update_or_create(
|
||
id_satellite=sat_obj,
|
||
frequency=freq,
|
||
polarization=pol_obj,
|
||
defaults={
|
||
"modulation": mod_obj,
|
||
"standard": standard_obj,
|
||
"sym_velocity": symbol_velocity if symbol_velocity else 0,
|
||
"channel_info": channel_info[:20] if channel_info else "",
|
||
"last_update": last_update,
|
||
"fec": fec[:30] if fec else "",
|
||
"url": satellite_data["url"]
|
||
}
|
||
)
|
||
|
||
if created:
|
||
stats['created'] += 1
|
||
logger.debug(f"Создана запись для {sat_name} {freq} МГц")
|
||
else:
|
||
stats['updated'] += 1
|
||
logger.debug(f"Обновлена запись для {sat_name} {freq} МГц")
|
||
|
||
# Логируем прогресс каждые 10 источников
|
||
if source_idx % 10 == 0:
|
||
logger.info(f"Обработано {source_idx}/{len(sources)} источников для {sat_name}")
|
||
|
||
except Exception as e:
|
||
error_msg = f"Ошибка при обработке источника {sat_name}: {str(e)}"
|
||
logger.error(error_msg, exc_info=True)
|
||
stats['errors'].append(error_msg)
|
||
continue
|
||
|
||
logger.info(f"Завершена обработка {sat_name}: создано {stats['created']}, обновлено {stats['updated']}")
|
||
return stats
|
||
|
||
|
||
def fill_lyngsat_data_async(
|
||
target_sats: list[str],
|
||
regions: list[str] = None,
|
||
task_id: str = None,
|
||
update_progress: Optional[Callable] = None,
|
||
force_refresh: bool = False,
|
||
use_cache: bool = True
|
||
) -> dict:
|
||
"""
|
||
Асинхронно заполняет данные Lyngsat для указанных спутников.
|
||
Обрабатывает спутники по одному с кешированием.
|
||
|
||
Args:
|
||
target_sats: Список названий спутников для обработки
|
||
regions: Список регионов для парсинга (по умолчанию все)
|
||
task_id: ID задачи Celery для логирования
|
||
update_progress: Функция для обновления прогресса (current, total, status, details)
|
||
force_refresh: Принудительно обновить кеш
|
||
use_cache: Использовать ли кеширование
|
||
|
||
Returns:
|
||
dict: Статистика обработки
|
||
"""
|
||
log_prefix = f"[Task {task_id}]" if task_id else "[Lyngsat Async]"
|
||
|
||
overall_stats = {
|
||
'total_satellites': 0,
|
||
'processed_satellites': 0,
|
||
'total_sources': 0,
|
||
'created': 0,
|
||
'updated': 0,
|
||
'errors': [],
|
||
'satellites_details': []
|
||
}
|
||
|
||
if regions is None:
|
||
regions = ["europe", "asia", "america", "atlantic"]
|
||
|
||
logger.info(f"{log_prefix} Начало асинхронной обработки данных")
|
||
logger.info(f"{log_prefix} Спутники: {', '.join(target_sats)}")
|
||
logger.info(f"{log_prefix} Регионы: {', '.join(regions)}")
|
||
logger.info(f"{log_prefix} Использование кеша: {use_cache}, Принудительное обновление: {force_refresh}")
|
||
|
||
if update_progress:
|
||
update_progress(0, len(target_sats), "Инициализация парсера...", {})
|
||
|
||
try:
|
||
# Создаем парсер
|
||
parser = AsyncLyngSatParser(
|
||
flaresolver_url="http://localhost:8191/v1",
|
||
target_sats=target_sats,
|
||
regions=regions,
|
||
use_cache=use_cache
|
||
)
|
||
|
||
logger.info(f"{log_prefix} Получение списка спутников...")
|
||
if update_progress:
|
||
update_progress(0, len(target_sats), "Получение списка спутников...", {})
|
||
|
||
# Получаем список всех спутников
|
||
all_satellites = parser.get_all_satellites_list(force_refresh)
|
||
overall_stats['total_satellites'] = len(all_satellites)
|
||
|
||
logger.info(f"{log_prefix} Найдено {len(all_satellites)} спутников для обработки")
|
||
|
||
# Обрабатываем каждый спутник по отдельности
|
||
for idx, satellite_info in enumerate(all_satellites, 1):
|
||
sat_name = satellite_info["name"]
|
||
|
||
logger.info(f"{log_prefix} Обработка спутника {idx}/{len(all_satellites)}: {sat_name}")
|
||
|
||
if update_progress:
|
||
update_progress(
|
||
idx - 1,
|
||
len(all_satellites),
|
||
f"Обработка {sat_name}...",
|
||
{
|
||
"current_satellite": sat_name,
|
||
"created": overall_stats['created'],
|
||
"updated": overall_stats['updated']
|
||
}
|
||
)
|
||
|
||
# Обрабатываем спутник
|
||
sat_stats = process_single_satellite(parser, satellite_info, force_refresh)
|
||
|
||
# Обновляем общую статистику
|
||
overall_stats['processed_satellites'] += 1
|
||
overall_stats['total_sources'] += sat_stats['sources_found']
|
||
overall_stats['created'] += sat_stats['created']
|
||
overall_stats['updated'] += sat_stats['updated']
|
||
overall_stats['errors'].extend(sat_stats['errors'])
|
||
overall_stats['satellites_details'].append(sat_stats)
|
||
|
||
logger.info(
|
||
f"{log_prefix} Спутник {sat_name} обработан: "
|
||
f"источников {sat_stats['sources_found']}, "
|
||
f"создано {sat_stats['created']}, "
|
||
f"обновлено {sat_stats['updated']}"
|
||
)
|
||
|
||
logger.info(
|
||
f"{log_prefix} Обработка завершена. "
|
||
f"Спутников: {overall_stats['processed_satellites']}/{overall_stats['total_satellites']}, "
|
||
f"Источников: {overall_stats['total_sources']}, "
|
||
f"Создано: {overall_stats['created']}, "
|
||
f"Обновлено: {overall_stats['updated']}, "
|
||
f"Ошибок: {len(overall_stats['errors'])}"
|
||
)
|
||
|
||
if update_progress:
|
||
update_progress(
|
||
overall_stats['processed_satellites'],
|
||
overall_stats['total_satellites'],
|
||
"Завершено",
|
||
{
|
||
"created": overall_stats['created'],
|
||
"updated": overall_stats['updated'],
|
||
"errors_count": len(overall_stats['errors'])
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
error_msg = f"Критическая ошибка: {str(e)}"
|
||
logger.error(f"{log_prefix} {error_msg}", exc_info=True)
|
||
overall_stats['errors'].append(error_msg)
|
||
|
||
return overall_stats
|
||
|
||
|
||
def clear_lyngsat_cache(cache_type: str = "all") -> dict:
|
||
"""
|
||
Очищает кеш LyngSat.
|
||
|
||
Args:
|
||
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
|
||
|
||
Returns:
|
||
dict: Статистика очистки
|
||
"""
|
||
logger.info(f"Очистка кеша LyngSat: {cache_type}")
|
||
|
||
if cache_type == "all":
|
||
stats = AsyncLyngSatParser.clear_all_cache()
|
||
else:
|
||
stats = AsyncLyngSatParser.clear_cache(cache_type)
|
||
|
||
logger.info(f"Кеш очищен: {stats}")
|
||
return stats
|