Files
dbstorage/dbapp/mainapp/utils.py

1188 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library imports
import io
import json
import re
from datetime import datetime, time
# Django imports
from django.contrib.gis.geos import Point
from django.db.models import F
# Third-party imports
import pandas as pd
from geographiclib.geodesic import Geodesic
# Local imports
from mapsapp.models import Transponders
from .models import (
CustomUser,
Geo,
Mirror,
Modulation,
ObjItem,
Parameter,
Polarization,
Satellite,
SigmaParameter,
Source,
Standard,
)
def find_matching_transponder(satellite, frequency, polarization):
"""
Находит подходящий транспондер для заданных параметров.
Алгоритм:
1. Фильтрует транспондеры по спутнику и поляризации
2. Проверяет, входит ли частота в диапазон транспондера:
downlink - frequency_range/2 <= frequency <= downlink + frequency_range/2
3. Возвращает самый свежий транспондер (по created_at)
Args:
satellite: объект Satellite
frequency: частота в МГц
polarization: объект Polarization
Returns:
Transponders или None: найденный транспондер или None
"""
if not satellite or not polarization or frequency == -1.0:
return None
# Фильтруем транспондеры по спутнику и поляризации
transponders = Transponders.objects.filter(
sat_id=satellite,
polarization=polarization,
downlink__isnull=False,
frequency_range__isnull=False
).annotate(
# Вычисляем нижнюю и верхнюю границы диапазона
lower_bound=F('downlink') - F('frequency_range') / 2,
upper_bound=F('downlink') + F('frequency_range') / 2
).filter(
# Проверяем, входит ли частота в диапазон
lower_bound__lte=frequency,
upper_bound__gte=frequency
).order_by('-created_at') # Сортируем по дате создания (самые свежие первыми)
# Возвращаем самый свежий транспондер
return transponders.first()
# ============================================================================
# Константы
# ============================================================================
# Значения по умолчанию для пагинации
DEFAULT_ITEMS_PER_PAGE = 50
MAX_ITEMS_PER_PAGE = 10000
# Значения по умолчанию для данных
DEFAULT_NUMERIC_VALUE = -1.0
MINIMUM_BANDWIDTH_MHZ = 0.08
RANGE_DISTANCE = 56
def get_all_constants():
sats = [sat.name for sat in Satellite.objects.all()]
standards = [sat.name for sat in Standard.objects.all()]
pols = [sat.name for sat in Polarization.objects.all()]
mirrors = [sat.name for sat in Mirror.objects.all()]
modulations = [sat.name for sat in Modulation.objects.all()]
return sats, standards, pols, mirrors, modulations
def find_mirror_satellites(mirror_names: list) -> list:
"""
Находит спутники, которые соответствуют именам зеркал.
Алгоритм:
1. Для каждого имени зеркала:
- Обрезать пробелы и привести к нижнему регистру
- Найти все спутники, в имени которых содержится это имя
2. Вернуть список найденных спутников
Args:
mirror_names: список имен зеркал
Returns:
list: список объектов Satellite
"""
found_satellites = []
for mirror_name in mirror_names:
if not mirror_name or mirror_name == "-":
continue
# Обрезаем пробелы и приводим к нижнему регистру
mirror_name_clean = mirror_name.strip().lower()
if not mirror_name_clean:
continue
# Ищем спутники, в имени которых содержится имя зеркала
satellites = Satellite.objects.filter(
name__icontains=mirror_name_clean
)
found_satellites.extend(satellites)
# Убираем дубликаты
return list(set(found_satellites))
def coords_transform(coords: str):
lat_part, lon_part = coords.strip().split()
sign_map = {"N": 1, "E": 1, "S": -1, "W": -1}
lat_sign_char = lat_part[-1]
lat_value = float(lat_part[:-1].replace(",", "."))
latitude = lat_value * sign_map.get(lat_sign_char, 1)
lon_sign_char = lon_part[-1]
lon_value = float(lon_part[:-1].replace(",", "."))
longitude = lon_value * sign_map.get(lon_sign_char, 1)
return (longitude, latitude)
def remove_str(s: str):
if isinstance(s, str):
if (
s.strip() == "-"
or s.strip() == ""
or s.strip() == " "
or "неизв" in s.strip()
):
return -1
return float(s.strip().replace(",", "."))
return s
def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None):
"""
Импортирует данные из DataFrame с группировкой близких координат.
Улучшенный алгоритм с учетом существующих Source:
1. Извлечь все координаты и данные строк из DataFrame
2. Создать список необработанных записей (координата + данные строки)
3. Получить все существующие Source из БД
4. Для каждой необработанной записи:
a. Найти ближайший существующий Source (расстояние <= 56 км)
b. Если найден:
- Обновить coords_average этого Source (инкрементально)
- Создать ObjItem и связать с этим Source
- Удалить запись из списка необработанных
5. Пока список необработанных записей не пуст:
a. Взять первую запись из списка
b. Создать новый Source с coords_average = эта координата
c. Создать ObjItem для этой записи и связать с Source
d. Удалить запись из списка
e. Для каждой оставшейся записи в списке:
- Вычислить расстояние от её координаты до coords_average
- Если расстояние <= 56 км:
* Вычислить новое среднее ИНКРЕМЕНТАЛЬНО:
new_avg = (coords_average + current_coord) / 2
* Обновить coords_average в Source
* Создать ObjItem для этой записи и связать с Source
* Удалить запись из списка
- Иначе: пропустить и проверить следующую запись
6. Сохранить все изменения в БД
Важно: Среднее вычисляется инкрементально - каждая новая точка
усредняется с текущим средним, а не со всеми точками кластера.
Args:
df: DataFrame с данными
sat: объект Satellite
current_user: текущий пользователь (optional)
Returns:
int: количество созданных Source
"""
try:
df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True)
except Exception as e:
print(e)
consts = get_all_constants()
df.fillna(-1, inplace=True)
# Шаг 1: Извлечь все координаты и данные строк из DataFrame
unprocessed_records = []
for idx, row in df.iterrows():
try:
# Извлекаем координату
coord_tuple = coords_transform(row["Координаты"])
# Сохраняем запись с координатой и данными строки
unprocessed_records.append({"coord": coord_tuple, "row": row, "index": idx})
except Exception as e:
print(f"Ошибка при обработке строки {idx}: {e}")
continue
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
source_count = 0
added_to_existing_count = 0
# Шаг 3: Получить все существующие Source из БД
existing_sources = list(Source.objects.filter(coords_average__isnull=False))
# Шаг 4: Попытка добавить записи к существующим Source
records_to_remove = []
for i, record in enumerate(unprocessed_records):
current_coord = record["coord"]
# Найти ближайший существующий Source
closest_source = None
min_distance = float('inf')
best_new_avg = None
for source in existing_sources:
source_coord = (source.coords_average.x, source.coords_average.y)
new_avg, distance = calculate_mean_coords(source_coord, current_coord)
if distance < min_distance:
min_distance = distance
closest_source = source
best_new_avg = new_avg
# Если найден близкий Source (расстояние <= 56 км)
if closest_source and min_distance <= RANGE_DISTANCE:
# Обновить coords_average инкрементально
closest_source.coords_average = Point(best_new_avg, srid=4326)
closest_source.save()
# Создать ObjItem и связать с существующим Source
_create_objitem_from_row(
record["row"], sat, closest_source, user_to_use, consts
)
added_to_existing_count += 1
# Пометить запись для удаления
records_to_remove.append(i)
# Удалить обработанные записи из списка (в обратном порядке, чтобы не сбить индексы)
for i in reversed(records_to_remove):
unprocessed_records.pop(i)
# Шаг 5: Цикл обработки оставшихся записей - создание новых Source
while unprocessed_records:
# Шаг 5a: Взять первую запись из списка
first_record = unprocessed_records.pop(0)
first_coord = first_record["coord"]
# Шаг 5b: Создать новый Source с coords_average = эта координата
source = Source.objects.create(
coords_average=Point(first_coord, srid=4326), created_by=user_to_use
)
source_count += 1
# Шаг 5c: Создать ObjItem для этой записи и связать с Source
_create_objitem_from_row(first_record["row"], sat, source, user_to_use, consts)
# Шаг 5e: Для каждой оставшейся записи в списке
records_to_remove = []
for i, record in enumerate(unprocessed_records):
current_coord = record["coord"]
# Вычислить расстояние от координаты до coords_average
current_avg = (source.coords_average.x, source.coords_average.y)
new_avg, distance = calculate_mean_coords(current_avg, current_coord)
if distance <= RANGE_DISTANCE:
# Обновить coords_average в Source
source.coords_average = Point(new_avg, srid=4326)
source.save()
# Создать ObjItem для этой записи и связать с Source
_create_objitem_from_row(
record["row"], sat, source, user_to_use, consts
)
# Пометить запись для удаления
records_to_remove.append(i)
# Удалить обработанные записи из списка (в обратном порядке, чтобы не сбить индексы)
for i in reversed(records_to_remove):
unprocessed_records.pop(i)
print(f"Импорт завершен: создано {source_count} новых источников, "
f"добавлено {added_to_existing_count} точек к существующим источникам")
return source_count
def _create_objitem_from_row(row, sat, source, user_to_use, consts):
"""
Вспомогательная функция для создания ObjItem из строки DataFrame.
Args:
row: строка DataFrame
sat: объект Satellite
source: объект Source для связи
user_to_use: пользователь для created_by
consts: константы из get_all_constants()
"""
# Извлекаем координату
geo_point = Point(coords_transform(row["Координаты"]), srid=4326)
# Обработка поляризации
try:
polarization_obj, _ = Polarization.objects.get_or_create(
name=row["Поляризация"].strip()
)
except KeyError:
polarization_obj, _ = Polarization.objects.get_or_create(name="-")
# Обработка ВЧ параметров
freq = remove_str(row["Частота, МГц"])
freq_line = remove_str(row["Полоса, МГц"])
v = remove_str(row["Символьная скорость, БОД"])
try:
mod_obj, _ = Modulation.objects.get_or_create(name=row["Модуляция"].strip())
except AttributeError:
mod_obj, _ = Modulation.objects.get_or_create(name="-")
snr = remove_str(row["ОСШ"])
# Обработка времени
date = row["Дата"].date()
time_ = row["Время"]
if isinstance(time_, str):
time_ = time_.strip()
time_ = time(0, 0, 0)
timestamp = datetime.combine(date, time_)
# Обработка зеркал - теперь это спутники
mirror_names = []
mirror_1 = row["Зеркало 1"].strip().split("\n")
mirror_2 = row["Зеркало 2"].strip().split("\n")
# Собираем все имена зеркал
for mir in mirror_1:
if mir.strip() and mir.strip() != "-":
mirror_names.append(mir.strip())
for mir in mirror_2:
if mir.strip() and mir.strip() != "-":
mirror_names.append(mir.strip())
# Находим спутники-зеркала
mirror_satellites = find_mirror_satellites(mirror_names)
location = row["Местоопределение"].strip()
comment = row["Комментарий"]
source_name = row["Объект наблюдения"]
geo, _ = Geo.objects.get_or_create(
timestamp=timestamp,
coords=geo_point,
defaults={
"location": location,
"comment": comment,
"is_average": (comment != -1.0),
},
)
geo.save()
# Устанавливаем связи с спутниками-зеркалами
if mirror_satellites:
geo.mirrors.set(mirror_satellites)
# Проверяем, существует ли уже ObjItem с таким же geo
existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first()
if existing_obj_item:
# Проверяем, существует ли parameter с такими же значениями
if (
hasattr(existing_obj_item, "parameter_obj")
and existing_obj_item.parameter_obj
and existing_obj_item.parameter_obj.id_satellite == sat
and existing_obj_item.parameter_obj.polarization == polarization_obj
and existing_obj_item.parameter_obj.frequency == freq
and existing_obj_item.parameter_obj.freq_range == freq_line
and existing_obj_item.parameter_obj.bod_velocity == v
and existing_obj_item.parameter_obj.modulation == mod_obj
and existing_obj_item.parameter_obj.snr == snr
):
# Пропускаем создание дубликата
return
# Находим подходящий транспондер
transponder = find_matching_transponder(sat, freq, polarization_obj)
# Создаем новый ObjItem и связываем с Source и Transponder
obj_item = ObjItem.objects.create(
name=source_name,
source=source,
transponder=transponder,
created_by=user_to_use
)
# Создаем Parameter
Parameter.objects.create(
id_satellite=sat,
polarization=polarization_obj,
frequency=freq,
freq_range=freq_line,
bod_velocity=v,
modulation=mod_obj,
snr=snr,
objitem=obj_item,
)
# Связываем geo с objitem
geo.objitem = obj_item
geo.save()
def add_satellite_list():
sats = [
"AZERSPACE 2",
"Amos 4",
"Astra 4A",
"ComsatBW-1",
"Eutelsat 16A",
"Eutelsat 21B",
"Eutelsat 7B",
"ExpressAM6",
"Hellas Sat 3",
"Intelsat 39",
"Intelsat 17",
"NSS 12",
"Sicral 2",
"SkyNet 5B",
"SkyNet 5D",
"Syracuse 4A",
"Turksat 3A",
"Turksat 4A",
"WGS 10",
"Yamal 402",
]
for sat in sats:
sat_obj, _ = Satellite.objects.get_or_create(name=sat)
sat_obj.save()
def parse_string(s: str):
pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$"
match = re.match(pattern, s)
if match:
return list(match.groups())
else:
raise ValueError("Некорректный формат строки")
def get_point_from_json(filepath: str):
with open(filepath, encoding="utf-8-sig") as jf:
data = json.load(jf)
for obj in data:
if not obj.get("bearingBehavior", {}):
if obj["tacticObjectType"] == "source":
# if not obj['bearingBehavior']:
source_id = obj["id"]
name = obj["name"]
elements = parse_string(name)
sat_name = elements[0]
freq = elements[1]
freq_range = elements[2]
pol = elements[4]
timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S")
lat = None
lon = None
for pos in data:
if pos["id"] == source_id and pos["tacticObjectType"] == "position":
lat = pos["latitude"]
lon = pos["longitude"]
break
print(
f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} "
f"time - {timestamp}, pos - ({lat}, {lon})"
)
def get_points_from_csv(file_content, current_user=None):
"""
Импортирует данные из CSV с группировкой близких координат.
Улучшенный алгоритм с учетом существующих Source:
1. Извлечь все координаты и данные строк из DataFrame
2. Создать список необработанных записей (координата + данные строки)
3. Получить все существующие Source из БД
4. Для каждой записи:
a. Проверить, существует ли дубликат (координаты + частота)
b. Если дубликат найден, пропустить запись
c. Найти ближайший существующий Source (расстояние <= 56 км)
d. Если найден:
- Обновить coords_average этого Source (инкрементально)
- Создать ObjItem и связать с этим Source
e. Если не найден:
- Создать новый Source
- Создать ObjItem и связать с новым Source
- Добавить новый Source в список существующих
5. Сохранить все изменения в БД
Args:
file_content: содержимое CSV файла
current_user: текущий пользователь (optional)
Returns:
int: количество созданных Source
"""
df = pd.read_csv(
io.StringIO(file_content),
sep=";",
names=[
"id",
"obj",
"lat",
"lon",
"h",
"time",
"sat",
"norad_id",
"freq",
"f_range",
"et",
"qaul",
"mir_1",
"mir_2",
"mir_3",
],
)
df[["lat", "lon", "freq", "f_range"]] = (
df[["lat", "lon", "freq", "f_range"]]
.replace(",", ".", regex=True)
.astype(float)
)
df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S")
# Шаг 1: Извлечь все координаты и данные строк из DataFrame
records = []
for idx, row in df.iterrows():
try:
# Извлекаем координату из колонок lat и lon
coord_tuple = (row["lon"], row["lat"])
# Сохраняем запись с координатой и данными строки
records.append({"coord": coord_tuple, "row": row, "index": idx})
except Exception as e:
print(f"Ошибка при обработке строки {idx}: {e}")
continue
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
# Шаг 3: Получить все существующие Source из БД
existing_sources = list(Source.objects.filter(coords_average__isnull=False))
new_sources_count = 0
added_count = 0
skipped_count = 0
# Шаг 4: Обработка каждой записи
for record in records:
current_coord = record["coord"]
row = record["row"]
# Шаг 4a: Проверить, существует ли дубликат (координаты + частота)
if _is_duplicate_objitem(current_coord, row["freq"], row["f_range"]):
skipped_count += 1
continue
# Шаг 4c: Найти ближайший существующий Source
closest_source = None
min_distance = float('inf')
best_new_avg = None
for source in existing_sources:
source_coord = (source.coords_average.x, source.coords_average.y)
new_avg, distance = calculate_mean_coords(source_coord, current_coord)
if distance < min_distance:
min_distance = distance
closest_source = source
best_new_avg = new_avg
# Шаг 4d: Если найден близкий Source (расстояние <= 56 км)
if closest_source and min_distance <= RANGE_DISTANCE:
# Обновить coords_average инкрементально
closest_source.coords_average = Point(best_new_avg, srid=4326)
closest_source.save()
# Создать ObjItem и связать с существующим Source
_create_objitem_from_csv_row(row, closest_source, user_to_use)
added_count += 1
else:
# Шаг 4e: Создать новый Source
new_source = Source.objects.create(
coords_average=Point(current_coord, srid=4326),
created_by=user_to_use
)
new_sources_count += 1
# Создать ObjItem и связать с новым Source
_create_objitem_from_csv_row(row, new_source, user_to_use)
added_count += 1
# Добавить новый Source в список существующих
existing_sources.append(new_source)
print(f"Импорт завершен: создано {new_sources_count} новых источников, "
f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов")
return new_sources_count
def _is_duplicate_objitem(coord_tuple, frequency, freq_range, tolerance=0.1):
"""
Проверяет, существует ли уже ObjItem с такими же координатами и частотой.
Args:
coord_tuple: кортеж (lon, lat) координат
frequency: частота в МГц
freq_range: полоса частот в МГц
tolerance: допуск для сравнения координат в километрах
Returns:
bool: True если дубликат найден, False иначе
"""
# Ищем ObjItems с близкими координатами через geo_obj
nearby_objitems = ObjItem.objects.filter(
geo_obj__coords__isnull=False
).select_related('parameter_obj', 'geo_obj')
for objitem in nearby_objitems:
if not objitem.geo_obj or not objitem.geo_obj.coords:
continue
# Проверяем расстояние между координатами
geo_coord = (objitem.geo_obj.coords.x, objitem.geo_obj.coords.y)
_, distance = calculate_mean_coords(coord_tuple, geo_coord)
if distance <= tolerance:
# Координаты совпадают, проверяем частоту
if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj:
param = objitem.parameter_obj
# Проверяем совпадение частоты с небольшим допуском (0.1 МГц)
if (abs(param.frequency - frequency) < 0.1 and
abs(param.freq_range - freq_range) < 0.1):
return True
return False
def _create_objitem_from_csv_row(row, source, user_to_use):
"""
Вспомогательная функция для создания ObjItem из строки CSV DataFrame.
Args:
row: строка DataFrame
source: объект Source для связи
user_to_use: пользователь для created_by
"""
# Определяем поляризацию
match row["obj"].split(" ")[-1]:
case "V":
pol = "Вертикальная"
case "H":
pol = "Горизонтальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
sat_obj, _ = Satellite.objects.get_or_create(
name=row["sat"], defaults={"norad": row["norad_id"]}
)
# Обработка зеркал - теперь это спутники
mirror_names = []
if not pd.isna(row["mir_1"]) and row["mir_1"].strip() != "-":
mirror_names.append(row["mir_1"])
if not pd.isna(row["mir_2"]) and row["mir_2"].strip() != "-":
mirror_names.append(row["mir_2"])
if not pd.isna(row["mir_3"]) and row["mir_3"].strip() != "-":
mirror_names.append(row["mir_3"])
# Находим спутники-зеркала
mirror_satellites = find_mirror_satellites(mirror_names)
# Создаем Geo объект
geo_obj, _ = Geo.objects.get_or_create(
timestamp=row["time"],
coords=Point(row["lon"], row["lat"], srid=4326),
defaults={
"is_average": False,
},
)
# Устанавливаем связи с спутниками-зеркалами
if mirror_satellites:
geo_obj.mirrors.set(mirror_satellites)
# Проверяем, существует ли уже ObjItem с таким же geo
existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first()
if existing_obj_item:
# Проверяем, существует ли parameter с такими же значениями
if (
hasattr(existing_obj_item, "parameter_obj")
and existing_obj_item.parameter_obj
and existing_obj_item.parameter_obj.id_satellite == sat_obj
and existing_obj_item.parameter_obj.polarization == pol_obj
and existing_obj_item.parameter_obj.frequency == row["freq"]
and existing_obj_item.parameter_obj.freq_range == row["f_range"]
):
# Пропускаем создание дубликата
return
# Находим подходящий транспондер
transponder = find_matching_transponder(sat_obj, row["freq"], pol_obj)
# Создаем новый ObjItem и связываем с Source и Transponder
obj_item = ObjItem.objects.create(
name=row["obj"],
source=source,
transponder=transponder,
created_by=user_to_use
)
# Создаем Parameter
Parameter.objects.create(
id_satellite=sat_obj,
polarization=pol_obj,
frequency=row["freq"],
freq_range=row["f_range"],
objitem=obj_item,
)
# Связываем geo с objitem
geo_obj.objitem = obj_item
geo_obj.save()
def get_vch_load_from_html(file, sat: Satellite) -> None:
filename = file.name.split("_")
transfer = filename[3]
match filename[2]:
case "H":
pol = "Горизонтальная"
case "V":
pol = "Вертикальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
tables = pd.read_html(file, encoding="windows-1251")
df = tables[0]
df = df.drop(0).reset_index(drop=True)
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)
df.replace("Неизвестно", "-", inplace=True)
df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[
["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]
].apply(pd.to_numeric)
df["Время начала измерения"] = df["Время начала измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
df["Время окончания измерения"] = df["Время окончания измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
for stroka in df.iterrows():
value = stroka[1]
if value["Полоса, МГц"] < 0.08:
continue
if "-" in value["Символьная скорость"]:
bod_velocity = -1.0
else:
bod_velocity = value["Символьная скорость"]
if "-" in value["Сигнал/шум, дБ"]:
snr = -1.0
else:
snr = value["Сигнал/шум, дБ"]
if value["Пакетность"] == "да":
pack = True
elif value["Пакетность"] == "нет":
pack = False
else:
pack = None
polarization, _ = Polarization.objects.get_or_create(name=pol)
mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"])
standard, _ = Standard.objects.get_or_create(name=value["Стандарт"])
sigma_load, _ = SigmaParameter.objects.get_or_create(
id_satellite=sat,
frequency=value["Частота, МГц"],
freq_range=value["Полоса, МГц"],
polarization=polarization,
defaults={
"transfer": float(transfer),
# "polarization": polarization,
"status": value["Статус"],
"power": value["Мощность, дБм"],
"bod_velocity": bod_velocity,
"modulation": mod,
"snr": snr,
"packets": pack,
"datetime_begin": value["Время начала измерения"],
"datetime_end": value["Время окончания измерения"],
},
)
sigma_load.save()
def get_frequency_tolerance_percent(freq_range_mhz: float) -> float:
"""
Определяет процент погрешности центральной частоты в зависимости от полосы частот.
Args:
freq_range_mhz (float): Полоса частот в МГц
Returns:
float: Процент погрешности для центральной частоты
Диапазоны:
- 0 - 0.5 МГц (0 - 500 кГц): 0.1%
- 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5%
- 1.5 - 5 МГц: 1%
- 5 - 10 МГц: 2%
- > 10 МГц: 5%
"""
if freq_range_mhz < 0.5:
return 0.005
elif freq_range_mhz < 1.5:
return 0.01
elif freq_range_mhz < 5.0:
return 0.02
elif freq_range_mhz < 10.0:
return 0.05
else:
return 0.1
def compare_and_link_vch_load(
sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float
):
"""
Привязывает SigmaParameter к Parameter на основе совпадения параметров.
Погрешность центральной частоты определяется автоматически в зависимости от полосы частот:
- 0-500 кГц: 0.1%
- 500 кГц-1.5 МГц: 0.5%
- 1.5-5 МГц: 1%
- 5-10 МГц: 2%
- >10 МГц: 5%
Args:
sat_id (Satellite): Спутник для фильтрации
eps_freq (float): Не используется (оставлен для обратной совместимости)
eps_frange (float): Погрешность полосы частот в процентах
ku_range (float): Не используется (оставлен для обратной совместимости)
Returns:
tuple: (количество объектов, количество привязок)
"""
# Получаем все ObjItem с Parameter для данного спутника
item_obj = ObjItem.objects.filter(
parameter_obj__id_satellite=sat_id
).select_related("parameter_obj", "parameter_obj__polarization")
vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id).select_related(
"polarization"
)
link_count = 0
obj_count = item_obj.count()
for obj in item_obj:
vch_load = obj.parameter_obj
# Пропускаем объекты с некорректной частотой
if not vch_load or vch_load.frequency == -1.0:
continue
# Определяем погрешность частоты на основе полосы
freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range)
# Вычисляем допустимое отклонение частоты в МГц
freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100
# Вычисляем допустимое отклонение полосы в МГц
frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100
for sigma in vch_sigma:
# Проверяем совпадение по всем параметрам
freq_match = (
abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz
)
frange_match = (
abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz
)
pol_match = sigma.polarization == vch_load.polarization
if freq_match and frange_match and pol_match:
sigma.parameter = vch_load
sigma.save()
link_count += 1
return obj_count, link_count
def kub_report(data_in: io.StringIO) -> pd.DataFrame:
df_in = pd.read_excel(data_in)
df = pd.DataFrame(
columns=[
"Дата",
"Широта",
"Долгота",
"Высота",
"Населённый пункт",
"ИСЗ",
"Прямой канал, МГц",
"Обратный канал, МГц",
"Перенос, МГц",
"Полоса, МГц",
"Зеркала",
]
)
for row in df_in.iterrows():
value = row[1]
date = datetime.date(datetime.now())
isz = value["ИСЗ"]
try:
lat = float(value["Широта, град"].strip().replace(",", "."))
lon = float(value["Долгота, град"].strip().replace(",", "."))
downlink = float(value["Обратный канал, МГц"].strip().replace(",", "."))
freq_range = float(value["Полоса, МГц"].strip().replace(",", "."))
except Exception as e:
lat = value["Широта, град"]
lon = value["Долгота, град"]
downlink = value["Обратный канал, МГц"]
freq_range = value["Полоса, МГц"]
print(e)
norad = int(re.findall(r"\((\d+)\)", isz)[0])
sat_obj = Satellite.objects.get(norad=norad)
pol_obj = Polarization.objects.get(name=value["Поляризация"].strip())
transponder = Transponders.objects.filter(
sat_id=sat_obj,
polarization=pol_obj,
downlink__gte=downlink - F("frequency_range") / 2,
downlink__lte=downlink + F("frequency_range") / 2,
).first()
# try:
# location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address']
# loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '')
# except AttributeError:
# loc_name = ''
# sleep(1)
loc_name = ""
if transponder: # and not (len(transponder) > 1):
transfer = transponder.transfer
uplink = transfer + downlink
new_row = pd.DataFrame(
[
{
"Дата": date,
"Широта": lat,
"Долгота": lon,
"Высота": 0.0,
"Населённый пункт": loc_name,
"ИСЗ": isz,
"Прямой канал, МГц": uplink,
"Обратный канал, МГц": downlink,
"Перенос, МГц": transfer,
"Полоса, МГц": freq_range,
"Зеркала": "",
}
]
)
df = pd.concat([df, new_row], ignore_index=True)
else:
print("Ничего не найдено в транспондерах")
return df
# ============================================================================
# Утилиты для работы с координатами
# ============================================================================
def calculate_mean_coords(coord1: tuple, coord2: tuple) -> tuple[tuple, float]:
"""
Вычисляет среднюю точку между двумя координатами с использованием геодезических вычислений (с учётом эллипсоида).
:param lat1: Широта первой точки в градусах.
:param lon1: Долгота первой точки в градусах.
:param lat2: Широта второй точки в градусах.
:param lon2: Долгота второй точки в градусах.
:return: Словарь с ключами 'lat' и 'lon' для средней точки, и расстояние(dist) в КМ.
"""
lon1, lat1 = coord1
lon2, lat2 = coord2
geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2)
azimuth1 = geod_inv['azi1']
distance = geod_inv['s12']
geod_direct = Geodesic.WGS84.Direct(lat1, lon1, azimuth1, distance / 2)
return (geod_direct['lon2'], geod_direct['lat2']), distance/1000
def calculate_average_coords_incremental(
current_average: tuple, new_coord: tuple
) -> tuple:
"""
Вычисляет новое среднее между текущим средним и новой координатой.
Использует инкрементальное усреднение: каждая новая точка усредняется
с текущим средним, а не со всеми точками кластера. Это упрощенный подход,
где новое среднее = (текущее_среднее + новая_координата) / 2.
Важно: Это НЕ среднее арифметическое всех точек кластера, а инкрементальное
усреднение между двумя точками (текущим средним и новой точкой).
Args:
current_average (tuple): Текущее среднее в формате (longitude, latitude)
new_coord (tuple): Новая координата в формате (longitude, latitude)
Returns:
tuple: Новое среднее в формате (longitude, latitude)
Example:
>>> avg1 = (37.62, 55.75) # Первая точка
>>> avg2 = calculate_average_coords_incremental(avg1, (37.63, 55.76))
>>> print(avg2)
(37.625, 55.755)
>>> avg3 = calculate_average_coords_incremental(avg2, (37.64, 55.77))
>>> print(avg3)
(37.6325, 55.7625)
>>> # Проверка: среднее между одинаковыми точками
>>> avg = calculate_average_coords_incremental((37.62, 55.75), (37.62, 55.75))
>>> print(avg)
(37.62, 55.75)
"""
current_lon, current_lat = current_average
new_lon, new_lat = new_coord
# Инкрементальное усреднение: (current + new) / 2
avg_lon = (current_lon + new_lon) / 2
avg_lat = (current_lat + new_lat) / 2
return (avg_lon, avg_lat)
# ============================================================================
# Утилиты для форматирования
# ============================================================================
def format_coordinates(longitude: float, latitude: float) -> str:
"""
Форматирует координаты в читаемый вид.
Преобразует числовые координаты в формат с указанием направления
(N/S для широты, E/W для долготы).
Args:
longitude (float): Долгота в десятичных градусах.
latitude (float): Широта в десятичных градусах.
Returns:
str: Отформатированная строка координат в формате "XXN/S YYE/W".
Example:
>>> format_coordinates(37.62, 55.75)
'55.75N 37.62E'
>>> format_coordinates(-122.42, 37.77)
'37.77N 122.42W'
"""
lon_direction = "E" if longitude > 0 else "W"
lat_direction = "N" if latitude > 0 else "S"
lon_value = abs(longitude)
lat_value = abs(latitude)
return f"{lat_value}{lat_direction} {lon_value}{lon_direction}"
def parse_pagination_params(
request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE
) -> tuple:
"""
Извлекает и валидирует параметры пагинации из запроса.
Args:
request: HTTP запрос Django.
default_per_page (int): Количество элементов на странице по умолчанию.
Returns:
tuple: Кортеж (page_number, items_per_page), где:
- page_number (int): Номер текущей страницы (по умолчанию 1).
- items_per_page (int): Количество элементов на странице.
Example:
>>> page, per_page = parse_pagination_params(request, default_per_page=100)
>>> paginator = Paginator(objects, per_page)
>>> page_obj = paginator.get_page(page)
"""
page_number = request.GET.get("page", 1)
items_per_page = request.GET.get("items_per_page", str(default_per_page))
# Валидация page_number
try:
page_number = int(page_number)
if page_number < 1:
page_number = 1
except (ValueError, TypeError):
page_number = 1
# Валидация items_per_page
try:
items_per_page = int(items_per_page)
if items_per_page < 1:
items_per_page = default_per_page
# Ограничиваем максимальное значение для предотвращения перегрузки
if items_per_page > MAX_ITEMS_PER_PAGE:
items_per_page = MAX_ITEMS_PER_PAGE
except (ValueError, TypeError):
items_per_page = default_per_page
return page_number, items_per_page
def get_first_param_subquery(field_name: str):
"""
Возвращает F() выражение для доступа к полю параметра через OneToOne связь.
После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne,
эта функция упрощена для возврата прямого F() выражения вместо подзапроса.
Args:
field_name (str): Имя поля модели Parameter для извлечения.
Может включать связанные поля через __ (например, 'id_satellite__name').
Returns:
F: Django F() объект для использования в annotate().
Example:
>>> freq_expr = get_first_param_subquery('frequency')
>>> objects = ObjItem.objects.annotate(first_freq=freq_expr)
>>> for obj in objects:
... print(obj.first_freq)
"""
return F(f"parameter_obj__{field_name}")