# Standard library imports import io import json import re from datetime import datetime, time # Django imports from django.contrib.gis.geos import Point from django.db.models import F from django.utils import timezone # Third-party imports import pandas as pd from geographiclib.geodesic import Geodesic # Local imports from mapsapp.models import Transponders from .models import ( CustomUser, Geo, Modulation, ObjItem, Parameter, Polarization, Satellite, SigmaParameter, Source, Standard, ) def find_matching_transponder(satellite, frequency, polarization): """ Находит подходящий транспондер для заданных параметров. Алгоритм: 1. Фильтрует транспондеры по спутнику и поляризации 2. Проверяет, входит ли частота в диапазон транспондера: downlink - frequency_range/2 <= frequency <= downlink + frequency_range/2 3. Возвращает самый свежий транспондер (по created_at) Args: satellite: объект Satellite frequency: частота в МГц polarization: объект Polarization Returns: Transponders или None: найденный транспондер или None """ if not satellite or not polarization or frequency == -1.0: return None # Фильтруем транспондеры по спутнику и поляризации transponders = Transponders.objects.filter( sat_id=satellite, polarization=polarization, downlink__isnull=False, frequency_range__isnull=False ).annotate( # Вычисляем нижнюю и верхнюю границы диапазона lower_bound=F('downlink') - F('frequency_range') / 2, upper_bound=F('downlink') + F('frequency_range') / 2 ).filter( # Проверяем, входит ли частота в диапазон lower_bound__lte=frequency, upper_bound__gte=frequency ).order_by('-created_at') # Сортируем по дате создания (самые свежие первыми) # Возвращаем самый свежий транспондер return transponders.first() def find_matching_lyngsat(satellite, frequency, polarization, tolerance_mhz=0.1): """ Находит подходящий источник LyngSat для заданных параметров. Алгоритм: 1. Фильтрует источники LyngSat по спутнику и поляризации 2. Проверяет, совпадает ли частота с заданной точностью (по умолчанию ±0.1 МГц) 3. Возвращает самый свежий источник (по last_update) Args: satellite: объект Satellite frequency: частота в МГц polarization: объект Polarization tolerance_mhz: допуск по частоте в МГц (по умолчанию 0.1) Returns: LyngSat или None: найденный источник LyngSat или None """ # Импортируем здесь, чтобы избежать циклических импортов from lyngsatapp.models import LyngSat if not satellite or not polarization or frequency == -1.0: return None # Фильтруем источники LyngSat по спутнику и поляризации lyngsat_sources = LyngSat.objects.filter( id_satellite=satellite, polarization=polarization, frequency__isnull=False ).filter( # Проверяем, входит ли частота в допуск frequency__gte=frequency - tolerance_mhz, frequency__lte=frequency + tolerance_mhz ).order_by('-last_update') # Сортируем по дате обновления (самые свежие первыми) # Возвращаем самый свежий источник return lyngsat_sources.first() # ============================================================================ # Константы # ============================================================================ # Значения по умолчанию для пагинации DEFAULT_ITEMS_PER_PAGE = 50 MAX_ITEMS_PER_PAGE = 10000 # Значения по умолчанию для данных DEFAULT_NUMERIC_VALUE = -1.0 MINIMUM_BANDWIDTH_MHZ = 0.08 RANGE_DISTANCE = 56 # ============================================================================ # Вспомогательные функции для работы со спутниками # ============================================================================ class SatelliteNotFoundError(Exception): """Исключение, возникающее когда спутник не найден в базе данных.""" pass def get_satellite_by_norad(norad_id: int) -> Satellite: """ Получает спутник по NORAD ID с обработкой ошибок. Args: norad_id: NORAD ID спутника Returns: Satellite: объект спутника Raises: SatelliteNotFoundError: если спутник не найден ValueError: если norad_id некорректен """ if not norad_id or norad_id == -1: raise ValueError(f"Некорректный NORAD ID: {norad_id}") try: return Satellite.objects.get(norad=norad_id) except Satellite.DoesNotExist: raise SatelliteNotFoundError( f"Спутник с NORAD ID {norad_id} не найден в базе данных. " f"Добавьте спутник в справочник перед импортом данных." ) except Satellite.MultipleObjectsReturned: # Если по какой-то причине есть дубликаты, берем первый return Satellite.objects.filter(norad=norad_id).first() def get_satellite_by_name(name: str) -> Satellite: """ Получает спутник по имени с обработкой ошибок. Args: name: имя спутника Returns: Satellite: объект спутника Raises: SatelliteNotFoundError: если спутник не найден """ if not name or name.strip() == "-": raise ValueError(f"Некорректное имя спутника: {name}") try: return Satellite.objects.get(name=name.strip()) except Satellite.DoesNotExist: raise SatelliteNotFoundError( f"Спутник '{name}' не найден в базе данных. " f"Добавьте спутник в справочник перед импортом данных." ) except Satellite.MultipleObjectsReturned: # Если есть дубликаты по имени, берем первый return Satellite.objects.filter(name=name.strip()).first() def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] # mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, modulations def find_mirror_satellites(mirror_names: list) -> list: """ Находит спутники, которые соответствуют именам зеркал. Алгоритм: 1. Для каждого имени зеркала: - Обрезать пробелы - Извлечь первую часть имени (до скобки), если есть двойное имя - Привести к нижнему регистру - Найти все спутники, в имени или альтернативном имени которых содержится это имя 2. Вернуть список найденных спутников Примеры обработки: - "DSN-3 (SUPERBIRD-C2)" -> "dsn-3" - "Turksat 3A" -> "turksat 3a" - " Amos 4 " -> "amos 4" Args: mirror_names: список имен зеркал Returns: list: список объектов Satellite """ from django.db.models import Q found_satellites = [] for mirror_name in mirror_names: if not mirror_name or mirror_name == "-": continue # Обрезаем пробелы mirror_name_clean = mirror_name.strip() if not mirror_name_clean or mirror_name_clean == "-": continue # Извлекаем первую часть имени (до скобки), если есть двойное имя # Например: "DSN-3 (SUPERBIRD-C2)" -> "DSN-3" if "(" in mirror_name_clean: mirror_name_clean = mirror_name_clean.split("(")[0].strip() # Приводим к нижнему регистру для поиска mirror_name_lower = mirror_name_clean.lower() if not mirror_name_lower: continue # Ищем спутники, в имени или альтернативном имени которых содержится имя зеркала satellites = Satellite.objects.filter( Q(name__icontains=mirror_name_lower) | Q(alternative_name__icontains=mirror_name_lower) ) found_satellites.extend(satellites) # Убираем дубликаты return list(set(found_satellites)) def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {"N": 1, "E": 1, "S": -1, "W": -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if ( s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip() ): return -1 return float(s.strip().replace(",", ".")) return s def _find_or_create_source_by_name_and_distance( source_name: str, sat: Satellite, coord: tuple, user ) -> Source: """ Находит или создает Source на основе имени источника, спутника и расстояния. Логика: 1. Ищет все существующие Source с ObjItem, у которых: - Совпадает спутник - Совпадает имя (source_name) 2. Для каждого найденного Source проверяет расстояние до новой координаты 3. Если найден Source в радиусе ≤56 км: - Возвращает его и обновляет coords_average через метод update_coords_average 4. Если не найден подходящий Source: - Создает новый Source с типом "стационарные" Важно: Может существовать несколько Source с одинаковым именем и спутником, но они должны быть географически разделены (>56 км друг от друга). Args: source_name: имя источника (например, "Turksat 3A 10967,397 [9,348] МГц V") sat: объект Satellite coord: координата в формате (lon, lat) user: пользователь для created_by Returns: Source: найденный или созданный объект Source """ # Ищем все существующие ObjItem с таким же именем и спутником existing_objitems = ObjItem.objects.filter( name=source_name, parameter_obj__id_satellite=sat, source__isnull=False, source__coords_average__isnull=False ).select_related('source', 'parameter_obj', 'source__info') # Собираем уникальные Source из найденных ObjItem existing_sources = {} for objitem in existing_objitems: if objitem.source.id not in existing_sources: existing_sources[objitem.source.id] = objitem.source # Проверяем расстояние до каждого существующего Source closest_source = None min_distance = float('inf') for source in existing_sources.values(): if source.coords_average: source_coord = (source.coords_average.x, source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord) if distance <= RANGE_DISTANCE and distance < min_distance: min_distance = distance closest_source = source # Если найден близкий Source (≤56 км) if closest_source: # Обновляем coords_average через метод модели closest_source.update_coords_average(coord) closest_source.save() return closest_source # Если не найден подходящий Source - создаем новый с типом "Стационарные" from .models import ObjectInfo stationary_info, _ = ObjectInfo.objects.get_or_create(name="Стационарные") source = Source.objects.create( coords_average=Point(coord, srid=4326), info=stationary_info, created_by=user ) return source def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None, is_automatic=False): """ Импортирует данные из DataFrame с группировкой по имени источника и расстоянию. Алгоритм: 1. Для каждой строки DataFrame: a. Извлечь имя источника (из колонки "Объект наблюдения") b. Если is_automatic=False: - Найти подходящий Source: * Ищет все Source с таким же именем и спутником * Проверяет расстояние до каждого Source * Если найден Source в радиусе ≤56 км - использует его * Иначе создает новый Source - Обновить coords_average инкрементально - Создать ObjItem и связать с Source c. Если is_automatic=True: - Создать ObjItem без связи с Source (source=None) - Точка просто хранится в базе Важные правила: - Источники разных спутников НЕ объединяются - Может быть несколько Source с одинаковым именем, но разделенных географически - Точка добавляется к Source только если расстояние ≤56 км - Координаты усредняются инкрементально для каждого источника - Если точка уже существует (по координатам и времени ГЛ), она не добавляется повторно Args: df: DataFrame с данными sat: объект Satellite current_user: текущий пользователь (optional) is_automatic: если True, точки не добавляются к Source (optional, default=False) Returns: dict: словарь с результатами импорта { 'new_sources': количество созданных Source, 'added': количество добавленных точек, 'skipped': количество пропущенных дубликатов, 'errors': список ошибок } """ try: df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) df.sort_values('Дата') user_to_use = current_user if current_user else CustomUser.objects.get(id=1) new_sources_count = 0 added_count = 0 skipped_count = 0 errors = [] # Словарь для кэширования Source в рамках текущего импорта # Ключ: (имя источника, id Source), Значение: объект Source # Используем id в ключе, т.к. может быть несколько Source с одним именем sources_cache = {} for idx, row in df.iterrows(): try: # Извлекаем координату coord_tuple = coords_transform(row["Координаты"]) # Извлекаем имя источника source_name = row["Объект наблюдения"] # Извлекаем время для проверки дубликатов date = row["Дата"].date() time_ = row["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) # Проверяем дубликаты по координатам и времени if _is_duplicate_by_coords_and_time(coord_tuple, timestamp): skipped_count += 1 continue source = None # Если is_automatic=False, работаем с Source if not is_automatic: found_in_cache = False for cache_key, cached_source in sources_cache.items(): cached_name, cached_id = cache_key # Проверяем имя if cached_name != source_name: continue # Проверяем расстояние if cached_source.coords_average: source_coord = (cached_source.coords_average.x, cached_source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord_tuple) if distance <= RANGE_DISTANCE: # Нашли подходящий Source в кэше cached_source.update_coords_average(coord_tuple) cached_source.save() source = cached_source found_in_cache = True break if not found_in_cache: # Ищем в БД или создаем новый Source source = _find_or_create_source_by_name_and_distance( source_name, sat, coord_tuple, user_to_use ) # Проверяем, был ли создан новый Source if source.created_at.timestamp() > (datetime.now().timestamp() - 1): new_sources_count += 1 # Добавляем в кэш sources_cache[(source_name, source.id)] = source # Создаем ObjItem (с Source или без, в зависимости от is_automatic) _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic) added_count += 1 except Exception as e: error_msg = f"Строка {idx + 2}: {str(e)}" print(f"Ошибка при обработке строки {idx}: {e}") errors.append(error_msg) continue print(f"Импорт завершен: создано {new_sources_count} новых источников, " f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, " f"ошибок: {len(errors)}") return { 'new_sources': new_sources_count, 'added': added_count, 'skipped': skipped_count, 'errors': errors } def _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic=False): """ Вспомогательная функция для создания ObjItem из строки DataFrame. Теперь ищет дополнительные данные (модуляция, стандарт, символьная скорость) в таблице TechAnalyze по имени источника и спутнику, если они не указаны в Excel. Args: row: строка DataFrame sat: объект Satellite source: объект Source для связи (может быть None если is_automatic=True) user_to_use: пользователь для created_by consts: константы из get_all_constants() is_automatic: если True, точка не связывается с Source """ # Извлекаем координату geo_point = Point(coords_transform(row["Координаты"]), srid=4326) # Обработка поляризации try: polarization_obj, _ = Polarization.objects.get_or_create( name=row["Поляризация"].strip() ) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") # Обработка ВЧ параметров из Excel freq = remove_str(row["Частота, МГц"]) freq_line = remove_str(row["Полоса, МГц"]) v = remove_str(row["Символьная скорость, БОД"]) try: mod_obj, _ = Modulation.objects.get_or_create(name=row["Модуляция"].strip()) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name="-") # Ищем данные в TechAnalyze (если не указаны в Excel или указаны как "-") source_name = row["Объект наблюдения"] tech_data = None # Проверяем, нужно ли искать данные в TechAnalyze # (если модуляция "-" или символьная скорость не указана) if mod_obj.name == "-" or v == -1.0: tech_data = _find_tech_analyze_data(source_name, sat) # Если нашли данные в TechAnalyze, используем их if tech_data: if mod_obj.name == "-": mod_obj = tech_data['modulation'] if v == -1.0: v = tech_data['bod_velocity'] snr = remove_str(row["ОСШ"]) # Обработка стандарта (если есть в Excel или из TechAnalyze) try: standard_name = row.get("Стандарт", "-") if pd.isna(standard_name) or standard_name == "-": # Если стандарт не указан в Excel, пытаемся взять из TechAnalyze if tech_data and tech_data['standard']: standard_obj = tech_data['standard'] else: standard_obj, _ = Standard.objects.get_or_create(name="-") else: standard_obj, _ = Standard.objects.get_or_create(name=standard_name.strip()) except (KeyError, AttributeError): # Если столбца "Стандарт" нет, пытаемся взять из TechAnalyze if tech_data and tech_data['standard']: standard_obj = tech_data['standard'] else: standard_obj, _ = Standard.objects.get_or_create(name="-") # Обработка времени date = row["Дата"].date() time_ = row["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) # Обработка зеркал - теперь это спутники mirror_names = [] mirror_1 = row["Зеркало 1"].strip().split("\n") mirror_2 = row["Зеркало 2"].strip().split("\n") # Собираем все имена зеркал for mir in mirror_1: if mir.strip() and mir.strip() != "-": mirror_names.append(mir.strip()) for mir in mirror_2: if mir.strip() and mir.strip() != "-": mirror_names.append(mir.strip()) # Находим спутники-зеркала mirror_satellites = find_mirror_satellites(mirror_names) location = row["Местоопределение"].strip() comment = row["Комментарий"] source_name = row["Объект наблюдения"] geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ "location": location, "comment": comment, "is_average": (comment != -1.0), }, ) geo.save() # Устанавливаем связи с спутниками-зеркалами if mirror_satellites: geo.mirrors.set(mirror_satellites) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat and existing_obj_item.parameter_obj.polarization == polarization_obj and existing_obj_item.parameter_obj.frequency == freq and existing_obj_item.parameter_obj.freq_range == freq_line and existing_obj_item.parameter_obj.bod_velocity == v and existing_obj_item.parameter_obj.modulation == mod_obj and existing_obj_item.parameter_obj.snr == snr ): # Пропускаем создание дубликата return # Находим подходящий транспондер transponder = find_matching_transponder(sat, freq, polarization_obj) # Находим подходящий источник LyngSat (точность 0.1 МГц) lyngsat_source = find_matching_lyngsat(sat, freq, polarization_obj, tolerance_mhz=0.1) # Создаем новый ObjItem и связываем с Source (если не автоматическая), Transponder и LyngSat obj_item = ObjItem.objects.create( name=source_name, source=source if not is_automatic else None, transponder=transponder, lyngsat_source=lyngsat_source, is_automatic=is_automatic, created_by=user_to_use ) # Создаем Parameter (с данными из TechAnalyze если они были найдены) Parameter.objects.create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, standard=standard_obj, objitem=obj_item, ) # Связываем geo с objitem geo.objitem = obj_item geo.save() # Обновляем дату подтверждения источника (только если не автоматическая) if source and not is_automatic: source.update_confirm_at() source.save() def add_satellite_list(): """ Добавляет список спутников в базу данных (если их еще нет). Примечание: Эта функция устарела. Используйте админ-панель для добавления спутников. """ sats = [ "AZERSPACE 2", "Amos 4", "Astra 4A", "ComsatBW-1", "Eutelsat 16A", "Eutelsat 21B", "Eutelsat 7B", "ExpressAM6", "Hellas Sat 3", "Intelsat 39", "Intelsat 17", "NSS 12", "Sicral 2", "SkyNet 5B", "SkyNet 5D", "Syracuse 4A", "Turksat 3A", "Turksat 4A", "WGS 10", "Yamal 402", ] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create(name=sat) sat_obj.save() def parse_string(s: str): pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$" match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding="utf-8-sig") as jf: data = json.load(jf) for obj in data: if not obj.get("bearingBehavior", {}): if obj["tacticObjectType"] == "source": # if not obj['bearingBehavior']: source_id = obj["id"] name = obj["name"] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S") lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print( f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})" ) def get_points_from_csv(file_content, current_user=None, is_automatic=False): """ Импортирует данные из CSV с группировкой по имени источника и расстоянию. Алгоритм: 1. Для каждой строки CSV: a. Извлечь имя источника (из колонки "obj") и спутник b. Проверить дубликаты (координаты + время ГЛ) c. Если is_automatic=False: - Найти подходящий Source: * Ищет все Source с таким же именем и спутником * Проверяет расстояние до каждого Source * Если найден Source в радиусе ≤56 км - использует его * Иначе создает новый Source - Обновить coords_average инкрементально - Создать ObjItem и связать с Source d. Если is_automatic=True: - Создать ObjItem без связи с Source (source=None) - Точка просто хранится в базе Важные правила: - Источники разных спутников НЕ объединяются - Может быть несколько Source с одинаковым именем, но разделенных географически - Точка добавляется к Source только если расстояние ≤56 км - Координаты усредняются инкрементально для каждого источника - Если точка уже существует (по координатам и времени ГЛ), она не добавляется повторно Args: file_content: содержимое CSV файла current_user: текущий пользователь (optional) is_automatic: если True, точки не добавляются к Source (optional, default=False) Returns: dict: словарь с результатами импорта { 'new_sources': количество созданных Source, 'added': количество добавленных точек, 'skipped': количество пропущенных дубликатов, 'errors': список ошибок } """ # Читаем CSV без предопределенных имен колонок df = pd.read_csv( io.StringIO(file_content), sep=";", header=None ) # Присваиваем имена первым 12 колонкам base_columns = [ "id", "obj", "lat", "lon", "h", "time", "sat", "norad_id", "freq", "f_range", "et", "qual", ] # Все колонки после "qual" (индекс 11) - это зеркала num_columns = len(df.columns) mirror_columns = [f"mir_{i+1}" for i in range(num_columns - len(base_columns))] # Объединяем имена колонок df.columns = base_columns + mirror_columns # Преобразуем типы данных df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] .replace(",", ".", regex=True) .astype(float) ) df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S") df.sort_values('time') user_to_use = current_user if current_user else CustomUser.objects.get(id=1) new_sources_count = 0 added_count = 0 skipped_count = 0 errors = [] # Словарь для кэширования Source в рамках текущего импорта # Ключ: (имя источника, имя спутника, id Source), Значение: объект Source sources_cache = {} for idx, row in df.iterrows(): try: # Извлекаем координату из колонок lat и lon coord_tuple = (row["lon"], row["lat"]) # Извлекаем имя источника и спутника source_name = row["obj"] sat_name = row["sat"] # Извлекаем время для проверки дубликатов timestamp = timezone.make_aware(row["time"]) # Проверяем дубликаты по координатам и времени if _is_duplicate_by_coords_and_time(coord_tuple, timestamp): skipped_count += 1 continue # Получаем объект спутника по NORAD ID try: sat_obj = get_satellite_by_norad(row["norad_id"]) except (SatelliteNotFoundError, ValueError) as e: error_msg = f"Строка {idx + 2}: {str(e)}" print(error_msg) errors.append(error_msg) continue source = None # Если is_automatic=False, работаем с Source if not is_automatic: # Проверяем кэш: ищем подходящий Source среди закэшированных found_in_cache = False for cache_key, cached_source in sources_cache.items(): cached_name, cached_sat, cached_id = cache_key # Проверяем имя и спутник if cached_name != source_name or cached_sat != sat_name: continue # Проверяем расстояние if cached_source.coords_average: source_coord = (cached_source.coords_average.x, cached_source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord_tuple) if distance <= RANGE_DISTANCE: # Нашли подходящий Source в кэше cached_source.update_coords_average(coord_tuple) cached_source.save() source = cached_source found_in_cache = True break if not found_in_cache: # Ищем в БД или создаем новый Source source = _find_or_create_source_by_name_and_distance( source_name, sat_obj, coord_tuple, user_to_use ) # Проверяем, был ли создан новый Source if source.created_at.timestamp() > (datetime.now().timestamp() - 1): new_sources_count += 1 # Добавляем в кэш sources_cache[(source_name, sat_name, source.id)] = source # Создаем ObjItem (с Source или без, в зависимости от is_automatic) _create_objitem_from_csv_row(row, source, user_to_use, is_automatic, mirror_columns) added_count += 1 except Exception as e: error_msg = f"Строка {idx + 2}: {str(e)}" print(f"Ошибка при обработке строки {idx}: {e}") errors.append(error_msg) continue print(f"Импорт завершен: создано {new_sources_count} новых источников, " f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, " f"ошибок: {len(errors)}") return { 'new_sources': new_sources_count, 'added': added_count, 'skipped': skipped_count, 'errors': errors } def _is_duplicate_by_coords_and_time(coord_tuple, timestamp, tolerance_km=0.001): """ Проверяет, существует ли уже ObjItem с такими же координатами и временем ГЛ. Args: coord_tuple: кортеж (lon, lat) координат timestamp: время ГЛ (datetime) tolerance_km: допуск для сравнения координат в километрах (default=0.1) Returns: bool: True если дубликат найден, False иначе """ # Ищем Geo с таким же timestamp и близкими координатами existing_geo = Geo.objects.filter( timestamp=timestamp, coords__isnull=False ) for geo in existing_geo: if not geo.coords: continue # Проверяем расстояние между координатами geo_coord = (geo.coords.x, geo.coords.y) _, distance = calculate_mean_coords(coord_tuple, geo_coord) if distance <= tolerance_km: # Найден дубликат return True return False def _is_duplicate_objitem(coord_tuple, frequency, freq_range, tolerance=0.1): """ Проверяет, существует ли уже ObjItem с такими же координатами и частотой. Args: coord_tuple: кортеж (lon, lat) координат frequency: частота в МГц freq_range: полоса частот в МГц tolerance: допуск для сравнения координат в километрах Returns: bool: True если дубликат найден, False иначе """ # Ищем ObjItems с близкими координатами через geo_obj nearby_objitems = ObjItem.objects.filter( geo_obj__coords__isnull=False ).select_related('parameter_obj', 'geo_obj') for objitem in nearby_objitems: if not objitem.geo_obj or not objitem.geo_obj.coords: continue # Проверяем расстояние между координатами geo_coord = (objitem.geo_obj.coords.x, objitem.geo_obj.coords.y) _, distance = calculate_mean_coords(coord_tuple, geo_coord) if distance <= tolerance: # Координаты совпадают, проверяем частоту if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj: param = objitem.parameter_obj # Проверяем совпадение частоты с небольшим допуском (0.1 МГц) if (abs(param.frequency - frequency) < 0.1 and abs(param.freq_range - freq_range) < 0.1): return True return False def _find_tech_analyze_data(name: str, satellite: Satellite): """ Ищет данные технического анализа по имени и спутнику. Args: name: имя источника satellite: объект Satellite Returns: dict или None: словарь с данными {modulation, standard, bod_velocity} или None """ from .models import TechAnalyze try: tech_analyze = TechAnalyze.objects.filter( name=name, satellite=satellite ).select_related('modulation', 'standard').first() if tech_analyze: return { 'modulation': tech_analyze.modulation, 'standard': tech_analyze.standard, 'bod_velocity': tech_analyze.bod_velocity if tech_analyze.bod_velocity else -1.0 } except Exception as e: print(f"Ошибка при поиске TechAnalyze для {name}: {e}") return None def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False, mirror_columns=None): """ Вспомогательная функция для создания ObjItem из строки CSV DataFrame. Теперь ищет дополнительные данные (модуляция, стандарт, символьная скорость) в таблице TechAnalyze по имени источника и спутнику. Args: row: строка DataFrame source: объект Source для связи (может быть None если is_automatic=True) user_to_use: пользователь для created_by is_automatic: если True, точка не связывается с Source mirror_columns: список имен колонок с зеркалами (optional) """ # Определяем поляризацию match row["obj"].split(" ")[-1]: case "V": pol = "Вертикальная" case "H": pol = "Горизонтальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" pol_obj, _ = Polarization.objects.get_or_create(name=pol) # Получаем объект спутника по NORAD ID try: sat_obj = get_satellite_by_norad(row["norad_id"]) except (SatelliteNotFoundError, ValueError) as e: raise Exception(f"Не удалось получить спутник: {str(e)}") # Ищем данные в TechAnalyze tech_data = _find_tech_analyze_data(row["obj"], sat_obj) # Обработка зеркал - теперь это спутники mirror_names = [] # Если переданы имена колонок зеркал, используем их if mirror_columns: for mir_col in mirror_columns: if mir_col in row.index: mir_value = row[mir_col] if not pd.isna(mir_value) and str(mir_value).strip() != "-" and str(mir_value).strip() != "": mirror_names.append(str(mir_value).strip()) else: # Fallback на старый способ (для обратной совместимости) for i in range(1, 100): # Проверяем до 100 колонок зеркал mir_col = f"mir_{i}" if mir_col in row.index: mir_value = row[mir_col] if not pd.isna(mir_value) and str(mir_value).strip() != "-" and str(mir_value).strip() != "": mirror_names.append(str(mir_value).strip()) else: break # Находим спутники-зеркала mirror_satellites = find_mirror_satellites(mirror_names) # Создаем Geo объект geo_obj, _ = Geo.objects.get_or_create( timestamp=timezone.make_aware(row["time"]), coords=Point(row["lon"], row["lat"], srid=4326), defaults={ "is_average": False, }, ) # Устанавливаем связи с спутниками-зеркалами if mirror_satellites: geo_obj.mirrors.set(mirror_satellites) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat_obj and existing_obj_item.parameter_obj.polarization == pol_obj and existing_obj_item.parameter_obj.frequency == row["freq"] and existing_obj_item.parameter_obj.freq_range == row["f_range"] ): # Пропускаем создание дубликата return # Находим подходящий транспондер transponder = find_matching_transponder(sat_obj, row["freq"], pol_obj) # Находим подходящий источник LyngSat (точность 0.1 МГц) lyngsat_source = find_matching_lyngsat(sat_obj, row["freq"], pol_obj, tolerance_mhz=0.1) # Создаем новый ObjItem и связываем с Source (если не автоматическая), Transponder и LyngSat obj_item = ObjItem.objects.create( name=row["obj"], source=source if not is_automatic else None, transponder=transponder, lyngsat_source=lyngsat_source, is_automatic=is_automatic, created_by=user_to_use ) # Создаем Parameter с данными из TechAnalyze (если найдены) if tech_data: # Используем данные из TechAnalyze Parameter.objects.create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], bod_velocity=tech_data['bod_velocity'], modulation=tech_data['modulation'], standard=tech_data['standard'], objitem=obj_item, ) else: # Создаем без дополнительных данных (как раньше) Parameter.objects.create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], objitem=obj_item, ) # Связываем geo с objitem geo_obj.objitem = obj_item geo_obj.save() # Обновляем дату подтверждения источника (только если не автоматическая) if source and not is_automatic: source.update_confirm_at() source.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split("_") transfer = filename[3] match filename[2]: case "H": pol = "Горизонтальная" case "V": pol = "Вертикальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" tables = pd.read_html(file, encoding="windows-1251") df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace("Неизвестно", "-", inplace=True) df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[ ["Частота, МГц", "Полоса, МГц", "Мощность, дБм"] ].apply(pd.to_numeric) df["Время начала измерения"] = df["Время начала измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) df["Время окончания измерения"] = df["Время окончания измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) for stroka in df.iterrows(): value = stroka[1] if value["Полоса, МГц"] < 0.08: continue if "-" in value["Символьная скорость"]: bod_velocity = -1.0 else: bod_velocity = value["Символьная скорость"] if "-" in value["Сигнал/шум, дБ"]: snr = -1.0 else: snr = value["Сигнал/шум, дБ"] if value["Пакетность"] == "да": pack = True elif value["Пакетность"] == "нет": pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create(name=pol) mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"]) standard, _ = Standard.objects.get_or_create(name=value["Стандарт"]) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value["Частота, МГц"], freq_range=value["Полоса, МГц"], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value["Статус"], "power": value["Мощность, дБм"], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value["Время начала измерения"], "datetime_end": value["Время окончания измерения"], }, ) sigma_load.save() def get_frequency_tolerance_percent(freq_range_mhz: float) -> float: """ Определяет процент погрешности центральной частоты в зависимости от полосы частот. Args: freq_range_mhz (float): Полоса частот в МГц Returns: float: Процент погрешности для центральной частоты Диапазоны: - 0 - 0.5 МГц (0 - 500 кГц): 0.1% - 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5% - 1.5 - 5 МГц: 1% - 5 - 10 МГц: 2% - > 10 МГц: 5% """ if freq_range_mhz < 0.5: return 0.005 elif freq_range_mhz < 1.5: return 0.01 elif freq_range_mhz < 5.0: return 0.02 elif freq_range_mhz < 10.0: return 0.05 else: return 0.1 def compare_and_link_vch_load( sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float ): """ Привязывает SigmaParameter к Parameter на основе совпадения параметров. Погрешность центральной частоты определяется автоматически в зависимости от полосы частот: - 0-500 кГц: 0.1% - 500 кГц-1.5 МГц: 0.5% - 1.5-5 МГц: 1% - 5-10 МГц: 2% - >10 МГц: 5% Args: sat_id (Satellite): Спутник для фильтрации eps_freq (float): Не используется (оставлен для обратной совместимости) eps_frange (float): Погрешность полосы частот в процентах ku_range (float): Не используется (оставлен для обратной совместимости) Returns: tuple: (количество объектов, количество привязок) """ # Получаем все ObjItem с Parameter для данного спутника item_obj = ObjItem.objects.filter( parameter_obj__id_satellite=sat_id ).select_related("parameter_obj", "parameter_obj__polarization") vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id).select_related( "polarization" ) link_count = 0 obj_count = item_obj.count() for obj in item_obj: vch_load = obj.parameter_obj # Пропускаем объекты с некорректной частотой if not vch_load or vch_load.frequency == -1.0: continue # Определяем погрешность частоты на основе полосы freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range) # Вычисляем допустимое отклонение частоты в МГц freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100 # Вычисляем допустимое отклонение полосы в МГц frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100 for sigma in vch_sigma: # Проверяем совпадение по всем параметрам freq_match = ( abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz ) frange_match = ( abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz ) pol_match = sigma.polarization == vch_load.polarization if freq_match and frange_match and pol_match: sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame( columns=[ "Дата", "Широта", "Долгота", "Высота", "Населённый пункт", "ИСЗ", "Прямой канал, МГц", "Обратный канал, МГц", "Перенос, МГц", "Полоса, МГц", "Зеркала", ] ) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) isz = value["ИСЗ"] try: lat = float(value["Широта, град"].strip().replace(",", ".")) lon = float(value["Долгота, град"].strip().replace(",", ".")) downlink = float(value["Обратный канал, МГц"].strip().replace(",", ".")) freq_range = float(value["Полоса, МГц"].strip().replace(",", ".")) except Exception as e: lat = value["Широта, град"] lon = value["Долгота, град"] downlink = value["Обратный канал, МГц"] freq_range = value["Полоса, МГц"] print(e) norad = int(re.findall(r"\((\d+)\)", isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value["Поляризация"].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F("frequency_range") / 2, downlink__lte=downlink + F("frequency_range") / 2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # sleep(1) loc_name = "" if transponder: # and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame( [ { "Дата": date, "Широта": lat, "Долгота": lon, "Высота": 0.0, "Населённый пункт": loc_name, "ИСЗ": isz, "Прямой канал, МГц": uplink, "Обратный канал, МГц": downlink, "Перенос, МГц": transfer, "Полоса, МГц": freq_range, "Зеркала": "", } ] ) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df # ============================================================================ # Утилиты для работы с координатами # ============================================================================ # Импорт pyproj для работы с проекциями from pyproj import CRS, Transformer def get_gauss_kruger_zone(longitude: float) -> int | None: """ Определяет номер зоны Гаусса-Крюгера по долготе. Зоны ГК (Пулково 1942) имеют EPSG коды 28404-28432 (зоны 4-32). Каждая зона охватывает 6° долготы. Args: longitude: Долгота в градусах (от -180 до 180) Returns: int | None: Номер зоны ГК (4-32) или None если координаты вне зон ГК """ # Нормализуем долготу к диапазону 0-360 lon_normalized = longitude if longitude >= 0 else longitude + 360 # Вычисляем номер зоны (1-60) zone = int((lon_normalized + 6) / 6) # EPSG коды Пулково 1942 существуют только для зон 4-32 if zone < 4 or zone > 32: return None return zone def get_gauss_kruger_epsg(zone: int) -> int: """ Возвращает EPSG код для зоны Гаусса-Крюгера (Pulkovo 1942 / Gauss-Kruger). Args: zone: Номер зоны ГК (4-32) Returns: int: EPSG код проекции """ return 28400 + zone def get_utm_zone(longitude: float) -> int: """ Определяет номер зоны UTM по долготе. UTM зоны нумеруются от 1 до 60, каждая зона охватывает 6° долготы. Args: longitude: Долгота в градусах (от -180 до 180) Returns: int: Номер зоны UTM (1-60) """ zone = int((longitude + 180) / 6) + 1 if zone > 60: zone = 60 if zone < 1: zone = 1 return zone def get_utm_epsg(zone: int, is_northern: bool = True) -> int: """ Возвращает EPSG код для зоны UTM (WGS 84 / UTM). Args: zone: Номер зоны UTM (1-60) is_northern: True для северного полушария, False для южного Returns: int: EPSG код проекции """ if is_northern: return 32600 + zone else: return 32700 + zone def transform_wgs84_to_gk(coord: tuple, zone: int = None) -> tuple: """ Преобразует координаты из WGS84 в проекцию Гаусса-Крюгера. Args: coord: Координаты в формате (longitude, latitude) в WGS84 zone: Номер зоны ГК (если None, определяется автоматически) Returns: tuple: Координаты (x, y) в метрах в проекции ГК """ lon, lat = coord if zone is None: zone = get_gauss_kruger_zone(lon) if zone is None: raise ValueError(f"Координаты ({lon}, {lat}) вне зон Гаусса-Крюгера (4-32)") epsg_gk = get_gauss_kruger_epsg(zone) transformer = Transformer.from_crs( CRS.from_epsg(4326), CRS.from_epsg(epsg_gk), always_xy=True ) x, y = transformer.transform(lon, lat) return (x, y) def transform_gk_to_wgs84(coord: tuple, zone: int) -> tuple: """ Преобразует координаты из проекции Гаусса-Крюгера в WGS84. Args: coord: Координаты (x, y) в метрах в проекции ГК zone: Номер зоны ГК Returns: tuple: Координаты (longitude, latitude) в WGS84 """ x, y = coord epsg_gk = get_gauss_kruger_epsg(zone) transformer = Transformer.from_crs( CRS.from_epsg(epsg_gk), CRS.from_epsg(4326), always_xy=True ) lon, lat = transformer.transform(x, y) return (lon, lat) def transform_wgs84_to_utm(coord: tuple, zone: int = None, is_northern: bool = None) -> tuple: """ Преобразует координаты из WGS84 в проекцию UTM. Args: coord: Координаты в формате (longitude, latitude) в WGS84 zone: Номер зоны UTM (если None, определяется автоматически) is_northern: Северное полушарие (если None, определяется по широте) Returns: tuple: Координаты (x, y) в метрах в проекции UTM """ lon, lat = coord if zone is None: zone = get_utm_zone(lon) if is_northern is None: is_northern = lat >= 0 epsg_utm = get_utm_epsg(zone, is_northern) transformer = Transformer.from_crs( CRS.from_epsg(4326), CRS.from_epsg(epsg_utm), always_xy=True ) x, y = transformer.transform(lon, lat) return (x, y) def transform_utm_to_wgs84(coord: tuple, zone: int, is_northern: bool = True) -> tuple: """ Преобразует координаты из проекции UTM в WGS84. Args: coord: Координаты (x, y) в метрах в проекции UTM zone: Номер зоны UTM is_northern: Северное полушарие Returns: tuple: Координаты (longitude, latitude) в WGS84 """ x, y = coord epsg_utm = get_utm_epsg(zone, is_northern) transformer = Transformer.from_crs( CRS.from_epsg(epsg_utm), CRS.from_epsg(4326), always_xy=True ) lon, lat = transformer.transform(x, y) return (lon, lat) def average_coords_in_gk(coords: list[tuple], zone: int = None) -> tuple[tuple, str]: """ Вычисляет среднее арифметическое координат в проекции. Приоритет: 1. Гаусс-Крюгер (Пулково 1942) для зон 4-32 2. UTM для координат вне зон ГК 3. Геодезическое усреднение как последний fallback Args: coords: Список координат в формате [(lon1, lat1), (lon2, lat2), ...] zone: Номер зоны (если None, определяется по первой точке) Returns: tuple: (координаты (lon, lat), тип_усреднения) тип_усреднения: "ГК" | "UTM" | "Геод" """ if not coords: return (0, 0), "ГК" if len(coords) == 1: return coords[0], "ГК" first_lon, first_lat = coords[0] # Пытаемся использовать Гаусс-Крюгер if zone is None: gk_zone = get_gauss_kruger_zone(first_lon) else: gk_zone = zone if 4 <= zone <= 32 else None # Если координаты в зонах ГК (4-32), используем ГК if gk_zone is not None: try: coords_projected = [transform_wgs84_to_gk(c, gk_zone) for c in coords] avg_x = sum(c[0] for c in coords_projected) / len(coords_projected) avg_y = sum(c[1] for c in coords_projected) / len(coords_projected) return transform_gk_to_wgs84((avg_x, avg_y), gk_zone), "ГК" except Exception: pass # Fallback на UTM # Fallback на UTM для координат вне зон ГК try: utm_zone = get_utm_zone(first_lon) is_northern = first_lat >= 0 coords_utm = [transform_wgs84_to_utm(c, utm_zone, is_northern) for c in coords] avg_x = sum(c[0] for c in coords_utm) / len(coords_utm) avg_y = sum(c[1] for c in coords_utm) / len(coords_utm) return transform_utm_to_wgs84((avg_x, avg_y), utm_zone, is_northern), "UTM" except Exception: # Последний fallback - геодезическое усреднение return _average_coords_geodesic(coords), "Геод" def _average_coords_geodesic(coords: list[tuple]) -> tuple: """ Вычисляет среднее координат через последовательное геодезическое усреднение. Используется как fallback при ошибках проекции. Args: coords: Список координат в формате [(lon1, lat1), (lon2, lat2), ...] Returns: tuple: Средние координаты (longitude, latitude) в WGS84 """ if not coords: return (0, 0) if len(coords) == 1: return coords[0] # Последовательно усредняем точки result = coords[0] for i in range(1, len(coords)): result, _ = calculate_mean_coords(result, coords[i]) return result def calculate_mean_coords(coord1: tuple, coord2: tuple) -> tuple[tuple, float]: """ Вычисляет среднюю точку между двумя координатами с использованием геодезических вычислений (с учётом эллипсоида). :param lat1: Широта первой точки в градусах. :param lon1: Долгота первой точки в градусах. :param lat2: Широта второй точки в градусах. :param lon2: Долгота второй точки в градусах. :return: Словарь с ключами 'lat' и 'lon' для средней точки, и расстояние(dist) в КМ. """ lon1, lat1 = coord1 lon2, lat2 = coord2 geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2) azimuth1 = geod_inv['azi1'] distance = geod_inv['s12'] geod_direct = Geodesic.WGS84.Direct(lat1, lon1, azimuth1, distance / 2) return (geod_direct['lon2'], geod_direct['lat2']), distance/1000 def calculate_distance_wgs84(coord1: tuple, coord2: tuple) -> float: """ Вычисляет расстояние между двумя точками в WGS84 (в километрах). Args: coord1: Первая точка (longitude, latitude) coord2: Вторая точка (longitude, latitude) Returns: float: Расстояние в километрах """ lon1, lat1 = coord1 lon2, lat2 = coord2 geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2) return geod_inv['s12'] / 1000 def calculate_average_coords_incremental( current_average: tuple, new_coord: tuple ) -> tuple: """ Вычисляет новое среднее между текущим средним и новой координатой. Использует инкрементальное усреднение: каждая новая точка усредняется с текущим средним, а не со всеми точками кластера. Это упрощенный подход, где новое среднее = (текущее_среднее + новая_координата) / 2. Важно: Это НЕ среднее арифметическое всех точек кластера, а инкрементальное усреднение между двумя точками (текущим средним и новой точкой). Args: current_average (tuple): Текущее среднее в формате (longitude, latitude) new_coord (tuple): Новая координата в формате (longitude, latitude) Returns: tuple: Новое среднее в формате (longitude, latitude) Example: >>> avg1 = (37.62, 55.75) # Первая точка >>> avg2 = calculate_average_coords_incremental(avg1, (37.63, 55.76)) >>> print(avg2) (37.625, 55.755) >>> avg3 = calculate_average_coords_incremental(avg2, (37.64, 55.77)) >>> print(avg3) (37.6325, 55.7625) >>> # Проверка: среднее между одинаковыми точками >>> avg = calculate_average_coords_incremental((37.62, 55.75), (37.62, 55.75)) >>> print(avg) (37.62, 55.75) """ current_lon, current_lat = current_average new_lon, new_lat = new_coord # Инкрементальное усреднение: (current + new) / 2 avg_lon = (current_lon + new_lon) / 2 avg_lat = (current_lat + new_lat) / 2 return (avg_lon, avg_lat) # ============================================================================ # Утилиты для форматирования # ============================================================================ def format_coordinates(longitude: float, latitude: float) -> str: """ Форматирует координаты в читаемый вид. Преобразует числовые координаты в формат с указанием направления (N/S для широты, E/W для долготы). Args: longitude (float): Долгота в десятичных градусах. latitude (float): Широта в десятичных градусах. Returns: str: Отформатированная строка координат в формате "XXN/S YYE/W". Example: >>> format_coordinates(37.62, 55.75) '55.75N 37.62E' >>> format_coordinates(-122.42, 37.77) '37.77N 122.42W' """ lon_direction = "E" if longitude > 0 else "W" lat_direction = "N" if latitude > 0 else "S" lon_value = abs(longitude) lat_value = abs(latitude) return f"{lat_value}{lat_direction} {lon_value}{lon_direction}" def parse_pagination_params( request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE ) -> tuple: """ Извлекает и валидирует параметры пагинации из запроса. Args: request: HTTP запрос Django. default_per_page (int): Количество элементов на странице по умолчанию. Returns: tuple: Кортеж (page_number, items_per_page), где: - page_number (int): Номер текущей страницы (по умолчанию 1). - items_per_page (int): Количество элементов на странице. Example: >>> page, per_page = parse_pagination_params(request, default_per_page=100) >>> paginator = Paginator(objects, per_page) >>> page_obj = paginator.get_page(page) """ page_number = request.GET.get("page", 1) items_per_page = request.GET.get("items_per_page", str(default_per_page)) # Валидация page_number try: page_number = int(page_number) if page_number < 1: page_number = 1 except (ValueError, TypeError): page_number = 1 # Валидация items_per_page try: items_per_page = int(items_per_page) if items_per_page < 1: items_per_page = default_per_page # Ограничиваем максимальное значение для предотвращения перегрузки if items_per_page > MAX_ITEMS_PER_PAGE: items_per_page = MAX_ITEMS_PER_PAGE except (ValueError, TypeError): items_per_page = default_per_page return page_number, items_per_page def get_first_param_subquery(field_name: str): """ Возвращает F() выражение для доступа к полю параметра через OneToOne связь. После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne, эта функция упрощена для возврата прямого F() выражения вместо подзапроса. Args: field_name (str): Имя поля модели Parameter для извлечения. Может включать связанные поля через __ (например, 'id_satellite__name'). Returns: F: Django F() объект для использования в annotate(). Example: >>> freq_expr = get_first_param_subquery('frequency') >>> objects = ObjItem.objects.annotate(first_freq=freq_expr) >>> for obj in objects: ... print(obj.first_freq) """ return F(f"parameter_obj__{field_name}") # ============================================================================ # Number Formatting Functions # ============================================================================ def format_coordinate(value): """ Format coordinate value to 4 decimal places. Args: value: Numeric coordinate value Returns: str: Formatted coordinate or '-' if None """ if value is None: return '-' try: return f"{float(value):.4f}" except (ValueError, TypeError): return '-' def format_frequency(value): """ Format frequency value to 3 decimal places. Args: value: Numeric frequency value in MHz Returns: str: Formatted frequency or '-' if None """ if value is None: return '-' try: return f"{float(value):.3f}" except (ValueError, TypeError): return '-' def format_symbol_rate(value): """ Format symbol rate (bod_velocity) to integer. Args: value: Numeric symbol rate value Returns: str: Formatted symbol rate or '-' if None """ if value is None: return '-' try: return f"{float(value):.0f}" except (ValueError, TypeError): return '-' def format_coords_display(point): """ Format geographic point coordinates for display. Args: point: GeoDjango Point object Returns: str: Formatted coordinates as "LAT LON" or '-' if None """ if not point: return '-' try: longitude = point.coords[0] latitude = point.coords[1] lon = f"{abs(longitude):.4f}E" if longitude > 0 else f"{abs(longitude):.4f}W" lat = f"{abs(latitude):.4f}N" if latitude > 0 else f"{abs(latitude):.4f}S" return f"{lat} {lon}" except (AttributeError, IndexError, TypeError): return '-'