Добавил кеш к lyngsat

This commit is contained in:
2025-11-11 21:43:59 +03:00
parent 4f21c9d7c8
commit a3c381b9c7
15 changed files with 1282 additions and 229 deletions

View File

@@ -0,0 +1,564 @@
"""
Асинхронный парсер данных LyngSat с поддержкой кеширования в Redis.
"""
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import re
import logging
from typing import Optional
from django.core.cache import cache
logger = logging.getLogger(__name__)
def parse_satellite_names(satellite_string: str) -> list[str]:
"""Извлекает все возможные имена спутников из строки."""
slash_parts = [part.strip() for part in satellite_string.split('/')]
all_names = []
for part in slash_parts:
main_match = re.match(r'^([^(]+)', part)
if main_match:
main_name = main_match.group(1).strip()
if main_name:
all_names.append(main_name)
bracket_match = re.search(r'\(([^)]+)\)', part)
if bracket_match:
bracket_name = bracket_match.group(1).strip()
if bracket_name:
all_names.append(bracket_name)
seen = set()
result = []
for name in all_names:
if name not in seen:
seen.add(name)
result.append(name.strip().lower())
return result
class AsyncLyngSatParser:
"""
Асинхронный парсер данных для LyngSat с поддержкой кеширования.
Кеширование:
- Страницы регионов кешируются на 7 дней
- Данные спутников кешируются на 1 день
"""
# Время жизни кеша
REGION_CACHE_TTL = 60 * 60 * 24 * 7 # 7 дней
SATELLITE_CACHE_TTL = 60 * 60 * 24 # 1 день
# Префиксы для ключей кеша
REGION_CACHE_PREFIX = "lyngsat_region"
SATELLITE_CACHE_PREFIX = "lyngsat_satellite"
SATELLITE_LIST_CACHE_PREFIX = "lyngsat_sat_list"
def __init__(
self,
flaresolver_url: str = "http://localhost:8191/v1",
regions: list[str] | None = None,
target_sats: list[str] | None = None,
use_cache: bool = True,
):
"""
Инициализация парсера.
Args:
flaresolver_url: URL FlareSolverr для обхода защиты
regions: Список регионов для парсинга
target_sats: Список целевых спутников (в нижнем регистре)
use_cache: Использовать ли кеширование
"""
self.flaresolver_url = flaresolver_url
self.use_cache = use_cache
self.target_sats = (
list(map(lambda sat: sat.strip().lower(), target_sats)) if target_sats else None
)
self.regions = regions if regions else ["europe", "asia", "america", "atlantic"]
self.BASE_URL = "https://www.lyngsat.com"
def _get_cache_key(self, prefix: str, identifier: str) -> str:
"""Генерирует ключ для кеша."""
return f"{prefix}:{identifier}"
def _get_from_cache(self, key: str) -> Optional[any]:
"""Получает данные из кеша."""
if not self.use_cache:
return None
try:
data = cache.get(key)
if data:
logger.debug(f"Данные получены из кеша: {key}")
return data
except Exception as e:
logger.warning(f"Ошибка при получении из кеша {key}: {e}")
return None
def _set_to_cache(self, key: str, value: any, ttl: int) -> None:
"""Сохраняет данные в кеш."""
if not self.use_cache:
return
try:
cache.set(key, value, timeout=ttl)
logger.debug(f"Данные сохранены в кеш: {key} (TTL: {ttl}s)")
except Exception as e:
logger.warning(f"Ошибка при сохранении в кеш {key}: {e}")
@classmethod
def clear_cache(cls, cache_type: str = "all") -> dict:
"""
Очищает кеш парсера.
Args:
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
Returns:
dict: Статистика очистки
"""
stats = {"cleared": 0, "errors": []}
try:
from django.core.cache import cache as django_cache
if cache_type in ("regions", "all"):
# Очищаем кеш регионов
regions = ["europe", "asia", "america", "atlantic"]
for region in regions:
key = f"{cls.REGION_CACHE_PREFIX}:{region}"
try:
result = django_cache.delete(key)
if result:
stats["cleared"] += 1
logger.info(f"Очищен кеш региона: {region}")
else:
logger.debug(f"Кеш региона {region} не найден или уже удален")
except Exception as e:
error_msg = f"Ошибка при очистке кеша региона {region}: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
if cache_type in ("satellites", "all"):
# Для очистки кеша спутников используем keys()
if hasattr(django_cache, 'keys'):
try:
# Очищаем списки спутников
list_keys = django_cache.keys(f"{cls.SATELLITE_LIST_CACHE_PREFIX}:*")
if list_keys:
if hasattr(django_cache, 'delete_many'):
django_cache.delete_many(list_keys)
else:
for key in list_keys:
django_cache.delete(key)
stats["cleared"] += len(list_keys)
logger.info(f"Очищено {len(list_keys)} списков спутников")
# Очищаем данные спутников
sat_keys = django_cache.keys(f"{cls.SATELLITE_CACHE_PREFIX}:*")
if sat_keys:
if hasattr(django_cache, 'delete_many'):
django_cache.delete_many(sat_keys)
else:
for key in sat_keys:
django_cache.delete(key)
stats["cleared"] += len(sat_keys)
logger.info(f"Очищено {len(sat_keys)} данных спутников")
except Exception as e:
error_msg = f"Ошибка при очистке кеша спутников: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
else:
logger.warning("Бэкенд кеша не поддерживает keys()")
logger.info("Для полной очистки используйте: redis-cli flushdb")
except Exception as e:
error_msg = f"Критическая ошибка при очистке кеша: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
return stats
@classmethod
def clear_all_cache(cls) -> dict:
"""Полностью очищает весь кеш LyngSat."""
stats = {"cleared": 0, "errors": []}
try:
from django.core.cache import cache as django_cache
# Для django-redis используем keys() + delete_many()
if hasattr(django_cache, 'keys'):
patterns = [
f"{cls.REGION_CACHE_PREFIX}:*",
f"{cls.SATELLITE_CACHE_PREFIX}:*",
f"{cls.SATELLITE_LIST_CACHE_PREFIX}:*",
]
all_keys = []
for pattern in patterns:
try:
keys = django_cache.keys(pattern)
if keys:
all_keys.extend(keys)
logger.info(f"Найдено {len(keys)} ключей по паттерну: {pattern}")
except Exception as e:
error_msg = f"Ошибка при поиске ключей {pattern}: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
# Удаляем все найденные ключи
if all_keys:
try:
if hasattr(django_cache, 'delete_many'):
django_cache.delete_many(all_keys)
else:
for key in all_keys:
django_cache.delete(key)
stats["cleared"] = len(all_keys)
logger.info(f"Удалено {len(all_keys)} ключей")
except Exception as e:
error_msg = f"Ошибка при удалении ключей: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
else:
logger.info("Ключи для удаления не найдены")
elif hasattr(django_cache, 'delete_pattern'):
# Fallback на delete_pattern
patterns = [
f"{cls.REGION_CACHE_PREFIX}:*",
f"{cls.SATELLITE_CACHE_PREFIX}:*",
f"{cls.SATELLITE_LIST_CACHE_PREFIX}:*",
]
for pattern in patterns:
try:
deleted = django_cache.delete_pattern(pattern)
if deleted and isinstance(deleted, int):
stats["cleared"] += deleted
logger.info(f"Очищено {deleted} ключей по паттерну: {pattern}")
except Exception as e:
error_msg = f"Ошибка при очистке паттерна {pattern}: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
else:
# Fallback для других бэкендов кеша
logger.warning("Бэкенд кеша не поддерживает keys() или delete_pattern()")
return cls.clear_cache("all")
except Exception as e:
error_msg = f"Критическая ошибка при полной очистке кеша: {e}"
logger.error(error_msg)
stats["errors"].append(error_msg)
return stats
def parse_metadata(self, metadata: str) -> dict:
"""Парсит метаданные транспондера."""
if not metadata or not metadata.strip():
return {
"standard": None,
"modulation": None,
"symbol_rate": None,
"fec": None,
}
normalized = re.sub(r"\s+", "", metadata.strip())
fec_match = re.search(r"([1-9]/[1-9])$", normalized)
fec = fec_match.group(1) if fec_match else None
if fec_match:
core = normalized[: fec_match.start()]
else:
core = normalized
std_match = re.match(r"(DVB-S2?|ABS-S|DVB-T2?|ATSC|ISDB)", core)
standard = std_match.group(1) if std_match else None
rest = core[len(standard) :] if standard else core
modulation = None
mod_match = re.match(r"(8PSK|QPSK|16APSK|32APSK|64QAM|256QAM|BPSK)", rest)
if mod_match:
modulation = mod_match.group(1)
rest = rest[len(modulation) :]
symbol_rate = None
sr_match = re.search(r"(\d+)$", rest)
if sr_match:
try:
symbol_rate = int(sr_match.group(1))
except ValueError:
pass
return {
"standard": standard,
"modulation": modulation,
"symbol_rate": symbol_rate,
"fec": fec,
}
def extract_date(self, s: str) -> datetime | None:
"""Извлекает дату из строки формата YYMMDD."""
s = s.strip()
match = re.search(r"(\d{6})$", s)
if not match:
return None
yymmdd = match.group(1)
try:
return datetime.strptime(yymmdd, "%y%m%d").date()
except ValueError:
return None
def convert_polarization(self, polarization: str) -> str:
"""Преобразует код поляризации в понятное название на русском."""
polarization_map = {
"V": "Вертикальная",
"H": "Горизонтальная",
"R": "Правая",
"L": "Левая",
}
return polarization_map.get(polarization.upper(), polarization)
def fetch_region_page(self, region: str, force_refresh: bool = False) -> Optional[str]:
"""
Получает HTML страницу региона с кешированием.
Args:
region: Название региона
force_refresh: Принудительно обновить кеш
Returns:
HTML содержимое страницы или None при ошибке
"""
cache_key = self._get_cache_key(self.REGION_CACHE_PREFIX, region)
# Проверяем кеш
if not force_refresh:
cached_html = self._get_from_cache(cache_key)
if cached_html:
logger.info(f"Страница региона {region} получена из кеша")
return cached_html
# Запрашиваем страницу
url = f"{self.BASE_URL}/{region}.html"
logger.info(f"Запрос страницы региона: {url}")
try:
payload = {"cmd": "request.get", "url": url, "maxTimeout": 60000}
response = requests.post(self.flaresolver_url, json=payload, timeout=70)
if response.status_code != 200:
logger.error(f"Ошибка при запросе {url}: статус {response.status_code}")
return None
html_content = response.json().get("solution", {}).get("response", "")
if html_content:
# Сохраняем в кеш
self._set_to_cache(cache_key, html_content, self.REGION_CACHE_TTL)
logger.info(f"Страница региона {region} получена и закеширована")
return html_content
except Exception as e:
logger.error(f"Ошибка при получении страницы {url}: {e}", exc_info=True)
return None
def get_satellite_list_from_region(self, region: str, force_refresh: bool = False) -> list[dict]:
"""
Получает список спутников из региона.
Args:
region: Название региона
force_refresh: Принудительно обновить кеш
Returns:
Список словарей с информацией о спутниках
"""
# Создаем уникальный ключ кеша с учетом целевых спутников
# Если target_sats не указаны, используем "all"
sats_key = "all" if not self.target_sats else "_".join(sorted(self.target_sats))
cache_key = self._get_cache_key(self.SATELLITE_LIST_CACHE_PREFIX, f"{region}_{sats_key}")
# Проверяем кеш
if not force_refresh:
cached_list = self._get_from_cache(cache_key)
if cached_list:
logger.info(f"Список спутников региона {region} (фильтр: {sats_key[:50]}) получен из кеша")
return cached_list
# Получаем HTML страницы
html_content = self.fetch_region_page(region, force_refresh)
if not html_content:
return []
# Парсим список спутников
satellites = []
try:
soup = BeautifulSoup(html_content, "html.parser")
col_table = soup.find_all("div", class_="desktab")[0]
tables = col_table.find_next_sibling("table").find_all("table")
trs = []
for table in tables:
trs.extend(table.find_all("tr"))
for tr in trs:
sat_name = tr.find("span").text.replace("ü", "u").strip().lower()
# Фильтруем по целевым спутникам
if self.target_sats is not None:
names = parse_satellite_names(sat_name)
if len(names) == 1:
sat_name = names[0]
else:
for name in names:
if name in self.target_sats:
sat_name = name
break
if sat_name not in self.target_sats:
continue
try:
sat_url = tr.find_all("a")[2]["href"]
except IndexError:
sat_url = tr.find_all("a")[0]["href"]
update_date_str = tr.find_all("td")[-1].text
try:
update_date = datetime.strptime(update_date_str, "%y%m%d").date()
except (ValueError, TypeError):
update_date = None
satellites.append({
"name": sat_name,
"url": sat_url,
"update_date": update_date,
"region": region
})
# Сохраняем в кеш
self._set_to_cache(cache_key, satellites, self.REGION_CACHE_TTL)
sats_filter = "все" if not self.target_sats else f"{len(self.target_sats)} целевых"
logger.info(f"Найдено {len(satellites)} спутников в регионе {region} (фильтр: {sats_filter})")
except Exception as e:
logger.error(f"Ошибка при парсинге списка спутников региона {region}: {e}", exc_info=True)
return satellites
def fetch_satellite_data(self, sat_name: str, sat_url: str, force_refresh: bool = False) -> Optional[dict]:
"""
Получает данные одного спутника с кешированием.
Args:
sat_name: Название спутника
sat_url: URL страницы спутника
force_refresh: Принудительно обновить кеш
Returns:
Словарь с данными спутника или None при ошибке
"""
cache_key = self._get_cache_key(self.SATELLITE_CACHE_PREFIX, sat_name)
# Проверяем кеш
if not force_refresh:
cached_data = self._get_from_cache(cache_key)
if cached_data:
logger.info(f"Данные спутника {sat_name} получены из кеша")
return cached_data
# Запрашиваем данные
full_url = f"{self.BASE_URL}/{sat_url}"
logger.info(f"Запрос данных спутника {sat_name}: {full_url}")
try:
payload = {"cmd": "request.get", "url": full_url, "maxTimeout": 60000}
response = requests.post(self.flaresolver_url, json=payload, timeout=70)
if response.status_code != 200:
logger.error(f"Ошибка при запросе {full_url}: статус {response.status_code}")
return None
html_content = response.json().get("solution", {}).get("response", "")
if not html_content:
logger.warning(f"Пустой ответ для спутника {sat_name}")
return None
# Парсим данные
sources = self.parse_satellite_content(html_content)
satellite_data = {
"name": sat_name,
"url": full_url,
"sources": sources,
"fetched_at": datetime.now().isoformat()
}
# Сохраняем в кеш
self._set_to_cache(cache_key, satellite_data, self.SATELLITE_CACHE_TTL)
logger.info(f"Данные спутника {sat_name} получены и закешированы ({len(sources)} источников)")
return satellite_data
except Exception as e:
logger.error(f"Ошибка при получении данных спутника {sat_name}: {e}", exc_info=True)
return None
def parse_satellite_content(self, html_content: str) -> list[dict]:
"""Парсит содержимое страницы спутника."""
data = []
try:
sat_soup = BeautifulSoup(html_content, "html.parser")
big_table = sat_soup.find("table", class_="bigtable")
if not big_table:
logger.warning("Таблица bigtable не найдена")
return data
all_tables = big_table.find_all("div", class_="desktab")[:-1]
for table in all_tables:
trs = table.find_next_sibling("table").find_all("tr")
for idx, tr in enumerate(trs):
tds = tr.find_all("td")
if len(tds) < 9 or idx < 2:
continue
try:
freq, polarization = tds[0].find("b").text.strip().split("\xa0")
polarization = self.convert_polarization(polarization)
meta = self.parse_metadata(tds[1].text)
provider_name = tds[3].text
last_update = self.extract_date(tds[-1].text)
data.append({
"freq": freq,
"pol": polarization,
"metadata": meta,
"provider_name": provider_name,
"last_update": last_update,
})
except Exception as e:
logger.debug(f"Ошибка при парсинге строки транспондера: {e}")
continue
except Exception as e:
logger.error(f"Ошибка при парсинге содержимого спутника: {e}", exc_info=True)
return data
def get_all_satellites_list(self, force_refresh: bool = False) -> list[dict]:
"""
Получает список всех спутников из всех регионов.
Args:
force_refresh: Принудительно обновить кеш
Returns:
Список словарей с информацией о спутниках
"""
all_satellites = []
for region in self.regions:
logger.info(f"Получение списка спутников из региона: {region}")
satellites = self.get_satellite_list_from_region(region, force_refresh)
all_satellites.extend(satellites)
logger.info(f"Всего найдено спутников: {len(all_satellites)}")
return all_satellites

