# Standard library imports import io import json import re from datetime import datetime, time # Django imports from django.contrib.gis.geos import Point from django.db.models import F # Third-party imports import pandas as pd # Local imports from mapsapp.models import Transponders from .models import ( CustomUser, Geo, Mirror, Modulation, ObjItem, Parameter, Polarization, Satellite, SigmaParameter, Source, Standard, ) # ============================================================================ # Константы # ============================================================================ # Значения по умолчанию для пагинации DEFAULT_ITEMS_PER_PAGE = 50 MAX_ITEMS_PER_PAGE = 10000 # Значения по умолчанию для данных DEFAULT_NUMERIC_VALUE = -1.0 MINIMUM_BANDWIDTH_MHZ = 0.08 def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, mirrors, modulations def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {"N": 1, "E": 1, "S": -1, "W": -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if ( s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip() ): return -1 return float(s.strip().replace(",", ".")) return s def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None): """ Импортирует данные из DataFrame с группировкой близких координат. Алгоритм: 1. Извлечь все координаты и данные строк из DataFrame 2. Создать список необработанных записей (координата + данные строки) 3. Пока список не пуст: a. Взять первую запись из списка b. Создать новый Source с coords_average = эта координата c. Создать ObjItem для этой записи и связать с Source d. Удалить запись из списка e. Для каждой оставшейся записи в списке: - Вычислить расстояние от её координаты до coords_average - Если расстояние <= 0.5 градуса: * Вычислить новое среднее ИНКРЕМЕНТАЛЬНО: new_avg = (coords_average + current_coord) / 2 * Обновить coords_average в Source * Создать ObjItem для этой записи и связать с Source * Удалить запись из списка - Иначе: пропустить и проверить следующую запись 4. Сохранить все изменения в БД Важно: Среднее вычисляется инкрементально - каждая новая точка усредняется с текущим средним, а не со всеми точками кластера. Args: df: DataFrame с данными sat: объект Satellite current_user: текущий пользователь (optional) Returns: int: количество созданных Source """ try: df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) # Шаг 1: Извлечь все координаты и данные строк из DataFrame unprocessed_records = [] for idx, row in df.iterrows(): try: # Извлекаем координату coord_tuple = coords_transform(row["Координаты"]) # Сохраняем запись с координатой и данными строки unprocessed_records.append({"coord": coord_tuple, "row": row, "index": idx}) except Exception as e: print(f"Ошибка при обработке строки {idx}: {e}") continue user_to_use = current_user if current_user else CustomUser.objects.get(id=1) source_count = 0 # Шаг 3: Цикл обработки пока список не пуст while unprocessed_records: # Шаг 3a: Взять первую запись из списка first_record = unprocessed_records.pop(0) first_coord = first_record["coord"] # Шаг 3b: Создать новый Source с coords_average = эта координата source = Source.objects.create( coords_average=Point(first_coord, srid=4326), created_by=user_to_use ) source_count += 1 # Шаг 3c: Создать ObjItem для этой записи и связать с Source _create_objitem_from_row(first_record["row"], sat, source, user_to_use, consts) # Шаг 3e: Для каждой оставшейся записи в списке records_to_remove = [] for i, record in enumerate(unprocessed_records): current_coord = record["coord"] # Вычислить расстояние от координаты до coords_average current_avg = (source.coords_average.x, source.coords_average.y) distance = calculate_distance_degrees(current_avg, current_coord) # Если расстояние <= 0.5 градуса if distance <= 0.5: # Вычислить новое среднее ИНКРЕМЕНТАЛЬНО new_avg = calculate_average_coords_incremental( current_avg, current_coord ) # Обновить coords_average в Source source.coords_average = Point(new_avg, srid=4326) source.save() # Создать ObjItem для этой записи и связать с Source _create_objitem_from_row( record["row"], sat, source, user_to_use, consts ) # Пометить запись для удаления records_to_remove.append(i) # Удалить обработанные записи из списка (в обратном порядке, чтобы не сбить индексы) for i in reversed(records_to_remove): unprocessed_records.pop(i) return source_count def _create_objitem_from_row(row, sat, source, user_to_use, consts): """ Вспомогательная функция для создания ObjItem из строки DataFrame. Args: row: строка DataFrame sat: объект Satellite source: объект Source для связи user_to_use: пользователь для created_by consts: константы из get_all_constants() """ # Извлекаем координату geo_point = Point(coords_transform(row["Координаты"]), srid=4326) # Обработка поляризации try: polarization_obj, _ = Polarization.objects.get_or_create( name=row["Поляризация"].strip() ) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") # Обработка ВЧ параметров freq = remove_str(row["Частота, МГц"]) freq_line = remove_str(row["Полоса, МГц"]) v = remove_str(row["Символьная скорость, БОД"]) try: mod_obj, _ = Modulation.objects.get_or_create(name=row["Модуляция"].strip()) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name="-") snr = remove_str(row["ОСШ"]) # Обработка времени date = row["Дата"].date() time_ = row["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) # Обработка зеркал current_mirrors = [] mirror_1 = row["Зеркало 1"].strip().split("\n") mirror_2 = row["Зеркало 2"].strip().split("\n") if len(mirror_1) > 1: for mir in mirror_1: Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_1[0] not in consts[3]: Mirror.objects.get_or_create(name=mirror_1[0].strip()) current_mirrors.append(mirror_1[0].strip()) if len(mirror_2) > 1: for mir in mirror_2: Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_2[0] not in consts[3]: Mirror.objects.get_or_create(name=mirror_2[0].strip()) current_mirrors.append(mirror_2[0].strip()) location = row["Местоопределение"].strip() comment = row["Комментарий"] source_name = row["Объект наблюдения"] # Создаем Geo объект (БЕЗ coords_kupsat и coords_valid) geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ "location": location, "comment": comment, "is_average": (comment != -1.0), }, ) geo.save() geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors)) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat and existing_obj_item.parameter_obj.polarization == polarization_obj and existing_obj_item.parameter_obj.frequency == freq and existing_obj_item.parameter_obj.freq_range == freq_line and existing_obj_item.parameter_obj.bod_velocity == v and existing_obj_item.parameter_obj.modulation == mod_obj and existing_obj_item.parameter_obj.snr == snr ): # Пропускаем создание дубликата return # Создаем новый ObjItem и связываем с Source obj_item = ObjItem.objects.create( name=source_name, source=source, created_by=user_to_use ) # Создаем Parameter Parameter.objects.create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, objitem=obj_item, ) # Связываем geo с objitem geo.objitem = obj_item geo.save() def add_satellite_list(): sats = [ "AZERSPACE 2", "Amos 4", "Astra 4A", "ComsatBW-1", "Eutelsat 16A", "Eutelsat 21B", "Eutelsat 7B", "ExpressAM6", "Hellas Sat 3", "Intelsat 39", "Intelsat 17", "NSS 12", "Sicral 2", "SkyNet 5B", "SkyNet 5D", "Syracuse 4A", "Turksat 3A", "Turksat 4A", "WGS 10", "Yamal 402", ] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create(name=sat) sat_obj.save() def parse_string(s: str): pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$" match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding="utf-8-sig") as jf: data = json.load(jf) for obj in data: if not obj.get("bearingBehavior", {}): if obj["tacticObjectType"] == "source": # if not obj['bearingBehavior']: source_id = obj["id"] name = obj["name"] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S") lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print( f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})" ) def get_points_from_csv(file_content, current_user=None): """ Импортирует данные из CSV с группировкой близких координат. Улучшенный алгоритм с учетом существующих Source: 1. Извлечь все координаты и данные строк из DataFrame 2. Создать список необработанных записей (координата + данные строки) 3. Получить все существующие Source из БД 4. Для каждой записи: a. Проверить, существует ли дубликат (координаты + частота) b. Если дубликат найден, пропустить запись c. Найти ближайший существующий Source (расстояние <= 0.5 градуса) d. Если найден: - Обновить coords_average этого Source (инкрементально) - Создать ObjItem и связать с этим Source e. Если не найден: - Создать новый Source - Создать ObjItem и связать с новым Source - Добавить новый Source в список существующих 5. Сохранить все изменения в БД Args: file_content: содержимое CSV файла current_user: текущий пользователь (optional) Returns: int: количество созданных Source """ df = pd.read_csv( io.StringIO(file_content), sep=";", names=[ "id", "obj", "lat", "lon", "h", "time", "sat", "norad_id", "freq", "f_range", "et", "qaul", "mir_1", "mir_2", "mir_3", ], ) df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] .replace(",", ".", regex=True) .astype(float) ) df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S") # Шаг 1: Извлечь все координаты и данные строк из DataFrame records = [] for idx, row in df.iterrows(): try: # Извлекаем координату из колонок lat и lon coord_tuple = (row["lon"], row["lat"]) # Сохраняем запись с координатой и данными строки records.append({"coord": coord_tuple, "row": row, "index": idx}) except Exception as e: print(f"Ошибка при обработке строки {idx}: {e}") continue user_to_use = current_user if current_user else CustomUser.objects.get(id=1) # Шаг 3: Получить все существующие Source из БД existing_sources = list(Source.objects.filter(coords_average__isnull=False)) new_sources_count = 0 added_count = 0 skipped_count = 0 # Шаг 4: Обработка каждой записи for record in records: current_coord = record["coord"] row = record["row"] # Шаг 4a: Проверить, существует ли дубликат (координаты + частота) if _is_duplicate_objitem(current_coord, row["freq"], row["f_range"]): skipped_count += 1 continue # Шаг 4c: Найти ближайший существующий Source closest_source = None min_distance = float('inf') for source in existing_sources: source_coord = (source.coords_average.x, source.coords_average.y) distance = calculate_distance_degrees(source_coord, current_coord) if distance < min_distance: min_distance = distance closest_source = source # Шаг 4d: Если найден близкий Source (расстояние <= 0.5 градуса) if closest_source and min_distance <= 0.5: # Обновить coords_average инкрементально current_avg = (closest_source.coords_average.x, closest_source.coords_average.y) new_avg = calculate_average_coords_incremental(current_avg, current_coord) closest_source.coords_average = Point(new_avg, srid=4326) closest_source.save() # Создать ObjItem и связать с существующим Source _create_objitem_from_csv_row(row, closest_source, user_to_use) added_count += 1 else: # Шаг 4e: Создать новый Source new_source = Source.objects.create( coords_average=Point(current_coord, srid=4326), created_by=user_to_use ) new_sources_count += 1 # Создать ObjItem и связать с новым Source _create_objitem_from_csv_row(row, new_source, user_to_use) added_count += 1 # Добавить новый Source в список существующих existing_sources.append(new_source) print(f"Импорт завершен: создано {new_sources_count} новых источников, " f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов") return new_sources_count def _is_duplicate_objitem(coord_tuple, frequency, freq_range, tolerance=0.001): """ Проверяет, существует ли уже ObjItem с такими же координатами и частотой. Args: coord_tuple: кортеж (lon, lat) координат frequency: частота в МГц freq_range: полоса частот в МГц tolerance: допуск для сравнения координат в градусах (по умолчанию 0.001 ≈ 100м) Returns: bool: True если дубликат найден, False иначе """ # Ищем ObjItems с близкими координатами через geo_obj nearby_objitems = ObjItem.objects.filter( geo_obj__coords__isnull=False ).select_related('parameter_obj', 'geo_obj') for objitem in nearby_objitems: if not objitem.geo_obj or not objitem.geo_obj.coords: continue # Проверяем расстояние между координатами geo_coord = (objitem.geo_obj.coords.x, objitem.geo_obj.coords.y) distance = calculate_distance_degrees(coord_tuple, geo_coord) if distance <= tolerance: # Координаты совпадают, проверяем частоту if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj: param = objitem.parameter_obj # Проверяем совпадение частоты с небольшим допуском (0.1 МГц) if (abs(param.frequency - frequency) < 0.1 and abs(param.freq_range - freq_range) < 0.1): return True return False def _create_objitem_from_csv_row(row, source, user_to_use): """ Вспомогательная функция для создания ObjItem из строки CSV DataFrame. Args: row: строка DataFrame source: объект Source для связи user_to_use: пользователь для created_by """ # Определяем поляризацию match row["obj"].split(" ")[-1]: case "V": pol = "Вертикальная" case "H": pol = "Горизонтальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" pol_obj, _ = Polarization.objects.get_or_create(name=pol) sat_obj, _ = Satellite.objects.get_or_create( name=row["sat"], defaults={"norad": row["norad_id"]} ) mir_1_obj, _ = Mirror.objects.get_or_create(name=row["mir_1"]) mir_2_obj, _ = Mirror.objects.get_or_create(name=row["mir_2"]) mir_lst = [row["mir_1"], row["mir_2"]] if not pd.isna(row["mir_3"]): mir_3_obj, _ = Mirror.objects.get_or_create(name=row["mir_3"]) mir_lst.append(row["mir_3"]) # Создаем Geo объект (БЕЗ coords_kupsat и coords_valid) geo_obj, _ = Geo.objects.get_or_create( timestamp=row["time"], coords=Point(row["lon"], row["lat"], srid=4326), defaults={ "is_average": False, }, ) geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst)) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat_obj and existing_obj_item.parameter_obj.polarization == pol_obj and existing_obj_item.parameter_obj.frequency == row["freq"] and existing_obj_item.parameter_obj.freq_range == row["f_range"] ): # Пропускаем создание дубликата return # Создаем новый ObjItem и связываем с Source obj_item = ObjItem.objects.create( name=row["obj"], source=source, created_by=user_to_use ) # Создаем Parameter Parameter.objects.create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], objitem=obj_item, ) # Связываем geo с objitem geo_obj.objitem = obj_item geo_obj.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split("_") transfer = filename[3] match filename[2]: case "H": pol = "Горизонтальная" case "V": pol = "Вертикальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" tables = pd.read_html(file, encoding="windows-1251") df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace("Неизвестно", "-", inplace=True) df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[ ["Частота, МГц", "Полоса, МГц", "Мощность, дБм"] ].apply(pd.to_numeric) df["Время начала измерения"] = df["Время начала измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) df["Время окончания измерения"] = df["Время окончания измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) for stroka in df.iterrows(): value = stroka[1] if value["Полоса, МГц"] < 0.08: continue if "-" in value["Символьная скорость"]: bod_velocity = -1.0 else: bod_velocity = value["Символьная скорость"] if "-" in value["Сигнал/шум, дБ"]: snr = -1.0 else: snr = value["Сигнал/шум, дБ"] if value["Пакетность"] == "да": pack = True elif value["Пакетность"] == "нет": pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create(name=pol) mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"]) standard, _ = Standard.objects.get_or_create(name=value["Стандарт"]) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value["Частота, МГц"], freq_range=value["Полоса, МГц"], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value["Статус"], "power": value["Мощность, дБм"], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value["Время начала измерения"], "datetime_end": value["Время окончания измерения"], }, ) sigma_load.save() def get_frequency_tolerance_percent(freq_range_mhz: float) -> float: """ Определяет процент погрешности центральной частоты в зависимости от полосы частот. Args: freq_range_mhz (float): Полоса частот в МГц Returns: float: Процент погрешности для центральной частоты Диапазоны: - 0 - 0.5 МГц (0 - 500 кГц): 0.1% - 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5% - 1.5 - 5 МГц: 1% - 5 - 10 МГц: 2% - > 10 МГц: 5% """ if freq_range_mhz < 0.5: return 0.005 elif freq_range_mhz < 1.5: return 0.01 elif freq_range_mhz < 5.0: return 0.02 elif freq_range_mhz < 10.0: return 0.05 else: return 0.1 def compare_and_link_vch_load( sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float ): """ Привязывает SigmaParameter к Parameter на основе совпадения параметров. Погрешность центральной частоты определяется автоматически в зависимости от полосы частот: - 0-500 кГц: 0.1% - 500 кГц-1.5 МГц: 0.5% - 1.5-5 МГц: 1% - 5-10 МГц: 2% - >10 МГц: 5% Args: sat_id (Satellite): Спутник для фильтрации eps_freq (float): Не используется (оставлен для обратной совместимости) eps_frange (float): Погрешность полосы частот в процентах ku_range (float): Не используется (оставлен для обратной совместимости) Returns: tuple: (количество объектов, количество привязок) """ # Получаем все ObjItem с Parameter для данного спутника item_obj = ObjItem.objects.filter( parameter_obj__id_satellite=sat_id ).select_related("parameter_obj", "parameter_obj__polarization") vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id).select_related( "polarization" ) link_count = 0 obj_count = item_obj.count() for obj in item_obj: vch_load = obj.parameter_obj # Пропускаем объекты с некорректной частотой if not vch_load or vch_load.frequency == -1.0: continue # Определяем погрешность частоты на основе полосы freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range) # Вычисляем допустимое отклонение частоты в МГц freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100 # Вычисляем допустимое отклонение полосы в МГц frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100 for sigma in vch_sigma: # Проверяем совпадение по всем параметрам freq_match = ( abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz ) frange_match = ( abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz ) pol_match = sigma.polarization == vch_load.polarization if freq_match and frange_match and pol_match: sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame( columns=[ "Дата", "Широта", "Долгота", "Высота", "Населённый пункт", "ИСЗ", "Прямой канал, МГц", "Обратный канал, МГц", "Перенос, МГц", "Полоса, МГц", "Зеркала", ] ) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) isz = value["ИСЗ"] try: lat = float(value["Широта, град"].strip().replace(",", ".")) lon = float(value["Долгота, град"].strip().replace(",", ".")) downlink = float(value["Обратный канал, МГц"].strip().replace(",", ".")) freq_range = float(value["Полоса, МГц"].strip().replace(",", ".")) except Exception as e: lat = value["Широта, град"] lon = value["Долгота, град"] downlink = value["Обратный канал, МГц"] freq_range = value["Полоса, МГц"] print(e) norad = int(re.findall(r"\((\d+)\)", isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value["Поляризация"].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F("frequency_range") / 2, downlink__lte=downlink + F("frequency_range") / 2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # sleep(1) loc_name = "" if transponder: # and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame( [ { "Дата": date, "Широта": lat, "Долгота": lon, "Высота": 0.0, "Населённый пункт": loc_name, "ИСЗ": isz, "Прямой канал, МГц": uplink, "Обратный канал, МГц": downlink, "Перенос, МГц": transfer, "Полоса, МГц": freq_range, "Зеркала": "", } ] ) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df # ============================================================================ # Утилиты для работы с координатами # ============================================================================ def calculate_distance_degrees(coord1: tuple, coord2: tuple) -> float: """ Вычисляет расстояние между двумя координатами в градусах. Использует простую евклидову метрику для малых расстояний. Подходит для определения близости точек в радиусе до нескольких градусов. Args: coord1 (tuple): Первая координата в формате (longitude, latitude) coord2 (tuple): Вторая координата в формате (longitude, latitude) Returns: float: Расстояние в градусах Example: >>> dist = calculate_distance_degrees((37.62, 55.75), (37.63, 55.76)) >>> print(f"{dist:.4f}") # ~0.0141 градусов 0.0141 >>> dist = calculate_distance_degrees((37.62, 55.75), (37.62, 55.75)) >>> print(dist) # Одинаковые координаты 0.0 """ lon1, lat1 = coord1 lon2, lat2 = coord2 # Простая евклидова метрика для малых расстояний # Для более точных расчетов на больших расстояниях можно использовать формулу гаверсинуса delta_lon = lon2 - lon1 delta_lat = lat2 - lat1 distance = (delta_lon**2 + delta_lat**2) ** 0.5 return distance def calculate_average_coords_incremental( current_average: tuple, new_coord: tuple ) -> tuple: """ Вычисляет новое среднее между текущим средним и новой координатой. Использует инкрементальное усреднение: каждая новая точка усредняется с текущим средним, а не со всеми точками кластера. Это упрощенный подход, где новое среднее = (текущее_среднее + новая_координата) / 2. Важно: Это НЕ среднее арифметическое всех точек кластера, а инкрементальное усреднение между двумя точками (текущим средним и новой точкой). Args: current_average (tuple): Текущее среднее в формате (longitude, latitude) new_coord (tuple): Новая координата в формате (longitude, latitude) Returns: tuple: Новое среднее в формате (longitude, latitude) Example: >>> avg1 = (37.62, 55.75) # Первая точка >>> avg2 = calculate_average_coords_incremental(avg1, (37.63, 55.76)) >>> print(avg2) (37.625, 55.755) >>> avg3 = calculate_average_coords_incremental(avg2, (37.64, 55.77)) >>> print(avg3) (37.6325, 55.7625) >>> # Проверка: среднее между одинаковыми точками >>> avg = calculate_average_coords_incremental((37.62, 55.75), (37.62, 55.75)) >>> print(avg) (37.62, 55.75) """ current_lon, current_lat = current_average new_lon, new_lat = new_coord # Инкрементальное усреднение: (current + new) / 2 avg_lon = (current_lon + new_lon) / 2 avg_lat = (current_lat + new_lat) / 2 return (avg_lon, avg_lat) # ============================================================================ # Утилиты для форматирования # ============================================================================ def format_coordinates(longitude: float, latitude: float) -> str: """ Форматирует координаты в читаемый вид. Преобразует числовые координаты в формат с указанием направления (N/S для широты, E/W для долготы). Args: longitude (float): Долгота в десятичных градусах. latitude (float): Широта в десятичных градусах. Returns: str: Отформатированная строка координат в формате "XXN/S YYE/W". Example: >>> format_coordinates(37.62, 55.75) '55.75N 37.62E' >>> format_coordinates(-122.42, 37.77) '37.77N 122.42W' """ lon_direction = "E" if longitude > 0 else "W" lat_direction = "N" if latitude > 0 else "S" lon_value = abs(longitude) lat_value = abs(latitude) return f"{lat_value}{lat_direction} {lon_value}{lon_direction}" def parse_pagination_params( request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE ) -> tuple: """ Извлекает и валидирует параметры пагинации из запроса. Args: request: HTTP запрос Django. default_per_page (int): Количество элементов на странице по умолчанию. Returns: tuple: Кортеж (page_number, items_per_page), где: - page_number (int): Номер текущей страницы (по умолчанию 1). - items_per_page (int): Количество элементов на странице. Example: >>> page, per_page = parse_pagination_params(request, default_per_page=100) >>> paginator = Paginator(objects, per_page) >>> page_obj = paginator.get_page(page) """ page_number = request.GET.get("page", 1) items_per_page = request.GET.get("items_per_page", str(default_per_page)) # Валидация page_number try: page_number = int(page_number) if page_number < 1: page_number = 1 except (ValueError, TypeError): page_number = 1 # Валидация items_per_page try: items_per_page = int(items_per_page) if items_per_page < 1: items_per_page = default_per_page # Ограничиваем максимальное значение для предотвращения перегрузки if items_per_page > MAX_ITEMS_PER_PAGE: items_per_page = MAX_ITEMS_PER_PAGE except (ValueError, TypeError): items_per_page = default_per_page return page_number, items_per_page def get_first_param_subquery(field_name: str): """ Возвращает F() выражение для доступа к полю параметра через OneToOne связь. После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne, эта функция упрощена для возврата прямого F() выражения вместо подзапроса. Args: field_name (str): Имя поля модели Parameter для извлечения. Может включать связанные поля через __ (например, 'id_satellite__name'). Returns: F: Django F() объект для использования в annotate(). Example: >>> freq_expr = get_first_param_subquery('frequency') >>> objects = ObjItem.objects.annotate(first_freq=freq_expr) >>> for obj in objects: ... print(obj.first_freq) """ return F(f"parameter_obj__{field_name}")