Files
dbstorage/dbapp/lyngsatapp/async_utils.py

293 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Утилиты для асинхронной обработки данных LyngSat с кешированием.
"""
import logging
from typing import Callable, Optional
from .async_parser import AsyncLyngSatParser
from .models import LyngSat
from mainapp.models import Polarization, Standard, Modulation, Satellite
from dbapp.settings.base import FLARESOLVERR_URL
logger = logging.getLogger(__name__)
def process_single_satellite(
parser: AsyncLyngSatParser,
satellite_info: dict,
force_refresh: bool = False
) -> dict:
"""
Обрабатывает один спутник и сохраняет данные в БД.
Args:
parser: Экземпляр парсера
satellite_info: Информация о спутнике (name, url, update_date)
force_refresh: Принудительно обновить кеш
Returns:
dict: Статистика обработки спутника
"""
sat_name = satellite_info["name"]
sat_url = satellite_info["url"]
stats = {
"satellite_name": sat_name,
"sources_found": 0,
"created": 0,
"updated": 0,
"errors": []
}
logger.info(f"Обработка спутника: {sat_name}")
# Получаем данные спутника (из кеша или с сайта)
satellite_data = parser.fetch_satellite_data(sat_name, sat_url, force_refresh)
if not satellite_data:
error_msg = f"Не удалось получить данные для спутника {sat_name}"
logger.error(error_msg)
stats["errors"].append(error_msg)
return stats
sources = satellite_data.get("sources", [])
stats["sources_found"] = len(sources)
logger.info(f"Найдено {len(sources)} источников для {sat_name}")
# Находим спутник в базе по имени или альтернативному имени (lowercase)
from django.db.models import Q
sat_name_lower = sat_name.lower()
try:
sat_obj = Satellite.objects.get(
Q(name__icontains=sat_name_lower) | Q(alternative_name__icontains=sat_name_lower)
)
logger.debug(f"Спутник {sat_name} найден в базе (ID: {sat_obj.id})")
except Satellite.DoesNotExist:
error_msg = f"Спутник '{sat_name}' не найден в базе данных"
logger.warning(error_msg)
stats["errors"].append(error_msg)
return stats
except Satellite.MultipleObjectsReturned:
error_msg = f"Найдено несколько спутников с именем '{sat_name}'"
logger.warning(error_msg)
stats["errors"].append(error_msg)
return stats
# Обрабатываем каждый источник
for source_idx, source in enumerate(sources, 1):
try:
# Парсим частоту
try:
freq = float(source['freq'])
except (ValueError, TypeError):
freq = -1.0
logger.debug(f"Некорректная частота для {sat_name}: {source.get('freq')}")
last_update = source['last_update']
fec = source['metadata'].get('fec')
modulation_name = source['metadata'].get('modulation')
standard_name = source['metadata'].get('standard')
symbol_velocity = source['metadata'].get('symbol_rate')
polarization_name = source['pol']
channel_info = source['provider_name']
# Создаем или получаем связанные объекты
pol_obj, _ = Polarization.objects.get_or_create(
name=polarization_name if polarization_name else "-"
)
mod_obj, _ = Modulation.objects.get_or_create(
name=modulation_name if modulation_name else "-"
)
standard_obj, _ = Standard.objects.get_or_create(
name=standard_name if standard_name else "-"
)
# Создаем или обновляем запись Lyngsat
lyng_obj, created = LyngSat.objects.update_or_create(
id_satellite=sat_obj,
frequency=freq,
polarization=pol_obj,
defaults={
"modulation": mod_obj,
"standard": standard_obj,
"sym_velocity": symbol_velocity if symbol_velocity else 0,
"channel_info": channel_info[:20] if channel_info else "",
"last_update": last_update,
"fec": fec[:30] if fec else "",
"url": satellite_data["url"]
}
)
if created:
stats['created'] += 1
logger.debug(f"Создана запись для {sat_name} {freq} МГц")
else:
stats['updated'] += 1
logger.debug(f"Обновлена запись для {sat_name} {freq} МГц")
# Логируем прогресс каждые 10 источников
if source_idx % 10 == 0:
logger.info(f"Обработано {source_idx}/{len(sources)} источников для {sat_name}")
except Exception as e:
error_msg = f"Ошибка при обработке источника {sat_name}: {str(e)}"
logger.error(error_msg, exc_info=True)
stats['errors'].append(error_msg)
continue
logger.info(f"Завершена обработка {sat_name}: создано {stats['created']}, обновлено {stats['updated']}")
return stats
def fill_lyngsat_data_async(
target_sats: list[str],
regions: list[str] = None,
task_id: str = None,
update_progress: Optional[Callable] = None,
force_refresh: bool = False,
use_cache: bool = True
) -> dict:
"""
Асинхронно заполняет данные Lyngsat для указанных спутников.
Обрабатывает спутники по одному с кешированием.
Args:
target_sats: Список названий спутников для обработки
regions: Список регионов для парсинга (по умолчанию все)
task_id: ID задачи Celery для логирования
update_progress: Функция для обновления прогресса (current, total, status, details)
force_refresh: Принудительно обновить кеш
use_cache: Использовать ли кеширование
Returns:
dict: Статистика обработки
"""
log_prefix = f"[Task {task_id}]" if task_id else "[Lyngsat Async]"
overall_stats = {
'total_satellites': 0,
'processed_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [],
'satellites_details': []
}
if regions is None:
regions = ["europe", "asia", "america", "atlantic"]
logger.info(f"{log_prefix} Начало асинхронной обработки данных")
logger.info(f"{log_prefix} Спутники: {', '.join(target_sats)}")
logger.info(f"{log_prefix} Регионы: {', '.join(regions)}")
logger.info(f"{log_prefix} Использование кеша: {use_cache}, Принудительное обновление: {force_refresh}")
if update_progress:
update_progress(0, len(target_sats), "Инициализация парсера...", {})
try:
# Создаем парсер
parser = AsyncLyngSatParser(
flaresolver_url=FLARESOLVERR_URL,
target_sats=target_sats,
regions=regions,
use_cache=use_cache
)
logger.info(f"{log_prefix} Получение списка спутников...")
if update_progress:
update_progress(0, len(target_sats), "Получение списка спутников...", {})
# Получаем список всех спутников
all_satellites = parser.get_all_satellites_list(force_refresh)
overall_stats['total_satellites'] = len(all_satellites)
logger.info(f"{log_prefix} Найдено {len(all_satellites)} спутников для обработки")
# Обрабатываем каждый спутник по отдельности
for idx, satellite_info in enumerate(all_satellites, 1):
sat_name = satellite_info["name"]
logger.info(f"{log_prefix} Обработка спутника {idx}/{len(all_satellites)}: {sat_name}")
if update_progress:
update_progress(
idx - 1,
len(all_satellites),
f"Обработка {sat_name}...",
{
"current_satellite": sat_name,
"created": overall_stats['created'],
"updated": overall_stats['updated']
}
)
# Обрабатываем спутник
sat_stats = process_single_satellite(parser, satellite_info, force_refresh)
# Обновляем общую статистику
overall_stats['processed_satellites'] += 1
overall_stats['total_sources'] += sat_stats['sources_found']
overall_stats['created'] += sat_stats['created']
overall_stats['updated'] += sat_stats['updated']
overall_stats['errors'].extend(sat_stats['errors'])
overall_stats['satellites_details'].append(sat_stats)
logger.info(
f"{log_prefix} Спутник {sat_name} обработан: "
f"источников {sat_stats['sources_found']}, "
f"создано {sat_stats['created']}, "
f"обновлено {sat_stats['updated']}"
)
logger.info(
f"{log_prefix} Обработка завершена. "
f"Спутников: {overall_stats['processed_satellites']}/{overall_stats['total_satellites']}, "
f"Источников: {overall_stats['total_sources']}, "
f"Создано: {overall_stats['created']}, "
f"Обновлено: {overall_stats['updated']}, "
f"Ошибок: {len(overall_stats['errors'])}"
)
if update_progress:
update_progress(
overall_stats['processed_satellites'],
overall_stats['total_satellites'],
"Завершено",
{
"created": overall_stats['created'],
"updated": overall_stats['updated'],
"errors_count": len(overall_stats['errors'])
}
)
except Exception as e:
error_msg = f"Критическая ошибка: {str(e)}"
logger.error(f"{log_prefix} {error_msg}", exc_info=True)
overall_stats['errors'].append(error_msg)
return overall_stats
def clear_lyngsat_cache(cache_type: str = "all") -> dict:
"""
Очищает кеш LyngSat.
Args:
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
Returns:
dict: Статистика очистки
"""
logger.info(f"Очистка кеша LyngSat: {cache_type}")
if cache_type == "all":
stats = AsyncLyngSatParser.clear_all_cache()
else:
stats = AsyncLyngSatParser.clear_cache(cache_type)
logger.info(f"Кеш очищен: {stats}")
return stats