View File

@@ -0,0 +1,287 @@
"""
Утилиты для асинхронной обработки данных LyngSat с кешированием.
"""
import logging
from typing import Callable, Optional
from .async_parser import AsyncLyngSatParser
from .models import LyngSat
from mainapp.models import Polarization, Standard, Modulation, Satellite
logger = logging.getLogger(__name__)
def process_single_satellite(
parser: AsyncLyngSatParser,
satellite_info: dict,
force_refresh: bool = False
) -> dict:
"""
Обрабатывает один спутник и сохраняет данные в БД.
Args:
parser: Экземпляр парсера
satellite_info: Информация о спутнике (name, url, update_date)
force_refresh: Принудительно обновить кеш
Returns:
dict: Статистика обработки спутника
"""
sat_name = satellite_info["name"]
sat_url = satellite_info["url"]
stats = {
"satellite_name": sat_name,
"sources_found": 0,
"created": 0,
"updated": 0,
"errors": []
}
logger.info(f"Обработка спутника: {sat_name}")
# Получаем данные спутника (из кеша или с сайта)
satellite_data = parser.fetch_satellite_data(sat_name, sat_url, force_refresh)
if not satellite_data:
error_msg = f"Не удалось получить данные для спутника {sat_name}"
logger.error(error_msg)
stats["errors"].append(error_msg)
return stats
sources = satellite_data.get("sources", [])
stats["sources_found"] = len(sources)
logger.info(f"Найдено {len(sources)} источников для {sat_name}")
# Находим спутник в базе
try:
sat_obj = Satellite.objects.get(name__icontains=sat_name)
logger.debug(f"Спутник {sat_name} найден в базе (ID: {sat_obj.id})")
except Satellite.DoesNotExist:
error_msg = f"Спутник '{sat_name}' не найден в базе данных"
logger.warning(error_msg)
stats["errors"].append(error_msg)
return stats
except Satellite.MultipleObjectsReturned:
error_msg = f"Найдено несколько спутников с именем '{sat_name}'"
logger.warning(error_msg)
stats["errors"].append(error_msg)
return stats
# Обрабатываем каждый источник
for source_idx, source in enumerate(sources, 1):
try:
# Парсим частоту
try:
freq = float(source['freq'])
except (ValueError, TypeError):
freq = -1.0
logger.debug(f"Некорректная частота для {sat_name}: {source.get('freq')}")
last_update = source['last_update']
fec = source['metadata'].get('fec')
modulation_name = source['metadata'].get('modulation')
standard_name = source['metadata'].get('standard')
symbol_velocity = source['metadata'].get('symbol_rate')
polarization_name = source['pol']
channel_info = source['provider_name']
# Создаем или получаем связанные объекты
pol_obj, _ = Polarization.objects.get_or_create(
name=polarization_name if polarization_name else "-"
)
mod_obj, _ = Modulation.objects.get_or_create(
name=modulation_name if modulation_name else "-"
)
standard_obj, _ = Standard.objects.get_or_create(
name=standard_name if standard_name else "-"
)
# Создаем или обновляем запись Lyngsat
lyng_obj, created = LyngSat.objects.update_or_create(
id_satellite=sat_obj,
frequency=freq,
polarization=pol_obj,
defaults={
"modulation": mod_obj,
"standard": standard_obj,
"sym_velocity": symbol_velocity if symbol_velocity else 0,
"channel_info": channel_info[:20] if channel_info else "",
"last_update": last_update,
"fec": fec[:30] if fec else "",
"url": satellite_data["url"]
}
)
if created:
stats['created'] += 1
logger.debug(f"Создана запись для {sat_name} {freq} МГц")
else:
stats['updated'] += 1
logger.debug(f"Обновлена запись для {sat_name} {freq} МГц")
# Логируем прогресс каждые 10 источников
if source_idx % 10 == 0:
logger.info(f"Обработано {source_idx}/{len(sources)} источников для {sat_name}")
except Exception as e:
error_msg = f"Ошибка при обработке источника {sat_name}: {str(e)}"
logger.error(error_msg, exc_info=True)
stats['errors'].append(error_msg)
continue
logger.info(f"Завершена обработка {sat_name}: создано {stats['created']}, обновлено {stats['updated']}")
return stats
def fill_lyngsat_data_async(
target_sats: list[str],
regions: list[str] = None,
task_id: str = None,
update_progress: Optional[Callable] = None,
force_refresh: bool = False,
use_cache: bool = True
) -> dict:
"""
Асинхронно заполняет данные Lyngsat для указанных спутников.
Обрабатывает спутники по одному с кешированием.
Args:
target_sats: Список названий спутников для обработки
regions: Список регионов для парсинга (по умолчанию все)
task_id: ID задачи Celery для логирования
update_progress: Функция для обновления прогресса (current, total, status, details)
force_refresh: Принудительно обновить кеш
use_cache: Использовать ли кеширование
Returns:
dict: Статистика обработки
"""
log_prefix = f"[Task {task_id}]" if task_id else "[Lyngsat Async]"
overall_stats = {
'total_satellites': 0,
'processed_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [],
'satellites_details': []
}
if regions is None:
regions = ["europe", "asia", "america", "atlantic"]
logger.info(f"{log_prefix} Начало асинхронной обработки данных")
logger.info(f"{log_prefix} Спутники: {', '.join(target_sats)}")
logger.info(f"{log_prefix} Регионы: {', '.join(regions)}")
logger.info(f"{log_prefix} Использование кеша: {use_cache}, Принудительное обновление: {force_refresh}")
if update_progress:
update_progress(0, len(target_sats), "Инициализация парсера...", {})
try:
# Создаем парсер
parser = AsyncLyngSatParser(
flaresolver_url="http://localhost:8191/v1",
target_sats=target_sats,
regions=regions,
use_cache=use_cache
)
logger.info(f"{log_prefix} Получение списка спутников...")
if update_progress:
update_progress(0, len(target_sats), "Получение списка спутников...", {})
# Получаем список всех спутников
all_satellites = parser.get_all_satellites_list(force_refresh)
overall_stats['total_satellites'] = len(all_satellites)
logger.info(f"{log_prefix} Найдено {len(all_satellites)} спутников для обработки")
# Обрабатываем каждый спутник по отдельности
for idx, satellite_info in enumerate(all_satellites, 1):
sat_name = satellite_info["name"]
logger.info(f"{log_prefix} Обработка спутника {idx}/{len(all_satellites)}: {sat_name}")
if update_progress:
update_progress(
idx - 1,
len(all_satellites),
f"Обработка {sat_name}...",
{
"current_satellite": sat_name,
"created": overall_stats['created'],
"updated": overall_stats['updated']
}
)
# Обрабатываем спутник
sat_stats = process_single_satellite(parser, satellite_info, force_refresh)
# Обновляем общую статистику
overall_stats['processed_satellites'] += 1
overall_stats['total_sources'] += sat_stats['sources_found']
overall_stats['created'] += sat_stats['created']
overall_stats['updated'] += sat_stats['updated']
overall_stats['errors'].extend(sat_stats['errors'])
overall_stats['satellites_details'].append(sat_stats)
logger.info(
f"{log_prefix} Спутник {sat_name} обработан: "
f"источников {sat_stats['sources_found']}, "
f"создано {sat_stats['created']}, "
f"обновлено {sat_stats['updated']}"
)
logger.info(
f"{log_prefix} Обработка завершена. "
f"Спутников: {overall_stats['processed_satellites']}/{overall_stats['total_satellites']}, "
f"Источников: {overall_stats['total_sources']}, "
f"Создано: {overall_stats['created']}, "
f"Обновлено: {overall_stats['updated']}, "
f"Ошибок: {len(overall_stats['errors'])}"
)
if update_progress:
update_progress(
overall_stats['processed_satellites'],
overall_stats['total_satellites'],
"Завершено",
{
"created": overall_stats['created'],
"updated": overall_stats['updated'],
"errors_count": len(overall_stats['errors'])
}
)
except Exception as e:
error_msg = f"Критическая ошибка: {str(e)}"
logger.error(f"{log_prefix} {error_msg}", exc_info=True)
overall_stats['errors'].append(error_msg)
return overall_stats
def clear_lyngsat_cache(cache_type: str = "all") -> dict:
"""
Очищает кеш LyngSat.
Args:
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
Returns:
dict: Статистика очистки
"""
logger.info(f"Очистка кеша LyngSat: {cache_type}")
if cache_type == "all":
stats = AsyncLyngSatParser.clear_all_cache()
else:
stats = AsyncLyngSatParser.clear_cache(cache_type)
logger.info(f"Кеш очищен: {stats}")
return stats

