# Standard library imports import io import json import re from datetime import datetime, time # Django imports from django.contrib.gis.geos import Point from django.db.models import F # Third-party imports import pandas as pd # Local imports from mapsapp.models import Transponders from .models import ( CustomUser, Geo, Mirror, Modulation, ObjItem, Parameter, Polarization, Satellite, SigmaParameter, Standard, ) # ============================================================================ # Константы # ============================================================================ # Значения по умолчанию для пагинации DEFAULT_ITEMS_PER_PAGE = 50 MAX_ITEMS_PER_PAGE = 10000 # Значения по умолчанию для данных DEFAULT_NUMERIC_VALUE = -1.0 MINIMUM_BANDWIDTH_MHZ = 0.08 def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, mirrors, modulations def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {"N": 1, "E": 1, "S": -1, "W": -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if ( s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip() ): return -1 return float(s.strip().replace(",", ".")) return s def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None): try: df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) for stroka in df.iterrows(): geo_point = Point(coords_transform(stroka[1]["Координаты"]), srid=4326) valid_point = None kupsat_point = None try: if ( stroka[1]["Координаты объекта"] != -1 and stroka[1]["Координаты Кубсата"] != "+" ): if ( "ИРИ" not in stroka[1]["Координаты объекта"] and "БЛА" not in stroka[1]["Координаты объекта"] ): valid_point = list( map( float, stroka[1]["Координаты объекта"] .replace(",", ".") .split(". "), ) ) valid_point = Point(valid_point[1], valid_point[0], srid=4326) if ( stroka[1]["Координаты Кубсата"] != -1 and stroka[1]["Координаты Кубсата"] != "+" ): kupsat_point = list( map( float, stroka[1]["Координаты Кубсата"].replace(",", ".").split(". "), ) ) kupsat_point = Point(kupsat_point[1], kupsat_point[0], srid=4326) except KeyError: print("В таблице нет столбцов с координатами кубсата") try: polarization_obj, _ = Polarization.objects.get_or_create( name=stroka[1]["Поляризация"].strip() ) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") freq = remove_str(stroka[1]["Частота, МГц"]) freq_line = remove_str(stroka[1]["Полоса, МГц"]) v = remove_str(stroka[1]["Символьная скорость, БОД"]) try: mod_obj, _ = Modulation.objects.get_or_create( name=stroka[1]["Модуляция"].strip() ) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name="-") snr = remove_str(stroka[1]["ОСШ"]) date = stroka[1]["Дата"].date() time_ = stroka[1]["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) current_mirrors = [] mirror_1 = stroka[1]["Зеркало 1"].strip().split("\n") mirror_2 = stroka[1]["Зеркало 2"].strip().split("\n") if len(mirror_1) > 1: for mir in mirror_1: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_1[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_1[0].strip()) current_mirrors.append(mirror_1[0].strip()) if len(mirror_2) > 1: for mir in mirror_2: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_2[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_2[0].strip()) current_mirrors.append(mirror_2[0].strip()) location = stroka[1]["Местоопределение"].strip() comment = stroka[1]["Комментарий"] source = stroka[1]["Объект наблюдения"] user_to_use = current_user if current_user else CustomUser.objects.get(id=1) geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ "coords_kupsat": kupsat_point, "coords_valid": valid_point, "location": location, "comment": comment, "is_average": (comment != -1.0), }, ) geo.save() geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors)) # Check if ObjItem with same geo already exists existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first() if existing_obj_item: # Check if parameter with same values exists for this object if ( hasattr(existing_obj_item, 'parameter_obj') and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat and existing_obj_item.parameter_obj.polarization == polarization_obj and existing_obj_item.parameter_obj.frequency == freq and existing_obj_item.parameter_obj.freq_range == freq_line and existing_obj_item.parameter_obj.bod_velocity == v and existing_obj_item.parameter_obj.modulation == mod_obj and existing_obj_item.parameter_obj.snr == snr ): # Skip creating duplicate continue # Create new ObjItem and Parameter obj_item = ObjItem.objects.create(name=source, created_by=user_to_use) vch_load_obj = Parameter.objects.create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, objitem=obj_item ) geo.objitem = obj_item geo.save() def add_satellite_list(): sats = [ "AZERSPACE 2", "Amos 4", "Astra 4A", "ComsatBW-1", "Eutelsat 16A", "Eutelsat 21B", "Eutelsat 7B", "ExpressAM6", "Hellas Sat 3", "Intelsat 39", "Intelsat 17", "NSS 12", "Sicral 2", "SkyNet 5B", "SkyNet 5D", "Syracuse 4A", "Turksat 3A", "Turksat 4A", "WGS 10", "Yamal 402", ] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create(name=sat) sat_obj.save() def parse_string(s: str): pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$" match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding="utf-8-sig") as jf: data = json.load(jf) for obj in data: if not obj.get("bearingBehavior", {}): if obj["tacticObjectType"] == "source": # if not obj['bearingBehavior']: source_id = obj["id"] name = obj["name"] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S") lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print( f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})" ) def get_points_from_csv(file_content, current_user=None): df = pd.read_csv( io.StringIO(file_content), sep=";", names=[ "id", "obj", "lat", "lon", "h", "time", "sat", "norad_id", "freq", "f_range", "et", "qaul", "mir_1", "mir_2", "mir_3", ], ) df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] .replace(",", ".", regex=True) .astype(float) ) df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S") for row in df.iterrows(): row = row[1] match row["obj"].split(" ")[-1]: case "V": pol = "Вертикальная" case "H": pol = "Горизонтальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" pol_obj, _ = Polarization.objects.get_or_create(name=pol) sat_obj, _ = Satellite.objects.get_or_create( name=row["sat"], defaults={"norad": row["norad_id"]} ) mir_1_obj, _ = Mirror.objects.get_or_create(name=row["mir_1"]) mir_2_obj, _ = Mirror.objects.get_or_create(name=row["mir_2"]) mir_lst = [row["mir_1"], row["mir_2"]] if not pd.isna(row["mir_3"]): mir_3_obj, _ = Mirror.objects.get_or_create(name=row["mir_3"]) user_to_use = current_user if current_user else CustomUser.objects.get(id=1) geo_obj, _ = Geo.objects.get_or_create( timestamp=row["time"], coords=Point(row["lon"], row["lat"], srid=4326), defaults={ "is_average": False, # 'id_user_add': user_to_use, }, ) geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst)) # Check if ObjItem with same geo already exists existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first() if existing_obj_item: # Check if parameter with same values exists for this object if ( hasattr(existing_obj_item, 'parameter_obj') and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat_obj and existing_obj_item.parameter_obj.polarization == pol_obj and existing_obj_item.parameter_obj.frequency == row["freq"] and existing_obj_item.parameter_obj.freq_range == row["f_range"] ): # Skip creating duplicate continue # Create new ObjItem and Parameter obj_item = ObjItem.objects.create(name=row["obj"], created_by=user_to_use) vch_load_obj = Parameter.objects.create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], objitem=obj_item ) geo_obj.objitem = obj_item geo_obj.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split("_") transfer = filename[3] match filename[2]: case "H": pol = "Горизонтальная" case "V": pol = "Вертикальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" tables = pd.read_html(file, encoding="windows-1251") df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace("Неизвестно", "-", inplace=True) df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[ ["Частота, МГц", "Полоса, МГц", "Мощность, дБм"] ].apply(pd.to_numeric) df["Время начала измерения"] = df["Время начала измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) df["Время окончания измерения"] = df["Время окончания измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) for stroka in df.iterrows(): value = stroka[1] if value["Полоса, МГц"] < 0.08: continue if "-" in value["Символьная скорость"]: bod_velocity = -1.0 else: bod_velocity = value["Символьная скорость"] if "-" in value["Сигнал/шум, дБ"]: snr = -1.0 else: snr = value["Сигнал/шум, дБ"] if value["Пакетность"] == "да": pack = True elif value["Пакетность"] == "нет": pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create(name=pol) mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"]) standard, _ = Standard.objects.get_or_create(name=value["Стандарт"]) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value["Частота, МГц"], freq_range=value["Полоса, МГц"], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value["Статус"], "power": value["Мощность, дБм"], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value["Время начала измерения"], "datetime_end": value["Время окончания измерения"], }, ) sigma_load.save() def get_frequency_tolerance_percent(freq_range_mhz: float) -> float: """ Определяет процент погрешности центральной частоты в зависимости от полосы частот. Args: freq_range_mhz (float): Полоса частот в МГц Returns: float: Процент погрешности для центральной частоты Диапазоны: - 0 - 0.5 МГц (0 - 500 кГц): 0.1% - 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5% - 1.5 - 5 МГц: 1% - 5 - 10 МГц: 2% - > 10 МГц: 5% """ if freq_range_mhz < 0.5: return 0.005 elif freq_range_mhz < 1.5: return 0.01 elif freq_range_mhz < 5.0: return 0.02 elif freq_range_mhz < 10.0: return 0.05 else: return 0.1 def compare_and_link_vch_load( sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float ): """ Привязывает SigmaParameter к Parameter на основе совпадения параметров. Погрешность центральной частоты определяется автоматически в зависимости от полосы частот: - 0-500 кГц: 0.1% - 500 кГц-1.5 МГц: 0.5% - 1.5-5 МГц: 1% - 5-10 МГц: 2% - >10 МГц: 5% Args: sat_id (Satellite): Спутник для фильтрации eps_freq (float): Не используется (оставлен для обратной совместимости) eps_frange (float): Погрешность полосы частот в процентах ku_range (float): Не используется (оставлен для обратной совместимости) Returns: tuple: (количество объектов, количество привязок) """ # Получаем все ObjItem с Parameter для данного спутника item_obj = ObjItem.objects.filter( parameter_obj__id_satellite=sat_id ).select_related('parameter_obj', 'parameter_obj__polarization') vch_sigma = SigmaParameter.objects.filter( id_satellite=sat_id ).select_related('polarization') link_count = 0 obj_count = item_obj.count() for obj in item_obj: vch_load = obj.parameter_obj # Пропускаем объекты с некорректной частотой if not vch_load or vch_load.frequency == -1.0: continue # Определяем погрешность частоты на основе полосы freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range) # Вычисляем допустимое отклонение частоты в МГц freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100 # Вычисляем допустимое отклонение полосы в МГц frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100 for sigma in vch_sigma: # Проверяем совпадение по всем параметрам freq_match = abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz frange_match = abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz pol_match = sigma.polarization == vch_load.polarization if freq_match and frange_match and pol_match: sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame( columns=[ "Дата", "Широта", "Долгота", "Высота", "Населённый пункт", "ИСЗ", "Прямой канал, МГц", "Обратный канал, МГц", "Перенос, МГц", "Полоса, МГц", "Зеркала", ] ) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) isz = value["ИСЗ"] try: lat = float(value["Широта, град"].strip().replace(",", ".")) lon = float(value["Долгота, град"].strip().replace(",", ".")) downlink = float(value["Обратный канал, МГц"].strip().replace(",", ".")) freq_range = float(value["Полоса, МГц"].strip().replace(",", ".")) except Exception as e: lat = value["Широта, град"] lon = value["Долгота, град"] downlink = value["Обратный канал, МГц"] freq_range = value["Полоса, МГц"] print(e) norad = int(re.findall(r"\((\d+)\)", isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value["Поляризация"].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F("frequency_range") / 2, downlink__lte=downlink + F("frequency_range") / 2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # sleep(1) loc_name = "" if transponder: # and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame( [ { "Дата": date, "Широта": lat, "Долгота": lon, "Высота": 0.0, "Населённый пункт": loc_name, "ИСЗ": isz, "Прямой канал, МГц": uplink, "Обратный канал, МГц": downlink, "Перенос, МГц": transfer, "Полоса, МГц": freq_range, "Зеркала": "", } ] ) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df # ============================================================================ # Утилиты для форматирования # ============================================================================ def format_coordinates(longitude: float, latitude: float) -> str: """ Форматирует координаты в читаемый вид. Преобразует числовые координаты в формат с указанием направления (N/S для широты, E/W для долготы). Args: longitude (float): Долгота в десятичных градусах. latitude (float): Широта в десятичных градусах. Returns: str: Отформатированная строка координат в формате "XXN/S YYE/W". Example: >>> format_coordinates(37.62, 55.75) '55.75N 37.62E' >>> format_coordinates(-122.42, 37.77) '37.77N 122.42W' """ lon_direction = "E" if longitude > 0 else "W" lat_direction = "N" if latitude > 0 else "S" lon_value = abs(longitude) lat_value = abs(latitude) return f"{lat_value}{lat_direction} {lon_value}{lon_direction}" def parse_pagination_params( request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE ) -> tuple: """ Извлекает и валидирует параметры пагинации из запроса. Args: request: HTTP запрос Django. default_per_page (int): Количество элементов на странице по умолчанию. Returns: tuple: Кортеж (page_number, items_per_page), где: - page_number (int): Номер текущей страницы (по умолчанию 1). - items_per_page (int): Количество элементов на странице. Example: >>> page, per_page = parse_pagination_params(request, default_per_page=100) >>> paginator = Paginator(objects, per_page) >>> page_obj = paginator.get_page(page) """ page_number = request.GET.get("page", 1) items_per_page = request.GET.get("items_per_page", str(default_per_page)) # Валидация page_number try: page_number = int(page_number) if page_number < 1: page_number = 1 except (ValueError, TypeError): page_number = 1 # Валидация items_per_page try: items_per_page = int(items_per_page) if items_per_page < 1: items_per_page = default_per_page # Ограничиваем максимальное значение для предотвращения перегрузки if items_per_page > MAX_ITEMS_PER_PAGE: items_per_page = MAX_ITEMS_PER_PAGE except (ValueError, TypeError): items_per_page = default_per_page return page_number, items_per_page def get_first_param_subquery(field_name: str): """ Возвращает F() выражение для доступа к полю параметра через OneToOne связь. После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne, эта функция упрощена для возврата прямого F() выражения вместо подзапроса. Args: field_name (str): Имя поля модели Parameter для извлечения. Может включать связанные поля через __ (например, 'id_satellite__name'). Returns: F: Django F() объект для использования в annotate(). Example: >>> freq_expr = get_first_param_subquery('frequency') >>> objects = ObjItem.objects.annotate(first_freq=freq_expr) >>> for obj in objects: ... print(obj.first_freq) """ return F(f"parameter_obj__{field_name}")