Files
dbstorage/dbapp/mainapp/utils.py

1794 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library imports
import io
import json
import re
from datetime import datetime, time
# Django imports
from django.contrib.gis.geos import Point
from django.db.models import F
from django.utils import timezone
# Third-party imports
import pandas as pd
from geographiclib.geodesic import Geodesic
# Local imports
from mapsapp.models import Transponders
from .models import (
CustomUser,
Geo,
Modulation,
ObjItem,
Parameter,
Polarization,
Satellite,
SigmaParameter,
Source,
Standard,
)
def find_matching_transponder(satellite, frequency, polarization):
"""
Находит подходящий транспондер для заданных параметров.
Алгоритм:
1. Фильтрует транспондеры по спутнику и поляризации
2. Проверяет, входит ли частота в диапазон транспондера:
downlink - frequency_range/2 <= frequency <= downlink + frequency_range/2
3. Возвращает самый свежий транспондер (по created_at)
Args:
satellite: объект Satellite
frequency: частота в МГц
polarization: объект Polarization
Returns:
Transponders или None: найденный транспондер или None
"""
if not satellite or not polarization or frequency == -1.0:
return None
# Фильтруем транспондеры по спутнику и поляризации
transponders = Transponders.objects.filter(
sat_id=satellite,
polarization=polarization,
downlink__isnull=False,
frequency_range__isnull=False
).annotate(
# Вычисляем нижнюю и верхнюю границы диапазона
lower_bound=F('downlink') - F('frequency_range') / 2,
upper_bound=F('downlink') + F('frequency_range') / 2
).filter(
# Проверяем, входит ли частота в диапазон
lower_bound__lte=frequency,
upper_bound__gte=frequency
).order_by('-created_at') # Сортируем по дате создания (самые свежие первыми)
# Возвращаем самый свежий транспондер
return transponders.first()
def find_matching_lyngsat(satellite, frequency, polarization, tolerance_mhz=0.1):
"""
Находит подходящий источник LyngSat для заданных параметров.
Алгоритм:
1. Фильтрует источники LyngSat по спутнику и поляризации
2. Проверяет, совпадает ли частота с заданной точностью (по умолчанию ±0.1 МГц)
3. Возвращает самый свежий источник (по last_update)
Args:
satellite: объект Satellite
frequency: частота в МГц
polarization: объект Polarization
tolerance_mhz: допуск по частоте в МГц (по умолчанию 0.1)
Returns:
LyngSat или None: найденный источник LyngSat или None
"""
# Импортируем здесь, чтобы избежать циклических импортов
from lyngsatapp.models import LyngSat
if not satellite or not polarization or frequency == -1.0:
return None
# Фильтруем источники LyngSat по спутнику и поляризации
lyngsat_sources = LyngSat.objects.filter(
id_satellite=satellite,
polarization=polarization,
frequency__isnull=False
).filter(
# Проверяем, входит ли частота в допуск
frequency__gte=frequency - tolerance_mhz,
frequency__lte=frequency + tolerance_mhz
).order_by('-last_update') # Сортируем по дате обновления (самые свежие первыми)
# Возвращаем самый свежий источник
return lyngsat_sources.first()
# ============================================================================
# Константы
# ============================================================================
# Значения по умолчанию для пагинации
DEFAULT_ITEMS_PER_PAGE = 50
MAX_ITEMS_PER_PAGE = 10000
# Значения по умолчанию для данных
DEFAULT_NUMERIC_VALUE = -1.0
MINIMUM_BANDWIDTH_MHZ = 0.08
RANGE_DISTANCE = 56
# ============================================================================
# Вспомогательные функции для работы со спутниками
# ============================================================================
class SatelliteNotFoundError(Exception):
"""Исключение, возникающее когда спутник не найден в базе данных."""
pass
def get_satellite_by_norad(norad_id: int) -> Satellite:
"""
Получает спутник по NORAD ID с обработкой ошибок.
Args:
norad_id: NORAD ID спутника
Returns:
Satellite: объект спутника
Raises:
SatelliteNotFoundError: если спутник не найден
ValueError: если norad_id некорректен
"""
if not norad_id or norad_id == -1:
raise ValueError(f"Некорректный NORAD ID: {norad_id}")
try:
return Satellite.objects.get(norad=norad_id)
except Satellite.DoesNotExist:
raise SatelliteNotFoundError(
f"Спутник с NORAD ID {norad_id} не найден в базе данных. "
f"Добавьте спутник в справочник перед импортом данных."
)
except Satellite.MultipleObjectsReturned:
# Если по какой-то причине есть дубликаты, берем первый
return Satellite.objects.filter(norad=norad_id).first()
def get_satellite_by_name(name: str) -> Satellite:
"""
Получает спутник по имени с обработкой ошибок.
Args:
name: имя спутника
Returns:
Satellite: объект спутника
Raises:
SatelliteNotFoundError: если спутник не найден
"""
if not name or name.strip() == "-":
raise ValueError(f"Некорректное имя спутника: {name}")
try:
return Satellite.objects.get(name=name.strip())
except Satellite.DoesNotExist:
raise SatelliteNotFoundError(
f"Спутник '{name}' не найден в базе данных. "
f"Добавьте спутник в справочник перед импортом данных."
)
except Satellite.MultipleObjectsReturned:
# Если есть дубликаты по имени, берем первый
return Satellite.objects.filter(name=name.strip()).first()
def get_all_constants():
sats = [sat.name for sat in Satellite.objects.all()]
standards = [sat.name for sat in Standard.objects.all()]
pols = [sat.name for sat in Polarization.objects.all()]
# mirrors = [sat.name for sat in Mirror.objects.all()]
modulations = [sat.name for sat in Modulation.objects.all()]
return sats, standards, pols, modulations
def find_mirror_satellites(mirror_names: list) -> list:
"""
Находит спутники, которые соответствуют именам зеркал.
Алгоритм:
1. Для каждого имени зеркала:
- Обрезать пробелы и привести к нижнему регистру
- Найти все спутники, в имени или альтернативном имени которых содержится это имя
2. Вернуть список найденных спутников
Args:
mirror_names: список имен зеркал
Returns:
list: список объектов Satellite
"""
from django.db.models import Q
found_satellites = []
for mirror_name in mirror_names:
if not mirror_name or mirror_name == "-":
continue
# Обрезаем пробелы и приводим к нижнему регистру
mirror_name_clean = mirror_name.strip().lower()
if not mirror_name_clean:
continue
# Ищем спутники, в имени или альтернативном имени которых содержится имя зеркала
satellites = Satellite.objects.filter(
Q(name__icontains=mirror_name_clean) | Q(alternative_name__icontains=mirror_name_clean)
)
found_satellites.extend(satellites)
# Убираем дубликаты
return list(set(found_satellites))
def coords_transform(coords: str):
lat_part, lon_part = coords.strip().split()
sign_map = {"N": 1, "E": 1, "S": -1, "W": -1}
lat_sign_char = lat_part[-1]
lat_value = float(lat_part[:-1].replace(",", "."))
latitude = lat_value * sign_map.get(lat_sign_char, 1)
lon_sign_char = lon_part[-1]
lon_value = float(lon_part[:-1].replace(",", "."))
longitude = lon_value * sign_map.get(lon_sign_char, 1)
return (longitude, latitude)
def remove_str(s: str):
if isinstance(s, str):
if (
s.strip() == "-"
or s.strip() == ""
or s.strip() == " "
or "неизв" in s.strip()
):
return -1
return float(s.strip().replace(",", "."))
return s
def _find_or_create_source_by_name_and_distance(
source_name: str, sat: Satellite, coord: tuple, user
) -> Source:
"""
Находит или создает Source на основе имени источника, спутника и расстояния.
Логика:
1. Ищет все существующие Source с ObjItem, у которых:
- Совпадает спутник
- Совпадает имя (source_name)
2. Для каждого найденного Source проверяет расстояние до новой координаты
3. Если найден Source в радиусе ≤56 км:
- Возвращает его и обновляет coords_average через метод update_coords_average
4. Если не найден подходящий Source:
- Создает новый Source с типом "стационарные"
Важно: Может существовать несколько Source с одинаковым именем и спутником,
но они должны быть географически разделены (>56 км друг от друга).
Args:
source_name: имя источника (например, "Turksat 3A 10967,397 [9,348] МГц V")
sat: объект Satellite
coord: координата в формате (lon, lat)
user: пользователь для created_by
Returns:
Source: найденный или созданный объект Source
"""
# Ищем все существующие ObjItem с таким же именем и спутником
existing_objitems = ObjItem.objects.filter(
name=source_name,
parameter_obj__id_satellite=sat,
source__isnull=False,
source__coords_average__isnull=False
).select_related('source', 'parameter_obj', 'source__info')
# Собираем уникальные Source из найденных ObjItem
existing_sources = {}
for objitem in existing_objitems:
if objitem.source.id not in existing_sources:
existing_sources[objitem.source.id] = objitem.source
# Проверяем расстояние до каждого существующего Source
closest_source = None
min_distance = float('inf')
for source in existing_sources.values():
if source.coords_average:
source_coord = (source.coords_average.x, source.coords_average.y)
_, distance = calculate_mean_coords(source_coord, coord)
if distance <= RANGE_DISTANCE and distance < min_distance:
min_distance = distance
closest_source = source
# Если найден близкий Source (≤56 км)
if closest_source:
# Обновляем coords_average через метод модели
closest_source.update_coords_average(coord)
closest_source.save()
return closest_source
# Если не найден подходящий Source - создаем новый с типом "Стационарные"
from .models import ObjectInfo
stationary_info, _ = ObjectInfo.objects.get_or_create(name="Стационарные")
source = Source.objects.create(
coords_average=Point(coord, srid=4326),
info=stationary_info,
created_by=user
)
return source
def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None, is_automatic=False):
"""
Импортирует данные из DataFrame с группировкой по имени источника и расстоянию.
Алгоритм:
1. Для каждой строки DataFrame:
a. Извлечь имя источника (из колонки "Объект наблюдения")
b. Если is_automatic=False:
- Найти подходящий Source:
* Ищет все Source с таким же именем и спутником
* Проверяет расстояние до каждого Source
* Если найден Source в радиусе ≤56 км - использует его
* Иначе создает новый Source
- Обновить coords_average инкрементально
- Создать ObjItem и связать с Source
c. Если is_automatic=True:
- Создать ObjItem без связи с Source (source=None)
- Точка просто хранится в базе
Важные правила:
- Источники разных спутников НЕ объединяются
- Может быть несколько Source с одинаковым именем, но разделенных географически
- Точка добавляется к Source только если расстояние ≤56 км
- Координаты усредняются инкрементально для каждого источника
- Если точка уже существует (по координатам и времени ГЛ), она не добавляется повторно
Args:
df: DataFrame с данными
sat: объект Satellite
current_user: текущий пользователь (optional)
is_automatic: если True, точки не добавляются к Source (optional, default=False)
Returns:
dict: словарь с результатами импорта {
'new_sources': количество созданных Source,
'added': количество добавленных точек,
'skipped': количество пропущенных дубликатов,
'errors': список ошибок
}
"""
try:
df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True)
except Exception as e:
print(e)
consts = get_all_constants()
df.fillna(-1, inplace=True)
df.sort_values('Дата')
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
new_sources_count = 0
added_count = 0
skipped_count = 0
errors = []
# Словарь для кэширования Source в рамках текущего импорта
# Ключ: (имя источника, id Source), Значение: объект Source
# Используем id в ключе, т.к. может быть несколько Source с одним именем
sources_cache = {}
for idx, row in df.iterrows():
try:
# Извлекаем координату
coord_tuple = coords_transform(row["Координаты"])
# Извлекаем имя источника
source_name = row["Объект наблюдения"]
# Извлекаем время для проверки дубликатов
date = row["Дата"].date()
time_ = row["Время"]
if isinstance(time_, str):
time_ = time_.strip()
time_ = time(0, 0, 0)
timestamp = datetime.combine(date, time_)
# Проверяем дубликаты по координатам и времени
if _is_duplicate_by_coords_and_time(coord_tuple, timestamp):
skipped_count += 1
continue
source = None
# Если is_automatic=False, работаем с Source
if not is_automatic:
found_in_cache = False
for cache_key, cached_source in sources_cache.items():
cached_name, cached_id = cache_key
# Проверяем имя
if cached_name != source_name:
continue
# Проверяем расстояние
if cached_source.coords_average:
source_coord = (cached_source.coords_average.x, cached_source.coords_average.y)
_, distance = calculate_mean_coords(source_coord, coord_tuple)
if distance <= RANGE_DISTANCE:
# Нашли подходящий Source в кэше
cached_source.update_coords_average(coord_tuple)
cached_source.save()
source = cached_source
found_in_cache = True
break
if not found_in_cache:
# Ищем в БД или создаем новый Source
source = _find_or_create_source_by_name_and_distance(
source_name, sat, coord_tuple, user_to_use
)
# Проверяем, был ли создан новый Source
if source.created_at.timestamp() > (datetime.now().timestamp() - 1):
new_sources_count += 1
# Добавляем в кэш
sources_cache[(source_name, source.id)] = source
# Создаем ObjItem (с Source или без, в зависимости от is_automatic)
_create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic)
added_count += 1
except Exception as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(f"Ошибка при обработке строки {idx}: {e}")
errors.append(error_msg)
continue
print(f"Импорт завершен: создано {new_sources_count} новых источников, "
f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, "
f"ошибок: {len(errors)}")
return {
'new_sources': new_sources_count,
'added': added_count,
'skipped': skipped_count,
'errors': errors
}
def _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic=False):
"""
Вспомогательная функция для создания ObjItem из строки DataFrame.
Теперь ищет дополнительные данные (модуляция, стандарт, символьная скорость)
в таблице TechAnalyze по имени источника и спутнику, если они не указаны в Excel.
Args:
row: строка DataFrame
sat: объект Satellite
source: объект Source для связи (может быть None если is_automatic=True)
user_to_use: пользователь для created_by
consts: константы из get_all_constants()
is_automatic: если True, точка не связывается с Source
"""
# Извлекаем координату
geo_point = Point(coords_transform(row["Координаты"]), srid=4326)
# Обработка поляризации
try:
polarization_obj, _ = Polarization.objects.get_or_create(
name=row["Поляризация"].strip()
)
except KeyError:
polarization_obj, _ = Polarization.objects.get_or_create(name="-")
# Обработка ВЧ параметров из Excel
freq = remove_str(row["Частота, МГц"])
freq_line = remove_str(row["Полоса, МГц"])
v = remove_str(row["Символьная скорость, БОД"])
try:
mod_obj, _ = Modulation.objects.get_or_create(name=row["Модуляция"].strip())
except AttributeError:
mod_obj, _ = Modulation.objects.get_or_create(name="-")
# Ищем данные в TechAnalyze (если не указаны в Excel или указаны как "-")
source_name = row["Объект наблюдения"]
tech_data = None
# Проверяем, нужно ли искать данные в TechAnalyze
# (если модуляция "-" или символьная скорость не указана)
if mod_obj.name == "-" or v == -1.0:
tech_data = _find_tech_analyze_data(source_name, sat)
# Если нашли данные в TechAnalyze, используем их
if tech_data:
if mod_obj.name == "-":
mod_obj = tech_data['modulation']
if v == -1.0:
v = tech_data['bod_velocity']
snr = remove_str(row["ОСШ"])
# Обработка стандарта (если есть в Excel или из TechAnalyze)
try:
standard_name = row.get("Стандарт", "-")
if pd.isna(standard_name) or standard_name == "-":
# Если стандарт не указан в Excel, пытаемся взять из TechAnalyze
if tech_data and tech_data['standard']:
standard_obj = tech_data['standard']
else:
standard_obj, _ = Standard.objects.get_or_create(name="-")
else:
standard_obj, _ = Standard.objects.get_or_create(name=standard_name.strip())
except (KeyError, AttributeError):
# Если столбца "Стандарт" нет, пытаемся взять из TechAnalyze
if tech_data and tech_data['standard']:
standard_obj = tech_data['standard']
else:
standard_obj, _ = Standard.objects.get_or_create(name="-")
# Обработка времени
date = row["Дата"].date()
time_ = row["Время"]
if isinstance(time_, str):
time_ = time_.strip()
time_ = time(0, 0, 0)
timestamp = datetime.combine(date, time_)
# Обработка зеркал - теперь это спутники
mirror_names = []
mirror_1 = row["Зеркало 1"].strip().split("\n")
mirror_2 = row["Зеркало 2"].strip().split("\n")
# Собираем все имена зеркал
for mir in mirror_1:
if mir.strip() and mir.strip() != "-":
mirror_names.append(mir.strip())
for mir in mirror_2:
if mir.strip() and mir.strip() != "-":
mirror_names.append(mir.strip())
# Находим спутники-зеркала
mirror_satellites = find_mirror_satellites(mirror_names)
location = row["Местоопределение"].strip()
comment = row["Комментарий"]
source_name = row["Объект наблюдения"]
geo, _ = Geo.objects.get_or_create(
timestamp=timestamp,
coords=geo_point,
defaults={
"location": location,
"comment": comment,
"is_average": (comment != -1.0),
},
)
geo.save()
# Устанавливаем связи с спутниками-зеркалами
if mirror_satellites:
geo.mirrors.set(mirror_satellites)
# Проверяем, существует ли уже ObjItem с таким же geo
existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first()
if existing_obj_item:
# Проверяем, существует ли parameter с такими же значениями
if (
hasattr(existing_obj_item, "parameter_obj")
and existing_obj_item.parameter_obj
and existing_obj_item.parameter_obj.id_satellite == sat
and existing_obj_item.parameter_obj.polarization == polarization_obj
and existing_obj_item.parameter_obj.frequency == freq
and existing_obj_item.parameter_obj.freq_range == freq_line
and existing_obj_item.parameter_obj.bod_velocity == v
and existing_obj_item.parameter_obj.modulation == mod_obj
and existing_obj_item.parameter_obj.snr == snr
):
# Пропускаем создание дубликата
return
# Находим подходящий транспондер
transponder = find_matching_transponder(sat, freq, polarization_obj)
# Находим подходящий источник LyngSat (точность 0.1 МГц)
lyngsat_source = find_matching_lyngsat(sat, freq, polarization_obj, tolerance_mhz=0.1)
# Создаем новый ObjItem и связываем с Source (если не автоматическая), Transponder и LyngSat
obj_item = ObjItem.objects.create(
name=source_name,
source=source if not is_automatic else None,
transponder=transponder,
lyngsat_source=lyngsat_source,
is_automatic=is_automatic,
created_by=user_to_use
)
# Создаем Parameter (с данными из TechAnalyze если они были найдены)
Parameter.objects.create(
id_satellite=sat,
polarization=polarization_obj,
frequency=freq,
freq_range=freq_line,
bod_velocity=v,
modulation=mod_obj,
snr=snr,
standard=standard_obj,
objitem=obj_item,
)
# Связываем geo с objitem
geo.objitem = obj_item
geo.save()
# Обновляем дату подтверждения источника (только если не автоматическая)
if source and not is_automatic:
source.update_confirm_at()
source.save()
def add_satellite_list():
"""
Добавляет список спутников в базу данных (если их еще нет).
Примечание: Эта функция устарела. Используйте админ-панель для добавления спутников.
"""
sats = [
"AZERSPACE 2",
"Amos 4",
"Astra 4A",
"ComsatBW-1",
"Eutelsat 16A",
"Eutelsat 21B",
"Eutelsat 7B",
"ExpressAM6",
"Hellas Sat 3",
"Intelsat 39",
"Intelsat 17",
"NSS 12",
"Sicral 2",
"SkyNet 5B",
"SkyNet 5D",
"Syracuse 4A",
"Turksat 3A",
"Turksat 4A",
"WGS 10",
"Yamal 402",
]
for sat in sats:
sat_obj, _ = Satellite.objects.get_or_create(name=sat)
sat_obj.save()
def parse_string(s: str):
pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$"
match = re.match(pattern, s)
if match:
return list(match.groups())
else:
raise ValueError("Некорректный формат строки")
def get_point_from_json(filepath: str):
with open(filepath, encoding="utf-8-sig") as jf:
data = json.load(jf)
for obj in data:
if not obj.get("bearingBehavior", {}):
if obj["tacticObjectType"] == "source":
# if not obj['bearingBehavior']:
source_id = obj["id"]
name = obj["name"]
elements = parse_string(name)
sat_name = elements[0]
freq = elements[1]
freq_range = elements[2]
pol = elements[4]
timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S")
lat = None
lon = None
for pos in data:
if pos["id"] == source_id and pos["tacticObjectType"] == "position":
lat = pos["latitude"]
lon = pos["longitude"]
break
print(
f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} "
f"time - {timestamp}, pos - ({lat}, {lon})"
)
def get_points_from_csv(file_content, current_user=None, is_automatic=False):
"""
Импортирует данные из CSV с группировкой по имени источника и расстоянию.
Алгоритм:
1. Для каждой строки CSV:
a. Извлечь имя источника (из колонки "obj") и спутник
b. Проверить дубликаты (координаты + время ГЛ)
c. Если is_automatic=False:
- Найти подходящий Source:
* Ищет все Source с таким же именем и спутником
* Проверяет расстояние до каждого Source
* Если найден Source в радиусе ≤56 км - использует его
* Иначе создает новый Source
- Обновить coords_average инкрементально
- Создать ObjItem и связать с Source
d. Если is_automatic=True:
- Создать ObjItem без связи с Source (source=None)
- Точка просто хранится в базе
Важные правила:
- Источники разных спутников НЕ объединяются
- Может быть несколько Source с одинаковым именем, но разделенных географически
- Точка добавляется к Source только если расстояние ≤56 км
- Координаты усредняются инкрементально для каждого источника
- Если точка уже существует (по координатам и времени ГЛ), она не добавляется повторно
Args:
file_content: содержимое CSV файла
current_user: текущий пользователь (optional)
is_automatic: если True, точки не добавляются к Source (optional, default=False)
Returns:
dict: словарь с результатами импорта {
'new_sources': количество созданных Source,
'added': количество добавленных точек,
'skipped': количество пропущенных дубликатов,
'errors': список ошибок
}
"""
df = pd.read_csv(
io.StringIO(file_content),
sep=";",
names=[
"id",
"obj",
"lat",
"lon",
"h",
"time",
"sat",
"norad_id",
"freq",
"f_range",
"et",
"qaul",
"mir_1",
"mir_2",
"mir_3",
"mir_4",
"mir_5",
"mir_6",
"mir_7",
],
)
df[["lat", "lon", "freq", "f_range"]] = (
df[["lat", "lon", "freq", "f_range"]]
.replace(",", ".", regex=True)
.astype(float)
)
df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S")
df.sort_values('time')
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
new_sources_count = 0
added_count = 0
skipped_count = 0
errors = []
# Словарь для кэширования Source в рамках текущего импорта
# Ключ: (имя источника, имя спутника, id Source), Значение: объект Source
sources_cache = {}
for idx, row in df.iterrows():
try:
# Извлекаем координату из колонок lat и lon
coord_tuple = (row["lon"], row["lat"])
# Извлекаем имя источника и спутника
source_name = row["obj"]
sat_name = row["sat"]
# Извлекаем время для проверки дубликатов
timestamp = timezone.make_aware(row["time"])
# Проверяем дубликаты по координатам и времени
if _is_duplicate_by_coords_and_time(coord_tuple, timestamp):
skipped_count += 1
continue
# Получаем объект спутника по NORAD ID
try:
sat_obj = get_satellite_by_norad(row["norad_id"])
except (SatelliteNotFoundError, ValueError) as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(error_msg)
errors.append(error_msg)
continue
source = None
# Если is_automatic=False, работаем с Source
if not is_automatic:
# Проверяем кэш: ищем подходящий Source среди закэшированных
found_in_cache = False
for cache_key, cached_source in sources_cache.items():
cached_name, cached_sat, cached_id = cache_key
# Проверяем имя и спутник
if cached_name != source_name or cached_sat != sat_name:
continue
# Проверяем расстояние
if cached_source.coords_average:
source_coord = (cached_source.coords_average.x, cached_source.coords_average.y)
_, distance = calculate_mean_coords(source_coord, coord_tuple)
if distance <= RANGE_DISTANCE:
# Нашли подходящий Source в кэше
cached_source.update_coords_average(coord_tuple)
cached_source.save()
source = cached_source
found_in_cache = True
break
if not found_in_cache:
# Ищем в БД или создаем новый Source
source = _find_or_create_source_by_name_and_distance(
source_name, sat_obj, coord_tuple, user_to_use
)
# Проверяем, был ли создан новый Source
if source.created_at.timestamp() > (datetime.now().timestamp() - 1):
new_sources_count += 1
# Добавляем в кэш
sources_cache[(source_name, sat_name, source.id)] = source
# Создаем ObjItem (с Source или без, в зависимости от is_automatic)
_create_objitem_from_csv_row(row, source, user_to_use, is_automatic)
added_count += 1
except Exception as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(f"Ошибка при обработке строки {idx}: {e}")
errors.append(error_msg)
continue
print(f"Импорт завершен: создано {new_sources_count} новых источников, "
f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, "
f"ошибок: {len(errors)}")
return {
'new_sources': new_sources_count,
'added': added_count,
'skipped': skipped_count,
'errors': errors
}
def _is_duplicate_by_coords_and_time(coord_tuple, timestamp, tolerance_km=0.001):
"""
Проверяет, существует ли уже ObjItem с такими же координатами и временем ГЛ.
Args:
coord_tuple: кортеж (lon, lat) координат
timestamp: время ГЛ (datetime)
tolerance_km: допуск для сравнения координат в километрах (default=0.1)
Returns:
bool: True если дубликат найден, False иначе
"""
# Ищем Geo с таким же timestamp и близкими координатами
existing_geo = Geo.objects.filter(
timestamp=timestamp,
coords__isnull=False
)
for geo in existing_geo:
if not geo.coords:
continue
# Проверяем расстояние между координатами
geo_coord = (geo.coords.x, geo.coords.y)
_, distance = calculate_mean_coords(coord_tuple, geo_coord)
if distance <= tolerance_km:
# Найден дубликат
return True
return False
def _is_duplicate_objitem(coord_tuple, frequency, freq_range, tolerance=0.1):
"""
Проверяет, существует ли уже ObjItem с такими же координатами и частотой.
Args:
coord_tuple: кортеж (lon, lat) координат
frequency: частота в МГц
freq_range: полоса частот в МГц
tolerance: допуск для сравнения координат в километрах
Returns:
bool: True если дубликат найден, False иначе
"""
# Ищем ObjItems с близкими координатами через geo_obj
nearby_objitems = ObjItem.objects.filter(
geo_obj__coords__isnull=False
).select_related('parameter_obj', 'geo_obj')
for objitem in nearby_objitems:
if not objitem.geo_obj or not objitem.geo_obj.coords:
continue
# Проверяем расстояние между координатами
geo_coord = (objitem.geo_obj.coords.x, objitem.geo_obj.coords.y)
_, distance = calculate_mean_coords(coord_tuple, geo_coord)
if distance <= tolerance:
# Координаты совпадают, проверяем частоту
if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj:
param = objitem.parameter_obj
# Проверяем совпадение частоты с небольшим допуском (0.1 МГц)
if (abs(param.frequency - frequency) < 0.1 and
abs(param.freq_range - freq_range) < 0.1):
return True
return False
def _find_tech_analyze_data(name: str, satellite: Satellite):
"""
Ищет данные технического анализа по имени и спутнику.
Args:
name: имя источника
satellite: объект Satellite
Returns:
dict или None: словарь с данными {modulation, standard, bod_velocity} или None
"""
from .models import TechAnalyze
try:
tech_analyze = TechAnalyze.objects.filter(
name=name,
satellite=satellite
).select_related('modulation', 'standard').first()
if tech_analyze:
return {
'modulation': tech_analyze.modulation,
'standard': tech_analyze.standard,
'bod_velocity': tech_analyze.bod_velocity if tech_analyze.bod_velocity else -1.0
}
except Exception as e:
print(f"Ошибка при поиске TechAnalyze для {name}: {e}")
return None
def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False):
"""
Вспомогательная функция для создания ObjItem из строки CSV DataFrame.
Теперь ищет дополнительные данные (модуляция, стандарт, символьная скорость)
в таблице TechAnalyze по имени источника и спутнику.
Args:
row: строка DataFrame
source: объект Source для связи (может быть None если is_automatic=True)
user_to_use: пользователь для created_by
is_automatic: если True, точка не связывается с Source
"""
# Определяем поляризацию
match row["obj"].split(" ")[-1]:
case "V":
pol = "Вертикальная"
case "H":
pol = "Горизонтальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
# Получаем объект спутника по NORAD ID
try:
sat_obj = get_satellite_by_norad(row["norad_id"])
except (SatelliteNotFoundError, ValueError) as e:
raise Exception(f"Не удалось получить спутник: {str(e)}")
# Ищем данные в TechAnalyze
tech_data = _find_tech_analyze_data(row["obj"], sat_obj)
# Обработка зеркал - теперь это спутники
mirror_names = []
if not pd.isna(row["mir_1"]) and row["mir_1"].strip() != "-":
mirror_names.append(row["mir_1"])
if not pd.isna(row["mir_2"]) and row["mir_2"].strip() != "-":
mirror_names.append(row["mir_2"])
if not pd.isna(row["mir_3"]) and row["mir_3"].strip() != "-":
mirror_names.append(row["mir_3"])
# Находим спутники-зеркала
mirror_satellites = find_mirror_satellites(mirror_names)
# Создаем Geo объект
geo_obj, _ = Geo.objects.get_or_create(
timestamp=timezone.make_aware(row["time"]),
coords=Point(row["lon"], row["lat"], srid=4326),
defaults={
"is_average": False,
},
)
# Устанавливаем связи с спутниками-зеркалами
if mirror_satellites:
geo_obj.mirrors.set(mirror_satellites)
# Проверяем, существует ли уже ObjItem с таким же geo
existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first()
if existing_obj_item:
# Проверяем, существует ли parameter с такими же значениями
if (
hasattr(existing_obj_item, "parameter_obj")
and existing_obj_item.parameter_obj
and existing_obj_item.parameter_obj.id_satellite == sat_obj
and existing_obj_item.parameter_obj.polarization == pol_obj
and existing_obj_item.parameter_obj.frequency == row["freq"]
and existing_obj_item.parameter_obj.freq_range == row["f_range"]
):
# Пропускаем создание дубликата
return
# Находим подходящий транспондер
transponder = find_matching_transponder(sat_obj, row["freq"], pol_obj)
# Находим подходящий источник LyngSat (точность 0.1 МГц)
lyngsat_source = find_matching_lyngsat(sat_obj, row["freq"], pol_obj, tolerance_mhz=0.1)
# Создаем новый ObjItem и связываем с Source (если не автоматическая), Transponder и LyngSat
obj_item = ObjItem.objects.create(
name=row["obj"],
source=source if not is_automatic else None,
transponder=transponder,
lyngsat_source=lyngsat_source,
is_automatic=is_automatic,
created_by=user_to_use
)
# Создаем Parameter с данными из TechAnalyze (если найдены)
if tech_data:
# Используем данные из TechAnalyze
Parameter.objects.create(
id_satellite=sat_obj,
polarization=pol_obj,
frequency=row["freq"],
freq_range=row["f_range"],
bod_velocity=tech_data['bod_velocity'],
modulation=tech_data['modulation'],
standard=tech_data['standard'],
objitem=obj_item,
)
else:
# Создаем без дополнительных данных (как раньше)
Parameter.objects.create(
id_satellite=sat_obj,
polarization=pol_obj,
frequency=row["freq"],
freq_range=row["f_range"],
objitem=obj_item,
)
# Связываем geo с objitem
geo_obj.objitem = obj_item
geo_obj.save()
# Обновляем дату подтверждения источника (только если не автоматическая)
if source and not is_automatic:
source.update_confirm_at()
source.save()
def get_vch_load_from_html(file, sat: Satellite) -> None:
filename = file.name.split("_")
transfer = filename[3]
match filename[2]:
case "H":
pol = "Горизонтальная"
case "V":
pol = "Вертикальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
tables = pd.read_html(file, encoding="windows-1251")
df = tables[0]
df = df.drop(0).reset_index(drop=True)
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)
df.replace("Неизвестно", "-", inplace=True)
df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[
["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]
].apply(pd.to_numeric)
df["Время начала измерения"] = df["Время начала измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
df["Время окончания измерения"] = df["Время окончания измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
for stroka in df.iterrows():
value = stroka[1]
if value["Полоса, МГц"] < 0.08:
continue
if "-" in value["Символьная скорость"]:
bod_velocity = -1.0
else:
bod_velocity = value["Символьная скорость"]
if "-" in value["Сигнал/шум, дБ"]:
snr = -1.0
else:
snr = value["Сигнал/шум, дБ"]
if value["Пакетность"] == "да":
pack = True
elif value["Пакетность"] == "нет":
pack = False
else:
pack = None
polarization, _ = Polarization.objects.get_or_create(name=pol)
mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"])
standard, _ = Standard.objects.get_or_create(name=value["Стандарт"])
sigma_load, _ = SigmaParameter.objects.get_or_create(
id_satellite=sat,
frequency=value["Частота, МГц"],
freq_range=value["Полоса, МГц"],
polarization=polarization,
defaults={
"transfer": float(transfer),
# "polarization": polarization,
"status": value["Статус"],
"power": value["Мощность, дБм"],
"bod_velocity": bod_velocity,
"modulation": mod,
"snr": snr,
"packets": pack,
"datetime_begin": value["Время начала измерения"],
"datetime_end": value["Время окончания измерения"],
},
)
sigma_load.save()
def get_frequency_tolerance_percent(freq_range_mhz: float) -> float:
"""
Определяет процент погрешности центральной частоты в зависимости от полосы частот.
Args:
freq_range_mhz (float): Полоса частот в МГц
Returns:
float: Процент погрешности для центральной частоты
Диапазоны:
- 0 - 0.5 МГц (0 - 500 кГц): 0.1%
- 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5%
- 1.5 - 5 МГц: 1%
- 5 - 10 МГц: 2%
- > 10 МГц: 5%
"""
if freq_range_mhz < 0.5:
return 0.005
elif freq_range_mhz < 1.5:
return 0.01
elif freq_range_mhz < 5.0:
return 0.02
elif freq_range_mhz < 10.0:
return 0.05
else:
return 0.1
def compare_and_link_vch_load(
sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float
):
"""
Привязывает SigmaParameter к Parameter на основе совпадения параметров.
Погрешность центральной частоты определяется автоматически в зависимости от полосы частот:
- 0-500 кГц: 0.1%
- 500 кГц-1.5 МГц: 0.5%
- 1.5-5 МГц: 1%
- 5-10 МГц: 2%
- >10 МГц: 5%
Args:
sat_id (Satellite): Спутник для фильтрации
eps_freq (float): Не используется (оставлен для обратной совместимости)
eps_frange (float): Погрешность полосы частот в процентах
ku_range (float): Не используется (оставлен для обратной совместимости)
Returns:
tuple: (количество объектов, количество привязок)
"""
# Получаем все ObjItem с Parameter для данного спутника
item_obj = ObjItem.objects.filter(
parameter_obj__id_satellite=sat_id
).select_related("parameter_obj", "parameter_obj__polarization")
vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id).select_related(
"polarization"
)
link_count = 0
obj_count = item_obj.count()
for obj in item_obj:
vch_load = obj.parameter_obj
# Пропускаем объекты с некорректной частотой
if not vch_load or vch_load.frequency == -1.0:
continue
# Определяем погрешность частоты на основе полосы
freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range)
# Вычисляем допустимое отклонение частоты в МГц
freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100
# Вычисляем допустимое отклонение полосы в МГц
frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100
for sigma in vch_sigma:
# Проверяем совпадение по всем параметрам
freq_match = (
abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz
)
frange_match = (
abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz
)
pol_match = sigma.polarization == vch_load.polarization
if freq_match and frange_match and pol_match:
sigma.parameter = vch_load
sigma.save()
link_count += 1
return obj_count, link_count
def kub_report(data_in: io.StringIO) -> pd.DataFrame:
df_in = pd.read_excel(data_in)
df = pd.DataFrame(
columns=[
"Дата",
"Широта",
"Долгота",
"Высота",
"Населённый пункт",
"ИСЗ",
"Прямой канал, МГц",
"Обратный канал, МГц",
"Перенос, МГц",
"Полоса, МГц",
"Зеркала",
]
)
for row in df_in.iterrows():
value = row[1]
date = datetime.date(datetime.now())
isz = value["ИСЗ"]
try:
lat = float(value["Широта, град"].strip().replace(",", "."))
lon = float(value["Долгота, град"].strip().replace(",", "."))
downlink = float(value["Обратный канал, МГц"].strip().replace(",", "."))
freq_range = float(value["Полоса, МГц"].strip().replace(",", "."))
except Exception as e:
lat = value["Широта, град"]
lon = value["Долгота, град"]
downlink = value["Обратный канал, МГц"]
freq_range = value["Полоса, МГц"]
print(e)
norad = int(re.findall(r"\((\d+)\)", isz)[0])
sat_obj = Satellite.objects.get(norad=norad)
pol_obj = Polarization.objects.get(name=value["Поляризация"].strip())
transponder = Transponders.objects.filter(
sat_id=sat_obj,
polarization=pol_obj,
downlink__gte=downlink - F("frequency_range") / 2,
downlink__lte=downlink + F("frequency_range") / 2,
).first()
# try:
# location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address']
# loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '')
# except AttributeError:
# loc_name = ''
# sleep(1)
loc_name = ""
if transponder: # and not (len(transponder) > 1):
transfer = transponder.transfer
uplink = transfer + downlink
new_row = pd.DataFrame(
[
{
"Дата": date,
"Широта": lat,
"Долгота": lon,
"Высота": 0.0,
"Населённый пункт": loc_name,
"ИСЗ": isz,
"Прямой канал, МГц": uplink,
"Обратный канал, МГц": downlink,
"Перенос, МГц": transfer,
"Полоса, МГц": freq_range,
"Зеркала": "",
}
]
)
df = pd.concat([df, new_row], ignore_index=True)
else:
print("Ничего не найдено в транспондерах")
return df
# ============================================================================
# Утилиты для работы с координатами
# ============================================================================
# Импорт pyproj для работы с проекциями
from pyproj import CRS, Transformer
def get_gauss_kruger_zone(longitude: float) -> int:
"""
Определяет номер зоны Гаусса-Крюгера по долготе.
Зоны ГК нумеруются от 1 до 60, каждая зона охватывает 6° долготы.
Центральный меридиан зоны N: (6*N - 3)°
Args:
longitude: Долгота в градусах (от -180 до 180)
Returns:
int: Номер зоны ГК (1-60)
"""
# Нормализуем долготу к диапазону 0-360
lon_normalized = longitude if longitude >= 0 else longitude + 360
# Вычисляем номер зоны (1-60)
zone = int((lon_normalized + 6) / 6)
if zone > 60:
zone = 60
if zone < 1:
zone = 1
return zone
def get_gauss_kruger_epsg(zone: int) -> int:
"""
Возвращает EPSG код для зоны Гаусса-Крюгера (Pulkovo 1942 / Gauss-Kruger).
EPSG коды для Pulkovo 1942 GK зон:
- Зона 4: EPSG:28404
- Зона 5: EPSG:28405
- ...
- Зона N: EPSG:28400 + N
Args:
zone: Номер зоны ГК (1-60)
Returns:
int: EPSG код проекции
"""
return 28400 + zone
def transform_wgs84_to_gk(coord: tuple, zone: int = None) -> tuple:
"""
Преобразует координаты из WGS84 (EPSG:4326) в проекцию Гаусса-Крюгера.
Args:
coord: Координаты в формате (longitude, latitude) в WGS84
zone: Номер зоны ГК (если None, определяется автоматически по долготе)
Returns:
tuple: Координаты (x, y) в метрах в проекции ГК
"""
lon, lat = coord
if zone is None:
zone = get_gauss_kruger_zone(lon)
epsg_gk = get_gauss_kruger_epsg(zone)
# Создаём трансформер WGS84 -> GK
transformer = Transformer.from_crs(
CRS.from_epsg(4326),
CRS.from_epsg(epsg_gk),
always_xy=True
)
x, y = transformer.transform(lon, lat)
return (x, y)
def transform_gk_to_wgs84(coord: tuple, zone: int) -> tuple:
"""
Преобразует координаты из проекции Гаусса-Крюгера в WGS84 (EPSG:4326).
Args:
coord: Координаты (x, y) в метрах в проекции ГК
zone: Номер зоны ГК
Returns:
tuple: Координаты (longitude, latitude) в WGS84
"""
x, y = coord
epsg_gk = get_gauss_kruger_epsg(zone)
# Создаём трансформер GK -> WGS84
transformer = Transformer.from_crs(
CRS.from_epsg(epsg_gk),
CRS.from_epsg(4326),
always_xy=True
)
lon, lat = transformer.transform(x, y)
return (lon, lat)
def calculate_distance_gk(coord1_gk: tuple, coord2_gk: tuple) -> float:
"""
Вычисляет расстояние между двумя точками в проекции ГК (в километрах).
Args:
coord1_gk: Первая точка (x, y) в метрах
coord2_gk: Вторая точка (x, y) в метрах
Returns:
float: Расстояние в километрах
"""
import math
x1, y1 = coord1_gk
x2, y2 = coord2_gk
distance_m = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
return distance_m / 1000
def average_coords_in_gk(coords: list[tuple], zone: int = None) -> tuple:
"""
Вычисляет среднее арифметическое координат в проекции Гаусса-Крюгера.
Алгоритм:
1. Определяет зону ГК по первой точке (если не указана)
2. Преобразует все координаты в проекцию ГК
3. Вычисляет среднее арифметическое X и Y
4. Преобразует результат обратно в WGS84
Args:
coords: Список координат в формате [(lon1, lat1), (lon2, lat2), ...]
zone: Номер зоны ГК (если None, определяется по первой точке)
Returns:
tuple: Средние координаты (longitude, latitude) в WGS84
"""
if not coords:
return (0, 0)
if len(coords) == 1:
return coords[0]
# Определяем зону по первой точке
if zone is None:
zone = get_gauss_kruger_zone(coords[0][0])
# Преобразуем все координаты в ГК
coords_gk = [transform_wgs84_to_gk(c, zone) for c in coords]
# Вычисляем среднее арифметическое
avg_x = sum(c[0] for c in coords_gk) / len(coords_gk)
avg_y = sum(c[1] for c in coords_gk) / len(coords_gk)
# Преобразуем обратно в WGS84
return transform_gk_to_wgs84((avg_x, avg_y), zone)
def calculate_mean_coords(coord1: tuple, coord2: tuple) -> tuple[tuple, float]:
"""
Вычисляет среднюю точку между двумя координатами с использованием геодезических вычислений (с учётом эллипсоида).
:param lat1: Широта первой точки в градусах.
:param lon1: Долгота первой точки в градусах.
:param lat2: Широта второй точки в градусах.
:param lon2: Долгота второй точки в градусах.
:return: Словарь с ключами 'lat' и 'lon' для средней точки, и расстояние(dist) в КМ.
"""
lon1, lat1 = coord1
lon2, lat2 = coord2
geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)
azimuth1 = geod_inv['azi1']
distance = geod_inv['s12']
geod_direct = Geodesic.WGS84.Direct(lat1, lon1, azimuth1, distance / 2)
return (geod_direct['lon2'], geod_direct['lat2']), distance/1000
def calculate_distance_wgs84(coord1: tuple, coord2: tuple) -> float:
"""
Вычисляет расстояние между двумя точками в WGS84 (в километрах).
Args:
coord1: Первая точка (longitude, latitude)
coord2: Вторая точка (longitude, latitude)
Returns:
float: Расстояние в километрах
"""
lon1, lat1 = coord1
lon2, lat2 = coord2
geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)
return geod_inv['s12'] / 1000
def calculate_average_coords_incremental(
current_average: tuple, new_coord: tuple
) -> tuple:
"""
Вычисляет новое среднее между текущим средним и новой координатой.
Использует инкрементальное усреднение: каждая новая точка усредняется
с текущим средним, а не со всеми точками кластера. Это упрощенный подход,
где новое среднее = (текущее_среднее + новая_координата) / 2.
Важно: Это НЕ среднее арифметическое всех точек кластера, а инкрементальное
усреднение между двумя точками (текущим средним и новой точкой).
Args:
current_average (tuple): Текущее среднее в формате (longitude, latitude)
new_coord (tuple): Новая координата в формате (longitude, latitude)
Returns:
tuple: Новое среднее в формате (longitude, latitude)
Example:
>>> avg1 = (37.62, 55.75) # Первая точка
>>> avg2 = calculate_average_coords_incremental(avg1, (37.63, 55.76))
>>> print(avg2)
(37.625, 55.755)
>>> avg3 = calculate_average_coords_incremental(avg2, (37.64, 55.77))
>>> print(avg3)
(37.6325, 55.7625)
>>> # Проверка: среднее между одинаковыми точками
>>> avg = calculate_average_coords_incremental((37.62, 55.75), (37.62, 55.75))
>>> print(avg)
(37.62, 55.75)
"""
current_lon, current_lat = current_average
new_lon, new_lat = new_coord
# Инкрементальное усреднение: (current + new) / 2
avg_lon = (current_lon + new_lon) / 2
avg_lat = (current_lat + new_lat) / 2
return (avg_lon, avg_lat)
# ============================================================================
# Утилиты для форматирования
# ============================================================================
def format_coordinates(longitude: float, latitude: float) -> str:
"""
Форматирует координаты в читаемый вид.
Преобразует числовые координаты в формат с указанием направления
(N/S для широты, E/W для долготы).
Args:
longitude (float): Долгота в десятичных градусах.
latitude (float): Широта в десятичных градусах.
Returns:
str: Отформатированная строка координат в формате "XXN/S YYE/W".
Example:
>>> format_coordinates(37.62, 55.75)
'55.75N 37.62E'
>>> format_coordinates(-122.42, 37.77)
'37.77N 122.42W'
"""
lon_direction = "E" if longitude > 0 else "W"
lat_direction = "N" if latitude > 0 else "S"
lon_value = abs(longitude)
lat_value = abs(latitude)
return f"{lat_value}{lat_direction} {lon_value}{lon_direction}"
def parse_pagination_params(
request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE
) -> tuple:
"""
Извлекает и валидирует параметры пагинации из запроса.
Args:
request: HTTP запрос Django.
default_per_page (int): Количество элементов на странице по умолчанию.
Returns:
tuple: Кортеж (page_number, items_per_page), где:
- page_number (int): Номер текущей страницы (по умолчанию 1).
- items_per_page (int): Количество элементов на странице.
Example:
>>> page, per_page = parse_pagination_params(request, default_per_page=100)
>>> paginator = Paginator(objects, per_page)
>>> page_obj = paginator.get_page(page)
"""
page_number = request.GET.get("page", 1)
items_per_page = request.GET.get("items_per_page", str(default_per_page))
# Валидация page_number
try:
page_number = int(page_number)
if page_number < 1:
page_number = 1
except (ValueError, TypeError):
page_number = 1
# Валидация items_per_page
try:
items_per_page = int(items_per_page)
if items_per_page < 1:
items_per_page = default_per_page
# Ограничиваем максимальное значение для предотвращения перегрузки
if items_per_page > MAX_ITEMS_PER_PAGE:
items_per_page = MAX_ITEMS_PER_PAGE
except (ValueError, TypeError):
items_per_page = default_per_page
return page_number, items_per_page
def get_first_param_subquery(field_name: str):
"""
Возвращает F() выражение для доступа к полю параметра через OneToOne связь.
После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne,
эта функция упрощена для возврата прямого F() выражения вместо подзапроса.
Args:
field_name (str): Имя поля модели Parameter для извлечения.
Может включать связанные поля через __ (например, 'id_satellite__name').
Returns:
F: Django F() объект для использования в annotate().
Example:
>>> freq_expr = get_first_param_subquery('frequency')
>>> objects = ObjItem.objects.annotate(first_freq=freq_expr)
>>> for obj in objects:
... print(obj.first_freq)
"""
return F(f"parameter_obj__{field_name}")
# ============================================================================
# Number Formatting Functions
# ============================================================================
def format_coordinate(value):
"""
Format coordinate value to 4 decimal places.
Args:
value: Numeric coordinate value
Returns:
str: Formatted coordinate or '-' if None
"""
if value is None:
return '-'
try:
return f"{float(value):.4f}"
except (ValueError, TypeError):
return '-'
def format_frequency(value):
"""
Format frequency value to 3 decimal places.
Args:
value: Numeric frequency value in MHz
Returns:
str: Formatted frequency or '-' if None
"""
if value is None:
return '-'
try:
return f"{float(value):.3f}"
except (ValueError, TypeError):
return '-'
def format_symbol_rate(value):
"""
Format symbol rate (bod_velocity) to integer.
Args:
value: Numeric symbol rate value
Returns:
str: Formatted symbol rate or '-' if None
"""
if value is None:
return '-'
try:
return f"{float(value):.0f}"
except (ValueError, TypeError):
return '-'
def format_coords_display(point):
"""
Format geographic point coordinates for display.
Args:
point: GeoDjango Point object
Returns:
str: Formatted coordinates as "LAT LON" or '-' if None
"""
if not point:
return '-'
try:
longitude = point.coords[0]
latitude = point.coords[1]
lon = f"{abs(longitude):.4f}E" if longitude > 0 else f"{abs(longitude):.4f}W"
lat = f"{abs(latitude):.4f}N" if latitude > 0 else f"{abs(latitude):.4f}S"
return f"{lat} {lon}"
except (AttributeError, IndexError, TypeError):
return '-'