Files
dbstorage/dbapp/mainapp/utils.py

717 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library imports
import io
import json
import re
from datetime import datetime, time
# Django imports
from django.contrib.gis.geos import Point
from django.db.models import F
# Third-party imports
import pandas as pd
# Local imports
from mapsapp.models import Transponders
from .models import (
CustomUser,
Geo,
Mirror,
Modulation,
ObjItem,
Parameter,
Polarization,
Satellite,
SigmaParameter,
Standard,
)
# ============================================================================
# Константы
# ============================================================================
# Значения по умолчанию для пагинации
DEFAULT_ITEMS_PER_PAGE = 50
MAX_ITEMS_PER_PAGE = 10000
# Значения по умолчанию для данных
DEFAULT_NUMERIC_VALUE = -1.0
MINIMUM_BANDWIDTH_MHZ = 0.08
def get_all_constants():
sats = [sat.name for sat in Satellite.objects.all()]
standards = [sat.name for sat in Standard.objects.all()]
pols = [sat.name for sat in Polarization.objects.all()]
mirrors = [sat.name for sat in Mirror.objects.all()]
modulations = [sat.name for sat in Modulation.objects.all()]
return sats, standards, pols, mirrors, modulations
def coords_transform(coords: str):
lat_part, lon_part = coords.strip().split()
sign_map = {"N": 1, "E": 1, "S": -1, "W": -1}
lat_sign_char = lat_part[-1]
lat_value = float(lat_part[:-1].replace(",", "."))
latitude = lat_value * sign_map.get(lat_sign_char, 1)
lon_sign_char = lon_part[-1]
lon_value = float(lon_part[:-1].replace(",", "."))
longitude = lon_value * sign_map.get(lon_sign_char, 1)
return (longitude, latitude)
def remove_str(s: str):
if isinstance(s, str):
if (
s.strip() == "-"
or s.strip() == ""
or s.strip() == " "
or "неизв" in s.strip()
):
return -1
return float(s.strip().replace(",", "."))
return s
def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None):
try:
df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True)
except Exception as e:
print(e)
consts = get_all_constants()
df.fillna(-1, inplace=True)
for stroka in df.iterrows():
geo_point = Point(coords_transform(stroka[1]["Координаты"]), srid=4326)
valid_point = None
kupsat_point = None
try:
if (
stroka[1]["Координаты объекта"] != -1
and stroka[1]["Координаты Кубсата"] != "+"
):
if (
"ИРИ" not in stroka[1]["Координаты объекта"]
and "БЛА" not in stroka[1]["Координаты объекта"]
):
valid_point = list(
map(
float,
stroka[1]["Координаты объекта"]
.replace(",", ".")
.split(". "),
)
)
valid_point = Point(valid_point[1], valid_point[0], srid=4326)
if (
stroka[1]["Координаты Кубсата"] != -1
and stroka[1]["Координаты Кубсата"] != "+"
):
kupsat_point = list(
map(
float,
stroka[1]["Координаты Кубсата"].replace(",", ".").split(". "),
)
)
kupsat_point = Point(kupsat_point[1], kupsat_point[0], srid=4326)
except KeyError:
print("В таблице нет столбцов с координатами кубсата")
try:
polarization_obj, _ = Polarization.objects.get_or_create(
name=stroka[1]["Поляризация"].strip()
)
except KeyError:
polarization_obj, _ = Polarization.objects.get_or_create(name="-")
freq = remove_str(stroka[1]["Частота, МГц"])
freq_line = remove_str(stroka[1]["Полоса, МГц"])
v = remove_str(stroka[1]["Символьная скорость, БОД"])
try:
mod_obj, _ = Modulation.objects.get_or_create(
name=stroka[1]["Модуляция"].strip()
)
except AttributeError:
mod_obj, _ = Modulation.objects.get_or_create(name="-")
snr = remove_str(stroka[1]["ОСШ"])
date = stroka[1]["Дата"].date()
time_ = stroka[1]["Время"]
if isinstance(time_, str):
time_ = time_.strip()
time_ = time(0, 0, 0)
timestamp = datetime.combine(date, time_)
current_mirrors = []
mirror_1 = stroka[1]["Зеркало 1"].strip().split("\n")
mirror_2 = stroka[1]["Зеркало 2"].strip().split("\n")
if len(mirror_1) > 1:
for mir in mirror_1:
mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip())
current_mirrors.append(mir.strip())
elif mirror_1[0] not in consts[3]:
mir_obj, _ = Mirror.objects.get_or_create(name=mirror_1[0].strip())
current_mirrors.append(mirror_1[0].strip())
if len(mirror_2) > 1:
for mir in mirror_2:
mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip())
current_mirrors.append(mir.strip())
elif mirror_2[0] not in consts[3]:
mir_obj, _ = Mirror.objects.get_or_create(name=mirror_2[0].strip())
current_mirrors.append(mirror_2[0].strip())
location = stroka[1]["Местоопределение"].strip()
comment = stroka[1]["Комментарий"]
source = stroka[1]["Объект наблюдения"]
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
geo, _ = Geo.objects.get_or_create(
timestamp=timestamp,
coords=geo_point,
defaults={
"coords_kupsat": kupsat_point,
"coords_valid": valid_point,
"location": location,
"comment": comment,
"is_average": (comment != -1.0),
},
)
geo.save()
geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors))
# Check if ObjItem with same geo already exists
existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first()
if existing_obj_item:
# Check if parameter with same values exists for this object
if (
hasattr(existing_obj_item, 'parameter_obj') and
existing_obj_item.parameter_obj and
existing_obj_item.parameter_obj.id_satellite == sat and
existing_obj_item.parameter_obj.polarization == polarization_obj and
existing_obj_item.parameter_obj.frequency == freq and
existing_obj_item.parameter_obj.freq_range == freq_line and
existing_obj_item.parameter_obj.bod_velocity == v and
existing_obj_item.parameter_obj.modulation == mod_obj and
existing_obj_item.parameter_obj.snr == snr
):
# Skip creating duplicate
continue
# Create new ObjItem and Parameter
obj_item = ObjItem.objects.create(name=source, created_by=user_to_use)
vch_load_obj = Parameter.objects.create(
id_satellite=sat,
polarization=polarization_obj,
frequency=freq,
freq_range=freq_line,
bod_velocity=v,
modulation=mod_obj,
snr=snr,
objitem=obj_item
)
geo.objitem = obj_item
geo.save()
def add_satellite_list():
sats = [
"AZERSPACE 2",
"Amos 4",
"Astra 4A",
"ComsatBW-1",
"Eutelsat 16A",
"Eutelsat 21B",
"Eutelsat 7B",
"ExpressAM6",
"Hellas Sat 3",
"Intelsat 39",
"Intelsat 17",
"NSS 12",
"Sicral 2",
"SkyNet 5B",
"SkyNet 5D",
"Syracuse 4A",
"Turksat 3A",
"Turksat 4A",
"WGS 10",
"Yamal 402",
]
for sat in sats:
sat_obj, _ = Satellite.objects.get_or_create(name=sat)
sat_obj.save()
def parse_string(s: str):
pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$"
match = re.match(pattern, s)
if match:
return list(match.groups())
else:
raise ValueError("Некорректный формат строки")
def get_point_from_json(filepath: str):
with open(filepath, encoding="utf-8-sig") as jf:
data = json.load(jf)
for obj in data:
if not obj.get("bearingBehavior", {}):
if obj["tacticObjectType"] == "source":
# if not obj['bearingBehavior']:
source_id = obj["id"]
name = obj["name"]
elements = parse_string(name)
sat_name = elements[0]
freq = elements[1]
freq_range = elements[2]
pol = elements[4]
timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S")
lat = None
lon = None
for pos in data:
if pos["id"] == source_id and pos["tacticObjectType"] == "position":
lat = pos["latitude"]
lon = pos["longitude"]
break
print(
f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} "
f"time - {timestamp}, pos - ({lat}, {lon})"
)
def get_points_from_csv(file_content, current_user=None):
df = pd.read_csv(
io.StringIO(file_content),
sep=";",
names=[
"id",
"obj",
"lat",
"lon",
"h",
"time",
"sat",
"norad_id",
"freq",
"f_range",
"et",
"qaul",
"mir_1",
"mir_2",
"mir_3",
],
)
df[["lat", "lon", "freq", "f_range"]] = (
df[["lat", "lon", "freq", "f_range"]]
.replace(",", ".", regex=True)
.astype(float)
)
df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S")
for row in df.iterrows():
row = row[1]
match row["obj"].split(" ")[-1]:
case "V":
pol = "Вертикальная"
case "H":
pol = "Горизонтальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
sat_obj, _ = Satellite.objects.get_or_create(
name=row["sat"], defaults={"norad": row["norad_id"]}
)
mir_1_obj, _ = Mirror.objects.get_or_create(name=row["mir_1"])
mir_2_obj, _ = Mirror.objects.get_or_create(name=row["mir_2"])
mir_lst = [row["mir_1"], row["mir_2"]]
if not pd.isna(row["mir_3"]):
mir_3_obj, _ = Mirror.objects.get_or_create(name=row["mir_3"])
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
geo_obj, _ = Geo.objects.get_or_create(
timestamp=row["time"],
coords=Point(row["lon"], row["lat"], srid=4326),
defaults={
"is_average": False,
# 'id_user_add': user_to_use,
},
)
geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst))
# Check if ObjItem with same geo already exists
existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first()
if existing_obj_item:
# Check if parameter with same values exists for this object
if (
hasattr(existing_obj_item, 'parameter_obj') and
existing_obj_item.parameter_obj and
existing_obj_item.parameter_obj.id_satellite == sat_obj and
existing_obj_item.parameter_obj.polarization == pol_obj and
existing_obj_item.parameter_obj.frequency == row["freq"] and
existing_obj_item.parameter_obj.freq_range == row["f_range"]
):
# Skip creating duplicate
continue
# Create new ObjItem and Parameter
obj_item = ObjItem.objects.create(name=row["obj"], created_by=user_to_use)
vch_load_obj = Parameter.objects.create(
id_satellite=sat_obj,
polarization=pol_obj,
frequency=row["freq"],
freq_range=row["f_range"],
objitem=obj_item
)
geo_obj.objitem = obj_item
geo_obj.save()
def get_vch_load_from_html(file, sat: Satellite) -> None:
filename = file.name.split("_")
transfer = filename[3]
match filename[2]:
case "H":
pol = "Горизонтальная"
case "V":
pol = "Вертикальная"
case "R":
pol = "Правая"
case "L":
pol = "Левая"
case _:
pol = "-"
tables = pd.read_html(file, encoding="windows-1251")
df = tables[0]
df = df.drop(0).reset_index(drop=True)
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)
df.replace("Неизвестно", "-", inplace=True)
df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[
["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]
].apply(pd.to_numeric)
df["Время начала измерения"] = df["Время начала измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
df["Время окончания измерения"] = df["Время окончания измерения"].apply(
lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S")
)
for stroka in df.iterrows():
value = stroka[1]
if value["Полоса, МГц"] < 0.08:
continue
if "-" in value["Символьная скорость"]:
bod_velocity = -1.0
else:
bod_velocity = value["Символьная скорость"]
if "-" in value["Сигнал/шум, дБ"]:
snr = -1.0
else:
snr = value["Сигнал/шум, дБ"]
if value["Пакетность"] == "да":
pack = True
elif value["Пакетность"] == "нет":
pack = False
else:
pack = None
polarization, _ = Polarization.objects.get_or_create(name=pol)
mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"])
standard, _ = Standard.objects.get_or_create(name=value["Стандарт"])
sigma_load, _ = SigmaParameter.objects.get_or_create(
id_satellite=sat,
frequency=value["Частота, МГц"],
freq_range=value["Полоса, МГц"],
polarization=polarization,
defaults={
"transfer": float(transfer),
# "polarization": polarization,
"status": value["Статус"],
"power": value["Мощность, дБм"],
"bod_velocity": bod_velocity,
"modulation": mod,
"snr": snr,
"packets": pack,
"datetime_begin": value["Время начала измерения"],
"datetime_end": value["Время окончания измерения"],
},
)
sigma_load.save()
def get_frequency_tolerance_percent(freq_range_mhz: float) -> float:
"""
Определяет процент погрешности центральной частоты в зависимости от полосы частот.
Args:
freq_range_mhz (float): Полоса частот в МГц
Returns:
float: Процент погрешности для центральной частоты
Диапазоны:
- 0 - 0.5 МГц (0 - 500 кГц): 0.1%
- 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5%
- 1.5 - 5 МГц: 1%
- 5 - 10 МГц: 2%
- > 10 МГц: 5%
"""
if freq_range_mhz < 0.5:
return 0.005
elif freq_range_mhz < 1.5:
return 0.01
elif freq_range_mhz < 5.0:
return 0.02
elif freq_range_mhz < 10.0:
return 0.05
else:
return 0.1
def compare_and_link_vch_load(
sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float
):
"""
Привязывает SigmaParameter к Parameter на основе совпадения параметров.
Погрешность центральной частоты определяется автоматически в зависимости от полосы частот:
- 0-500 кГц: 0.1%
- 500 кГц-1.5 МГц: 0.5%
- 1.5-5 МГц: 1%
- 5-10 МГц: 2%
- >10 МГц: 5%
Args:
sat_id (Satellite): Спутник для фильтрации
eps_freq (float): Не используется (оставлен для обратной совместимости)
eps_frange (float): Погрешность полосы частот в процентах
ku_range (float): Не используется (оставлен для обратной совместимости)
Returns:
tuple: (количество объектов, количество привязок)
"""
# Получаем все ObjItem с Parameter для данного спутника
item_obj = ObjItem.objects.filter(
parameter_obj__id_satellite=sat_id
).select_related('parameter_obj', 'parameter_obj__polarization')
vch_sigma = SigmaParameter.objects.filter(
id_satellite=sat_id
).select_related('polarization')
link_count = 0
obj_count = item_obj.count()
for obj in item_obj:
vch_load = obj.parameter_obj
# Пропускаем объекты с некорректной частотой
if not vch_load or vch_load.frequency == -1.0:
continue
# Определяем погрешность частоты на основе полосы
freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range)
# Вычисляем допустимое отклонение частоты в МГц
freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100
# Вычисляем допустимое отклонение полосы в МГц
frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100
for sigma in vch_sigma:
# Проверяем совпадение по всем параметрам
freq_match = abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz
frange_match = abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz
pol_match = sigma.polarization == vch_load.polarization
if freq_match and frange_match and pol_match:
sigma.parameter = vch_load
sigma.save()
link_count += 1
return obj_count, link_count
def kub_report(data_in: io.StringIO) -> pd.DataFrame:
df_in = pd.read_excel(data_in)
df = pd.DataFrame(
columns=[
"Дата",
"Широта",
"Долгота",
"Высота",
"Населённый пункт",
"ИСЗ",
"Прямой канал, МГц",
"Обратный канал, МГц",
"Перенос, МГц",
"Полоса, МГц",
"Зеркала",
]
)
for row in df_in.iterrows():
value = row[1]
date = datetime.date(datetime.now())
isz = value["ИСЗ"]
try:
lat = float(value["Широта, град"].strip().replace(",", "."))
lon = float(value["Долгота, град"].strip().replace(",", "."))
downlink = float(value["Обратный канал, МГц"].strip().replace(",", "."))
freq_range = float(value["Полоса, МГц"].strip().replace(",", "."))
except Exception as e:
lat = value["Широта, град"]
lon = value["Долгота, град"]
downlink = value["Обратный канал, МГц"]
freq_range = value["Полоса, МГц"]
print(e)
norad = int(re.findall(r"\((\d+)\)", isz)[0])
sat_obj = Satellite.objects.get(norad=norad)
pol_obj = Polarization.objects.get(name=value["Поляризация"].strip())
transponder = Transponders.objects.filter(
sat_id=sat_obj,
polarization=pol_obj,
downlink__gte=downlink - F("frequency_range") / 2,
downlink__lte=downlink + F("frequency_range") / 2,
).first()
# try:
# location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address']
# loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '')
# except AttributeError:
# loc_name = ''
# sleep(1)
loc_name = ""
if transponder: # and not (len(transponder) > 1):
transfer = transponder.transfer
uplink = transfer + downlink
new_row = pd.DataFrame(
[
{
"Дата": date,
"Широта": lat,
"Долгота": lon,
"Высота": 0.0,
"Населённый пункт": loc_name,
"ИСЗ": isz,
"Прямой канал, МГц": uplink,
"Обратный канал, МГц": downlink,
"Перенос, МГц": transfer,
"Полоса, МГц": freq_range,
"Зеркала": "",
}
]
)
df = pd.concat([df, new_row], ignore_index=True)
else:
print("Ничего не найдено в транспондерах")
return df
# ============================================================================
# Утилиты для форматирования
# ============================================================================
def format_coordinates(longitude: float, latitude: float) -> str:
"""
Форматирует координаты в читаемый вид.
Преобразует числовые координаты в формат с указанием направления
(N/S для широты, E/W для долготы).
Args:
longitude (float): Долгота в десятичных градусах.
latitude (float): Широта в десятичных градусах.
Returns:
str: Отформатированная строка координат в формате "XXN/S YYE/W".
Example:
>>> format_coordinates(37.62, 55.75)
'55.75N 37.62E'
>>> format_coordinates(-122.42, 37.77)
'37.77N 122.42W'
"""
lon_direction = "E" if longitude > 0 else "W"
lat_direction = "N" if latitude > 0 else "S"
lon_value = abs(longitude)
lat_value = abs(latitude)
return f"{lat_value}{lat_direction} {lon_value}{lon_direction}"
def parse_pagination_params(
request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE
) -> tuple:
"""
Извлекает и валидирует параметры пагинации из запроса.
Args:
request: HTTP запрос Django.
default_per_page (int): Количество элементов на странице по умолчанию.
Returns:
tuple: Кортеж (page_number, items_per_page), где:
- page_number (int): Номер текущей страницы (по умолчанию 1).
- items_per_page (int): Количество элементов на странице.
Example:
>>> page, per_page = parse_pagination_params(request, default_per_page=100)
>>> paginator = Paginator(objects, per_page)
>>> page_obj = paginator.get_page(page)
"""
page_number = request.GET.get("page", 1)
items_per_page = request.GET.get("items_per_page", str(default_per_page))
# Валидация page_number
try:
page_number = int(page_number)
if page_number < 1:
page_number = 1
except (ValueError, TypeError):
page_number = 1
# Валидация items_per_page
try:
items_per_page = int(items_per_page)
if items_per_page < 1:
items_per_page = default_per_page
# Ограничиваем максимальное значение для предотвращения перегрузки
if items_per_page > MAX_ITEMS_PER_PAGE:
items_per_page = MAX_ITEMS_PER_PAGE
except (ValueError, TypeError):
items_per_page = default_per_page
return page_number, items_per_page
def get_first_param_subquery(field_name: str):
"""
Возвращает F() выражение для доступа к полю параметра через OneToOne связь.
После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne,
эта функция упрощена для возврата прямого F() выражения вместо подзапроса.
Args:
field_name (str): Имя поля модели Parameter для извлечения.
Может включать связанные поля через __ (например, 'id_satellite__name').
Returns:
F: Django F() объект для использования в annotate().
Example:
>>> freq_expr = get_first_param_subquery('frequency')
>>> objects = ObjItem.objects.annotate(first_freq=freq_expr)
>>> for obj in objects:
... print(obj.first_freq)
"""
return F(f"parameter_obj__{field_name}")