Files
dbstorage/dbapp/mapsapp/utils.py

451 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library imports
import json
import re
from io import BytesIO
# Third-party imports
import requests
from django.db.models import Q
# Local imports
from mainapp.models import Polarization, Satellite
from .models import Transponders
def parse_satellite_name(full_name: str) -> tuple[str, str | None]:
"""
Парсит полное имя спутника и извлекает основное и альтернативное имя.
Альтернативное имя находится в скобках после основного названия.
Примеры:
"Koreasat 116 (ANASIS-II)" -> ("Koreasat 116", "ANASIS-II")
"Thaicom 6 (Africom 1)" -> ("Thaicom 6", "Africom 1")
"Express AM6" -> ("Express AM6", None)
Args:
full_name: Полное имя спутника (может содержать альтернативное имя в скобках)
Returns:
tuple: (основное_имя, альтернативное_имя или None)
"""
if not full_name:
return (full_name, None)
# Ищем текст в скобках в конце строки
match = re.match(r'^(.+?)\s*\(([^)]+)\)\s*$$', full_name.strip())
if match:
main_name = match.group(1).strip()
alt_name = match.group(2).strip()
return (main_name, alt_name)
return (full_name.strip(), None)
def find_satellite_by_name(name: str):
"""
Ищет спутник по имени или альтернативному имени.
Все сравнения выполняются в lowercase.
Алгоритм поиска:
1. Точное совпадение по name (lowercase)
2. Точное совпадение по alternative_name (lowercase)
3. Частичное совпадение по name (lowercase)
4. Частичное совпадение по alternative_name (lowercase)
Args:
name: Имя спутника для поиска
Returns:
Satellite или None: Найденный спутник или None
Raises:
Satellite.MultipleObjectsReturned: Если найдено несколько спутников
"""
if not name:
return None
name_lower = name.strip().lower()
# 1. Точное совпадение по name (lowercase)
try:
return Satellite.objects.get(name__iexact=name_lower)
except Satellite.DoesNotExist:
pass
except Satellite.MultipleObjectsReturned:
raise
# 2. Точное совпадение по alternative_name (lowercase)
try:
return Satellite.objects.get(alternative_name__iexact=name_lower)
except Satellite.DoesNotExist:
pass
except Satellite.MultipleObjectsReturned:
raise
# 3. Частичное совпадение по name или alternative_name (lowercase)
satellites = Satellite.objects.filter(
Q(name__icontains=name_lower) | Q(alternative_name__icontains=name_lower)
)
if satellites.count() == 1:
return satellites.first()
elif satellites.count() > 1:
raise Satellite.MultipleObjectsReturned(
f"Найдено несколько спутников с именем '{name_lower}'"
)
return None
def search_satellite_on_page(data: dict, satellite_name: str):
for pos, value in data.get('page', {}).get('positions').items():
for name in value['satellites']:
if name['other_names'] is None:
name['other_names'] = ''
if satellite_name.lower() in name['name'].lower() or satellite_name.lower() in name['other_names'].lower():
return pos, name['id']
return '', ''
def get_footprint_data(position: str = 62) -> dict:
"""Возвращает словарь с данным по footprint для спутников на выбранной долготе"""
response = requests.get(f"https://www.satbeams.com/footprints?position={position}", verify=True)
response.raise_for_status()
match = re.search(r'var data = ({.*?});', response.text, re.DOTALL)
if match:
json_str = match.group(1)
try:
data = json.loads(json_str)
return data.get("page", {}).get("footprint_data", {}).get("beams",[])
except json.JSONDecodeError as e:
print("Ошибка парсинга JSON:", e)
else:
print("Нужных данных не найдено")
return {}
def get_all_page_data(url:str = 'https://www.satbeams.com/footprints') -> dict:
"""Возвращает словарь с данными по всем спутникам на странице"""
response = requests.get(url)
response.raise_for_status()
match = re.search(r'var data = ({.*?});', response.text, re.DOTALL)
if match:
json_str = match.group(1)
try:
data = json.loads(json_str)
# Файл json на диске для достоверности
with open('data.json', 'w') as jf:
json.dump(data, jf, indent=2)
return data
except json.JSONDecodeError as e:
print("Ошибка парсинга JSON:", e)
else:
print("Нужных данных не найдено")
return {}
def get_names_footprints_for_satellite(footprint_data: dict, sat_id: str) -> list[str]:
names = []
for beam in footprint_data:
if 'ku' in beam['band'].lower() and sat_id in beam['satellite_id']:
names.append(
{
"name": beam['name'],
"fullname": beam['fullname'][8:]
}
)
return names
def get_band_names(satellite_name: str) -> list[str]:
data = get_all_page_data()
pos, sat_id = search_satellite_on_page(data, satellite_name)
footprints = get_footprint_data(pos)
names = get_names_footprints_for_satellite(footprints, sat_id)
return names
def parse_transponders_from_json(filepath: str, user=None):
"""
Парсит транспондеры из JSON файла.
Если имя спутника содержит альтернативное имя в скобках, оно извлекается
и сохраняется в поле alternative_name.
Args:
filepath: путь к JSON файлу
user: пользователь для установки created_by и updated_by (optional)
"""
with open(filepath, encoding="utf-8") as jf:
data = json.load(jf)
for sat_name_full, trans_zone in data["satellites"].items():
# Парсим имя спутника и альтернативное имя
main_name, alt_name = parse_satellite_name(sat_name_full)
for zone, trans in trans_zone.items():
for tran in trans:
f_b, f_e = tran["freq"][0].split("-")
f = round((float(f_b) + float(f_e))/2, 3)
f_range = round(abs(float(f_e) - float(f_b)), 3)
pol_obj = Polarization.objects.get(name=tran["pol"])
# Ищем спутник по имени или альтернативному имени
sat_obj = find_satellite_by_name(main_name)
if not sat_obj:
# Если не найден, создаём новый с альтернативным именем
sat_obj = Satellite.objects.create(
name=main_name,
alternative_name=alt_name
)
elif alt_name and not sat_obj.alternative_name:
# Если найден, но альтернативное имя не установлено - обновляем
sat_obj.alternative_name = alt_name
sat_obj.save()
tran_obj, created = Transponders.objects.get_or_create(
name=tran["name"],
polarization=pol_obj,
sat_id=sat_obj,
defaults={
"frequency": f,
"frequency_range": f_range,
"zone_name": zone,
}
)
# Устанавливаем пользователя, если передан
if user:
if created:
tran_obj.created_by = user
tran_obj.updated_by = user
tran_obj.save()
# Third-party imports (additional)
import logging
from lxml import etree
logger = logging.getLogger(__name__)
def parse_transponders_from_xml(data_in: BytesIO, user=None):
"""
Парсит транспондеры из XML файла.
Если имя спутника содержит альтернативное имя в скобках, оно извлекается
и сохраняется в поле alternative_name.
Процесс импорта:
1. Сначала создаются/обновляются все спутники
2. Затем для каждого спутника добавляются его транспондеры
Args:
data_in: BytesIO объект с XML данными
user: пользователь для установки created_by и updated_by (optional)
Returns:
dict: Статистика импорта с ключами:
- satellites_created: количество созданных спутников
- satellites_updated: количество обновлённых спутников
- satellites_skipped: количество пропущенных спутников (дубликаты)
- satellites_ignored: количество игнорированных спутников (X, DONT USE)
- transponders_created: количество созданных транспондеров
- transponders_existing: количество существующих транспондеров
- errors: список ошибок с деталями
"""
tree = etree.parse(data_in)
ns = {
'i': 'http://www.w3.org/2001/XMLSchema-instance',
'ns': 'http://schemas.datacontract.org/2004/07/Geolocation.Domain.Utils.Repository.SatellitesSerialization.Memos',
'tr': 'http://schemas.datacontract.org/2004/07/Geolocation.Common.Extensions'
}
satellites = tree.xpath('//ns:SatelliteMemo', namespaces=ns)
# Статистика импорта
stats = {
'satellites_created': 0,
'satellites_updated': 0,
'satellites_skipped': 0,
'satellites_ignored': 0,
'transponders_created': 0,
'transponders_existing': 0,
'errors': []
}
# Этап 1: Создание/обновление спутников
satellite_map = {} # Словарь для связи XML элементов со спутниками в БД
for sat in satellites:
name_full = sat.xpath('./ns:name/text()', namespaces=ns)[0]
# Игнорируем служебные записи
if name_full == 'X' or 'DONT USE' in name_full:
stats['satellites_ignored'] += 1
logger.info(f"Игнорирован спутник: {name_full}")
continue
# Парсим имя спутника и альтернативное имя
main_name, alt_name = parse_satellite_name(name_full)
norad = sat.xpath('./ns:norad/text()', namespaces=ns)
intl_code = sat.xpath('.//ns:internationalCode/text()', namespaces=ns)
sub_sat_point = sat.xpath('.//ns:subSatellitePoint/text()', namespaces=ns)
try:
# Ищем спутник по имени или альтернативному имени
sat_obj = find_satellite_by_name(main_name)
if not sat_obj:
# Если не найден, создаём новый с альтернативным именем
sat_obj = Satellite.objects.create(
name=main_name,
alternative_name=alt_name,
norad=int(norad[0]) if norad else -1,
international_code=intl_code[0] if intl_code else "",
undersat_point=float(sub_sat_point[0]) if sub_sat_point else None
)
stats['satellites_created'] += 1
logger.info(f"Создан спутник: {main_name} (альт. имя: {alt_name})")
else:
# Если найден, обновляем поля если они не установлены
updated = False
if alt_name and not sat_obj.alternative_name:
sat_obj.alternative_name = alt_name
updated = True
if norad and not sat_obj.norad:
sat_obj.norad = int(norad[0])
updated = True
if intl_code and not sat_obj.international_code:
sat_obj.international_code = intl_code[0]
updated = True
if sub_sat_point and not sat_obj.undersat_point:
sat_obj.undersat_point = float(sub_sat_point[0])
updated = True
if updated:
sat_obj.save()
stats['satellites_updated'] += 1
logger.info(f"Обновлён спутник: {main_name}")
# Сохраняем связь XML элемента со спутником в БД
satellite_map[sat] = sat_obj
except Satellite.MultipleObjectsReturned:
# Найдено несколько спутников - пропускаем
stats['satellites_skipped'] += 1
duplicates = Satellite.objects.filter(
Q(name__icontains=main_name.lower()) |
Q(alternative_name__icontains=main_name.lower())
)
duplicate_names = [f"{s.name} (ID: {s.id})" for s in duplicates]
error_msg = f"Найдено несколько спутников для '{name_full}': {', '.join(duplicate_names)}"
stats['errors'].append({
'type': 'duplicate_satellite',
'satellite': name_full,
'details': duplicate_names
})
logger.warning(error_msg)
continue
except Exception as e:
# Другие ошибки при обработке спутника
stats['satellites_skipped'] += 1
error_msg = f"Ошибка при обработке спутника '{name_full}': {str(e)}"
stats['errors'].append({
'type': 'satellite_error',
'satellite': name_full,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
# Этап 2: Добавление транспондеров для каждого спутника
for sat, sat_obj in satellite_map.items():
sat_name = sat.xpath('./ns:name/text()', namespaces=ns)[0]
try:
beams = sat.xpath('.//ns:BeamMemo', namespaces=ns)
zones = {}
for zone in beams:
zone_name = zone.xpath('./ns:name/text()', namespaces=ns)[0] if zone.xpath('./ns:name/text()', namespaces=ns) else '-'
zones[zone.xpath('./ns:id/text()', namespaces=ns)[0]] = {
"name": zone_name,
"pol": zone.xpath('./ns:polarization/text()', namespaces=ns)[0],
}
transponders = sat.xpath('.//ns:TransponderMemo', namespaces=ns)
for transponder in transponders:
try:
tr_id = transponder.xpath('./ns:downlinkBeamId/text()', namespaces=ns)[0]
downlink_start = float(transponder.xpath('./ns:downlinkFrequency/tr:start/text()', namespaces=ns)[0])
downlink_end = float(transponder.xpath('./ns:downlinkFrequency/tr:end/text()', namespaces=ns)[0])
uplink_start = float(transponder.xpath('./ns:uplinkFrequency/tr:start/text()', namespaces=ns)[0])
uplink_end = float(transponder.xpath('./ns:uplinkFrequency/tr:end/text()', namespaces=ns)[0])
tr_data = zones[tr_id]
match tr_data['pol']:
case 'Horizontal':
pol = 'Горизонтальная'
case 'Vertical':
pol = 'Вертикальная'
case 'CircularRight':
pol = 'Правая'
case 'CircularLeft':
pol = 'Левая'
case _:
pol = '-'
tr_name = transponder.xpath('./ns:name/text()', namespaces=ns)[0]
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
trans_obj, created = Transponders.objects.get_or_create(
polarization=pol_obj,
downlink=(downlink_start+downlink_end)/2/1000000,
uplink=(uplink_start+uplink_end)/2/1000000,
frequency_range=abs(downlink_end-downlink_start)/1000000,
name=tr_name,
defaults={
"zone_name": tr_data['name'],
"sat_id": sat_obj,
}
)
if created:
stats['transponders_created'] += 1
else:
stats['transponders_existing'] += 1
if user:
if created:
trans_obj.created_by = user
trans_obj.updated_by = user
trans_obj.save()
except Exception as e:
error_msg = f"Ошибка при обработке транспондера спутника '{sat_name}': {str(e)}"
stats['errors'].append({
'type': 'transponder_error',
'satellite': sat_name,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
except Exception as e:
error_msg = f"Ошибка при обработке транспондеров спутника '{sat_name}': {str(e)}"
stats['errors'].append({
'type': 'transponders_processing_error',
'satellite': sat_name,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
# Итоговая статистика в лог
logger.info(
f"Импорт завершён. Спутники: создано {stats['satellites_created']}, "
f"обновлено {stats['satellites_updated']}, пропущено {stats['satellites_skipped']}, "
f"игнорировано {stats['satellites_ignored']}. "
f"Транспондеры: создано {stats['transponders_created']}, "
f"существующих {stats['transponders_existing']}. "
f"Ошибок: {len(stats['errors'])}"
)
return stats