""" Утилиты для асинхронной обработки данных LyngSat с кешированием. """ import logging from typing import Callable, Optional from .async_parser import AsyncLyngSatParser from .models import LyngSat from mainapp.models import Polarization, Standard, Modulation, Satellite logger = logging.getLogger(__name__) def process_single_satellite( parser: AsyncLyngSatParser, satellite_info: dict, force_refresh: bool = False ) -> dict: """ Обрабатывает один спутник и сохраняет данные в БД. Args: parser: Экземпляр парсера satellite_info: Информация о спутнике (name, url, update_date) force_refresh: Принудительно обновить кеш Returns: dict: Статистика обработки спутника """ sat_name = satellite_info["name"] sat_url = satellite_info["url"] stats = { "satellite_name": sat_name, "sources_found": 0, "created": 0, "updated": 0, "errors": [] } logger.info(f"Обработка спутника: {sat_name}") # Получаем данные спутника (из кеша или с сайта) satellite_data = parser.fetch_satellite_data(sat_name, sat_url, force_refresh) if not satellite_data: error_msg = f"Не удалось получить данные для спутника {sat_name}" logger.error(error_msg) stats["errors"].append(error_msg) return stats sources = satellite_data.get("sources", []) stats["sources_found"] = len(sources) logger.info(f"Найдено {len(sources)} источников для {sat_name}") # Находим спутник в базе try: sat_obj = Satellite.objects.get(name__icontains=sat_name) logger.debug(f"Спутник {sat_name} найден в базе (ID: {sat_obj.id})") except Satellite.DoesNotExist: error_msg = f"Спутник '{sat_name}' не найден в базе данных" logger.warning(error_msg) stats["errors"].append(error_msg) return stats except Satellite.MultipleObjectsReturned: error_msg = f"Найдено несколько спутников с именем '{sat_name}'" logger.warning(error_msg) stats["errors"].append(error_msg) return stats # Обрабатываем каждый источник for source_idx, source in enumerate(sources, 1): try: # Парсим частоту try: freq = float(source['freq']) except (ValueError, TypeError): freq = -1.0 logger.debug(f"Некорректная частота для {sat_name}: {source.get('freq')}") last_update = source['last_update'] fec = source['metadata'].get('fec') modulation_name = source['metadata'].get('modulation') standard_name = source['metadata'].get('standard') symbol_velocity = source['metadata'].get('symbol_rate') polarization_name = source['pol'] channel_info = source['provider_name'] # Создаем или получаем связанные объекты pol_obj, _ = Polarization.objects.get_or_create( name=polarization_name if polarization_name else "-" ) mod_obj, _ = Modulation.objects.get_or_create( name=modulation_name if modulation_name else "-" ) standard_obj, _ = Standard.objects.get_or_create( name=standard_name if standard_name else "-" ) # Создаем или обновляем запись Lyngsat lyng_obj, created = LyngSat.objects.update_or_create( id_satellite=sat_obj, frequency=freq, polarization=pol_obj, defaults={ "modulation": mod_obj, "standard": standard_obj, "sym_velocity": symbol_velocity if symbol_velocity else 0, "channel_info": channel_info[:20] if channel_info else "", "last_update": last_update, "fec": fec[:30] if fec else "", "url": satellite_data["url"] } ) if created: stats['created'] += 1 logger.debug(f"Создана запись для {sat_name} {freq} МГц") else: stats['updated'] += 1 logger.debug(f"Обновлена запись для {sat_name} {freq} МГц") # Логируем прогресс каждые 10 источников if source_idx % 10 == 0: logger.info(f"Обработано {source_idx}/{len(sources)} источников для {sat_name}") except Exception as e: error_msg = f"Ошибка при обработке источника {sat_name}: {str(e)}" logger.error(error_msg, exc_info=True) stats['errors'].append(error_msg) continue logger.info(f"Завершена обработка {sat_name}: создано {stats['created']}, обновлено {stats['updated']}") return stats def fill_lyngsat_data_async( target_sats: list[str], regions: list[str] = None, task_id: str = None, update_progress: Optional[Callable] = None, force_refresh: bool = False, use_cache: bool = True ) -> dict: """ Асинхронно заполняет данные Lyngsat для указанных спутников. Обрабатывает спутники по одному с кешированием. Args: target_sats: Список названий спутников для обработки regions: Список регионов для парсинга (по умолчанию все) task_id: ID задачи Celery для логирования update_progress: Функция для обновления прогресса (current, total, status, details) force_refresh: Принудительно обновить кеш use_cache: Использовать ли кеширование Returns: dict: Статистика обработки """ log_prefix = f"[Task {task_id}]" if task_id else "[Lyngsat Async]" overall_stats = { 'total_satellites': 0, 'processed_satellites': 0, 'total_sources': 0, 'created': 0, 'updated': 0, 'errors': [], 'satellites_details': [] } if regions is None: regions = ["europe", "asia", "america", "atlantic"] logger.info(f"{log_prefix} Начало асинхронной обработки данных") logger.info(f"{log_prefix} Спутники: {', '.join(target_sats)}") logger.info(f"{log_prefix} Регионы: {', '.join(regions)}") logger.info(f"{log_prefix} Использование кеша: {use_cache}, Принудительное обновление: {force_refresh}") if update_progress: update_progress(0, len(target_sats), "Инициализация парсера...", {}) try: # Создаем парсер parser = AsyncLyngSatParser( flaresolver_url="http://localhost:8191/v1", target_sats=target_sats, regions=regions, use_cache=use_cache ) logger.info(f"{log_prefix} Получение списка спутников...") if update_progress: update_progress(0, len(target_sats), "Получение списка спутников...", {}) # Получаем список всех спутников all_satellites = parser.get_all_satellites_list(force_refresh) overall_stats['total_satellites'] = len(all_satellites) logger.info(f"{log_prefix} Найдено {len(all_satellites)} спутников для обработки") # Обрабатываем каждый спутник по отдельности for idx, satellite_info in enumerate(all_satellites, 1): sat_name = satellite_info["name"] logger.info(f"{log_prefix} Обработка спутника {idx}/{len(all_satellites)}: {sat_name}") if update_progress: update_progress( idx - 1, len(all_satellites), f"Обработка {sat_name}...", { "current_satellite": sat_name, "created": overall_stats['created'], "updated": overall_stats['updated'] } ) # Обрабатываем спутник sat_stats = process_single_satellite(parser, satellite_info, force_refresh) # Обновляем общую статистику overall_stats['processed_satellites'] += 1 overall_stats['total_sources'] += sat_stats['sources_found'] overall_stats['created'] += sat_stats['created'] overall_stats['updated'] += sat_stats['updated'] overall_stats['errors'].extend(sat_stats['errors']) overall_stats['satellites_details'].append(sat_stats) logger.info( f"{log_prefix} Спутник {sat_name} обработан: " f"источников {sat_stats['sources_found']}, " f"создано {sat_stats['created']}, " f"обновлено {sat_stats['updated']}" ) logger.info( f"{log_prefix} Обработка завершена. " f"Спутников: {overall_stats['processed_satellites']}/{overall_stats['total_satellites']}, " f"Источников: {overall_stats['total_sources']}, " f"Создано: {overall_stats['created']}, " f"Обновлено: {overall_stats['updated']}, " f"Ошибок: {len(overall_stats['errors'])}" ) if update_progress: update_progress( overall_stats['processed_satellites'], overall_stats['total_satellites'], "Завершено", { "created": overall_stats['created'], "updated": overall_stats['updated'], "errors_count": len(overall_stats['errors']) } ) except Exception as e: error_msg = f"Критическая ошибка: {str(e)}" logger.error(f"{log_prefix} {error_msg}", exc_info=True) overall_stats['errors'].append(error_msg) return overall_stats def clear_lyngsat_cache(cache_type: str = "all") -> dict: """ Очищает кеш LyngSat. Args: cache_type: Тип кеша для очистки ("regions", "satellites", "all") Returns: dict: Статистика очистки """ logger.info(f"Очистка кеша LyngSat: {cache_type}") if cache_type == "all": stats = AsyncLyngSatParser.clear_all_cache() else: stats = AsyncLyngSatParser.clear_cache(cache_type) logger.info(f"Кеш очищен: {stats}") return stats