View File

View File

@@ -0,0 +1,40 @@
"""
Management команда для очистки кеша LyngSat.
"""
from django.core.management.base import BaseCommand
from lyngsatapp.async_utils import clear_lyngsat_cache
class Command(BaseCommand):
help = 'Очищает кеш данных LyngSat'
def add_arguments(self, parser):
parser.add_argument(
'--type',
type=str,
default='all',
choices=['regions', 'satellites', 'all'],
help='Тип кеша для очистки (regions, satellites, all)'
)
def handle(self, *args, **options):
cache_type = options['type']
self.stdout.write(f'Очистка кеша LyngSat: {cache_type}...')
stats = clear_lyngsat_cache(cache_type)
self.stdout.write(
self.style.SUCCESS(
f'Кеш очищен успешно! Удалено записей: {stats["cleared"]}'
)
)
if stats['errors']:
self.stdout.write(
self.style.WARNING(
f'Ошибок при очистке: {len(stats["errors"])}'
)
)
for error in stats['errors']:
self.stdout.write(self.style.ERROR(f' - {error}'))

View File

@@ -6,14 +6,101 @@ from celery import shared_task
from django.core.cache import cache
from .utils import fill_lyngsat_data
from .async_utils import fill_lyngsat_data_async, clear_lyngsat_cache
logger = logging.getLogger(__name__)
@shared_task(bind=True, name='lyngsatapp.fill_lyngsat_data_async')
def fill_lyngsat_data_task(self, target_sats, regions=None):
def fill_lyngsat_data_task(self, target_sats, regions=None, force_refresh=False, use_cache=True):
"""
Асинхронная задача для заполнения данных Lyngsat.
Асинхронная задача для заполнения данных Lyngsat с кешированием.
Обрабатывает спутники по одному.
Args:
target_sats: Список названий спутников для обработки
regions: Список регионов для парсинга (по умолчанию все)
force_refresh: Принудительно обновить кеш
use_cache: Использовать ли кеширование
Returns:
dict: Статистика обработки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Начало обработки данных Lyngsat")
logger.info(f"[Task {task_id}] Спутники: {', '.join(target_sats)}")
logger.info(f"[Task {task_id}] Регионы: {', '.join(regions) if regions else 'все'}")
logger.info(f"[Task {task_id}] Кеширование: {use_cache}, Принудительное обновление: {force_refresh}")
# Обновляем статус задачи
self.update_state(
state='PROGRESS',
meta={
'current': 0,
'total': len(target_sats),
'status': 'Инициализация...',
'details': {}
}
)
try:
# Вызываем асинхронную функцию заполнения данных
stats = fill_lyngsat_data_async(
target_sats=target_sats,
regions=regions,
task_id=task_id,
force_refresh=force_refresh,
use_cache=use_cache,
update_progress=lambda current, total, status, details: self.update_state(
state='PROGRESS',
meta={
'current': current,
'total': total,
'status': status,
'details': details
}
)
)
logger.info(f"[Task {task_id}] Обработка завершена успешно")
logger.info(f"[Task {task_id}] Статистика: {stats}")
# Сохраняем результат в кеш для отображения на странице
cache.set(f'lyngsat_task_{task_id}', stats, timeout=3600)
return stats
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при обработке: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
self.update_state(
state='FAILURE',
meta={
'error': error_message,
'status': 'Ошибка при обработке',
'details': {},
'exc_type': type(e).__name__,
'exc_message': str(e)
}
)
# Возвращаем словарь с ошибкой вместо raise для корректной сериализации
return {
'error': error_message,
'status': 'FAILURE',
'total_satellites': 0,
'processed_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [error_message]
}
@shared_task(bind=True, name='lyngsatapp.fill_lyngsat_data_sync')
def fill_lyngsat_data_task_sync(self, target_sats, regions=None):
"""
Синхронная задача для заполнения данных Lyngsat (старая версия без кеширования).
Используется для обратной совместимости.
Args:
target_sats: Список названий спутников для обработки
@@ -23,7 +110,7 @@ def fill_lyngsat_data_task(self, target_sats, regions=None):
dict: Статистика обработки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Начало обработки данных Lyngsat")
logger.info(f"[Task {task_id}] Начало синхронной обработки данных Lyngsat")
logger.info(f"[Task {task_id}] Спутники: {', '.join(target_sats)}")
logger.info(f"[Task {task_id}] Регионы: {', '.join(regions) if regions else 'все'}")
@@ -38,7 +125,7 @@ def fill_lyngsat_data_task(self, target_sats, regions=None):
)
try:
# Вызываем функцию заполнения данных
# Вызываем старую функцию заполнения данных
stats = fill_lyngsat_data(
target_sats=target_sats,
regions=regions,
@@ -63,11 +150,52 @@ def fill_lyngsat_data_task(self, target_sats, regions=None):
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при обработке: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
self.update_state(
state='FAILURE',
meta={
'error': str(e),
'status': 'Ошибка при обработке'
'error': error_message,
'status': 'Ошибка при обработке',
'exc_type': type(e).__name__,
'exc_message': str(e)
}
)
raise
# Возвращаем словарь с ошибкой вместо raise для корректной сериализации
return {
'error': error_message,
'status': 'FAILURE',
'total_satellites': 0,
'total_sources': 0,
'created': 0,
'updated': 0,
'errors': [error_message]
}
@shared_task(bind=True, name='lyngsatapp.clear_cache')
def clear_cache_task(self, cache_type='all'):
"""
Задача для очистки кеша LyngSat.
Args:
cache_type: Тип кеша для очистки ("regions", "satellites", "all")
Returns:
dict: Статистика очистки
"""
task_id = self.request.id
logger.info(f"[Task {task_id}] Запуск задачи очистки кеша: {cache_type}")
try:
stats = clear_lyngsat_cache(cache_type)
logger.info(f"[Task {task_id}] Кеш очищен успешно: {stats}")
return stats
except Exception as e:
logger.error(f"[Task {task_id}] Ошибка при очистке кеша: {str(e)}", exc_info=True)
error_message = f"{type(e).__name__}: {str(e)}"
return {
'error': error_message,
'status': 'FAILURE',
'cleared': 0,
'errors': [error_message]
}