# Standard library imports import io import json import re from datetime import datetime, time # Django imports from django.contrib.gis.geos import Point from django.db.models import F # Third-party imports import pandas as pd from geographiclib.geodesic import Geodesic # Local imports from mapsapp.models import Transponders from .models import ( CustomUser, Geo, Mirror, Modulation, ObjItem, Parameter, Polarization, Satellite, SigmaParameter, Source, Standard, ) def find_matching_transponder(satellite, frequency, polarization): """ Находит подходящий транспондер для заданных параметров. Алгоритм: 1. Фильтрует транспондеры по спутнику и поляризации 2. Проверяет, входит ли частота в диапазон транспондера: downlink - frequency_range/2 <= frequency <= downlink + frequency_range/2 3. Возвращает самый свежий транспондер (по created_at) Args: satellite: объект Satellite frequency: частота в МГц polarization: объект Polarization Returns: Transponders или None: найденный транспондер или None """ if not satellite or not polarization or frequency == -1.0: return None # Фильтруем транспондеры по спутнику и поляризации transponders = Transponders.objects.filter( sat_id=satellite, polarization=polarization, downlink__isnull=False, frequency_range__isnull=False ).annotate( # Вычисляем нижнюю и верхнюю границы диапазона lower_bound=F('downlink') - F('frequency_range') / 2, upper_bound=F('downlink') + F('frequency_range') / 2 ).filter( # Проверяем, входит ли частота в диапазон lower_bound__lte=frequency, upper_bound__gte=frequency ).order_by('-created_at') # Сортируем по дате создания (самые свежие первыми) # Возвращаем самый свежий транспондер return transponders.first() def find_matching_lyngsat(satellite, frequency, polarization, tolerance_mhz=0.1): """ Находит подходящий источник LyngSat для заданных параметров. Алгоритм: 1. Фильтрует источники LyngSat по спутнику и поляризации 2. Проверяет, совпадает ли частота с заданной точностью (по умолчанию ±0.1 МГц) 3. Возвращает самый свежий источник (по last_update) Args: satellite: объект Satellite frequency: частота в МГц polarization: объект Polarization tolerance_mhz: допуск по частоте в МГц (по умолчанию 0.1) Returns: LyngSat или None: найденный источник LyngSat или None """ # Импортируем здесь, чтобы избежать циклических импортов from lyngsatapp.models import LyngSat if not satellite or not polarization or frequency == -1.0: return None # Фильтруем источники LyngSat по спутнику и поляризации lyngsat_sources = LyngSat.objects.filter( id_satellite=satellite, polarization=polarization, frequency__isnull=False ).filter( # Проверяем, входит ли частота в допуск frequency__gte=frequency - tolerance_mhz, frequency__lte=frequency + tolerance_mhz ).order_by('-last_update') # Сортируем по дате обновления (самые свежие первыми) # Возвращаем самый свежий источник return lyngsat_sources.first() # ============================================================================ # Константы # ============================================================================ # Значения по умолчанию для пагинации DEFAULT_ITEMS_PER_PAGE = 50 MAX_ITEMS_PER_PAGE = 10000 # Значения по умолчанию для данных DEFAULT_NUMERIC_VALUE = -1.0 MINIMUM_BANDWIDTH_MHZ = 0.08 RANGE_DISTANCE = 56 def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, mirrors, modulations def find_mirror_satellites(mirror_names: list) -> list: """ Находит спутники, которые соответствуют именам зеркал. Алгоритм: 1. Для каждого имени зеркала: - Обрезать пробелы и привести к нижнему регистру - Найти все спутники, в имени которых содержится это имя 2. Вернуть список найденных спутников Args: mirror_names: список имен зеркал Returns: list: список объектов Satellite """ found_satellites = [] for mirror_name in mirror_names: if not mirror_name or mirror_name == "-": continue # Обрезаем пробелы и приводим к нижнему регистру mirror_name_clean = mirror_name.strip().lower() if not mirror_name_clean: continue # Ищем спутники, в имени которых содержится имя зеркала satellites = Satellite.objects.filter( name__icontains=mirror_name_clean ) found_satellites.extend(satellites) # Убираем дубликаты return list(set(found_satellites)) def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {"N": 1, "E": 1, "S": -1, "W": -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if ( s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip() ): return -1 return float(s.strip().replace(",", ".")) return s def _find_or_create_source_by_name_and_distance( source_name: str, sat: Satellite, coord: tuple, user ) -> Source: """ Находит или создает Source на основе имени источника, спутника и расстояния. Логика: 1. Ищет все существующие Source с ObjItem, у которых: - Совпадает спутник - Совпадает имя (source_name) 2. Для каждого найденного Source проверяет расстояние до новой координаты 3. Если найден Source в радиусе ≤56 км: - Возвращает его и обновляет coords_average через метод update_coords_average 4. Если не найден подходящий Source: - Создает новый Source с типом "стационарные" Важно: Может существовать несколько Source с одинаковым именем и спутником, но они должны быть географически разделены (>56 км друг от друга). Args: source_name: имя источника (например, "Turksat 3A 10967,397 [9,348] МГц V") sat: объект Satellite coord: координата в формате (lon, lat) user: пользователь для created_by Returns: Source: найденный или созданный объект Source """ # Ищем все существующие ObjItem с таким же именем и спутником existing_objitems = ObjItem.objects.filter( name=source_name, parameter_obj__id_satellite=sat, source__isnull=False, source__coords_average__isnull=False ).select_related('source', 'parameter_obj', 'source__info') # Собираем уникальные Source из найденных ObjItem existing_sources = {} for objitem in existing_objitems: if objitem.source.id not in existing_sources: existing_sources[objitem.source.id] = objitem.source # Проверяем расстояние до каждого существующего Source closest_source = None min_distance = float('inf') for source in existing_sources.values(): if source.coords_average: source_coord = (source.coords_average.x, source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord) if distance <= RANGE_DISTANCE and distance < min_distance: min_distance = distance closest_source = source # Если найден близкий Source (≤56 км) if closest_source: # Обновляем coords_average через метод модели closest_source.update_coords_average(coord) closest_source.save() return closest_source # Если не найден подходящий Source - создаем новый с типом "Стационарные" from .models import ObjectInfo stationary_info, _ = ObjectInfo.objects.get_or_create(name="Стационарные") source = Source.objects.create( coords_average=Point(coord, srid=4326), info=stationary_info, created_by=user ) return source def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None): """ Импортирует данные из DataFrame с группировкой по имени источника и расстоянию. Алгоритм: 1. Для каждой строки DataFrame: a. Извлечь имя источника (из колонки "Объект наблюдения") b. Найти подходящий Source: - Ищет все Source с таким же именем и спутником - Проверяет расстояние до каждого Source - Если найден Source в радиусе ≤56 км - использует его - Иначе создает новый Source c. Обновить coords_average инкрементально d. Создать ObjItem и связать с Source Важные правила: - Источники разных спутников НЕ объединяются - Может быть несколько Source с одинаковым именем, но разделенных географически - Точка добавляется к Source только если расстояние ≤56 км - Координаты усредняются инкрементально для каждого источника Args: df: DataFrame с данными sat: объект Satellite current_user: текущий пользователь (optional) Returns: int: количество созданных Source """ try: df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) df.sort_values('Дата') user_to_use = current_user if current_user else CustomUser.objects.get(id=1) new_sources_count = 0 added_count = 0 # Словарь для кэширования Source в рамках текущего импорта # Ключ: (имя источника, id Source), Значение: объект Source # Используем id в ключе, т.к. может быть несколько Source с одним именем sources_cache = {} for idx, row in df.iterrows(): try: # Извлекаем координату coord_tuple = coords_transform(row["Координаты"]) # Извлекаем имя источника source_name = row["Объект наблюдения"] found_in_cache = False for cache_key, cached_source in sources_cache.items(): cached_name, cached_id = cache_key # Проверяем имя if cached_name != source_name: continue # Проверяем расстояние if cached_source.coords_average: source_coord = (cached_source.coords_average.x, cached_source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord_tuple) if distance <= RANGE_DISTANCE: # Нашли подходящий Source в кэше cached_source.update_coords_average(coord_tuple) cached_source.save() source = cached_source found_in_cache = True break if not found_in_cache: # Ищем в БД или создаем новый Source source = _find_or_create_source_by_name_and_distance( source_name, sat, coord_tuple, user_to_use ) # Проверяем, был ли создан новый Source if source.created_at.timestamp() > (datetime.now().timestamp() - 1): new_sources_count += 1 # Добавляем в кэш sources_cache[(source_name, source.id)] = source # Создаем ObjItem и связываем с Source _create_objitem_from_row(row, sat, source, user_to_use, consts) added_count += 1 except Exception as e: print(f"Ошибка при обработке строки {idx}: {e}") continue print(f"Импорт завершен: создано {new_sources_count} новых источников, " f"добавлено {added_count} точек") return new_sources_count def _create_objitem_from_row(row, sat, source, user_to_use, consts): """ Вспомогательная функция для создания ObjItem из строки DataFrame. Args: row: строка DataFrame sat: объект Satellite source: объект Source для связи user_to_use: пользователь для created_by consts: константы из get_all_constants() """ # Извлекаем координату geo_point = Point(coords_transform(row["Координаты"]), srid=4326) # Обработка поляризации try: polarization_obj, _ = Polarization.objects.get_or_create( name=row["Поляризация"].strip() ) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") # Обработка ВЧ параметров freq = remove_str(row["Частота, МГц"]) freq_line = remove_str(row["Полоса, МГц"]) v = remove_str(row["Символьная скорость, БОД"]) try: mod_obj, _ = Modulation.objects.get_or_create(name=row["Модуляция"].strip()) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name="-") snr = remove_str(row["ОСШ"]) # Обработка времени date = row["Дата"].date() time_ = row["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) # Обработка зеркал - теперь это спутники mirror_names = [] mirror_1 = row["Зеркало 1"].strip().split("\n") mirror_2 = row["Зеркало 2"].strip().split("\n") # Собираем все имена зеркал for mir in mirror_1: if mir.strip() and mir.strip() != "-": mirror_names.append(mir.strip()) for mir in mirror_2: if mir.strip() and mir.strip() != "-": mirror_names.append(mir.strip()) # Находим спутники-зеркала mirror_satellites = find_mirror_satellites(mirror_names) location = row["Местоопределение"].strip() comment = row["Комментарий"] source_name = row["Объект наблюдения"] geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ "location": location, "comment": comment, "is_average": (comment != -1.0), }, ) geo.save() # Устанавливаем связи с спутниками-зеркалами if mirror_satellites: geo.mirrors.set(mirror_satellites) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat and existing_obj_item.parameter_obj.polarization == polarization_obj and existing_obj_item.parameter_obj.frequency == freq and existing_obj_item.parameter_obj.freq_range == freq_line and existing_obj_item.parameter_obj.bod_velocity == v and existing_obj_item.parameter_obj.modulation == mod_obj and existing_obj_item.parameter_obj.snr == snr ): # Пропускаем создание дубликата return # Находим подходящий транспондер transponder = find_matching_transponder(sat, freq, polarization_obj) # Находим подходящий источник LyngSat (точность 0.1 МГц) lyngsat_source = find_matching_lyngsat(sat, freq, polarization_obj, tolerance_mhz=0.1) # Создаем новый ObjItem и связываем с Source, Transponder и LyngSat obj_item = ObjItem.objects.create( name=source_name, source=source, transponder=transponder, lyngsat_source=lyngsat_source, created_by=user_to_use ) # Создаем Parameter Parameter.objects.create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, objitem=obj_item, ) # Связываем geo с objitem geo.objitem = obj_item geo.save() # Обновляем дату подтверждения источника source.update_confirm_at() source.save() def add_satellite_list(): sats = [ "AZERSPACE 2", "Amos 4", "Astra 4A", "ComsatBW-1", "Eutelsat 16A", "Eutelsat 21B", "Eutelsat 7B", "ExpressAM6", "Hellas Sat 3", "Intelsat 39", "Intelsat 17", "NSS 12", "Sicral 2", "SkyNet 5B", "SkyNet 5D", "Syracuse 4A", "Turksat 3A", "Turksat 4A", "WGS 10", "Yamal 402", ] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create(name=sat) sat_obj.save() def parse_string(s: str): pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$" match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding="utf-8-sig") as jf: data = json.load(jf) for obj in data: if not obj.get("bearingBehavior", {}): if obj["tacticObjectType"] == "source": # if not obj['bearingBehavior']: source_id = obj["id"] name = obj["name"] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S") lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print( f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})" ) def get_points_from_csv(file_content, current_user=None): """ Импортирует данные из CSV с группировкой по имени источника и расстоянию. Алгоритм: 1. Для каждой строки CSV: a. Извлечь имя источника (из колонки "obj") и спутник b. Проверить дубликаты (координаты + частота) c. Найти подходящий Source: - Ищет все Source с таким же именем и спутником - Проверяет расстояние до каждого Source - Если найден Source в радиусе ≤56 км - использует его - Иначе создает новый Source d. Обновить coords_average инкрементально e. Создать ObjItem и связать с Source Важные правила: - Источники разных спутников НЕ объединяются - Может быть несколько Source с одинаковым именем, но разделенных географически - Точка добавляется к Source только если расстояние ≤56 км - Координаты усредняются инкрементально для каждого источника Args: file_content: содержимое CSV файла current_user: текущий пользователь (optional) Returns: int: количество созданных Source """ df = pd.read_csv( io.StringIO(file_content), sep=";", names=[ "id", "obj", "lat", "lon", "h", "time", "sat", "norad_id", "freq", "f_range", "et", "qaul", "mir_1", "mir_2", "mir_3", ], ) df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] .replace(",", ".", regex=True) .astype(float) ) df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S") df.sort_values('time') user_to_use = current_user if current_user else CustomUser.objects.get(id=1) new_sources_count = 0 added_count = 0 skipped_count = 0 # Словарь для кэширования Source в рамках текущего импорта # Ключ: (имя источника, имя спутника, id Source), Значение: объект Source sources_cache = {} for idx, row in df.iterrows(): try: # Извлекаем координату из колонок lat и lon coord_tuple = (row["lon"], row["lat"]) # Извлекаем имя источника и спутника source_name = row["obj"] sat_name = row["sat"] # Проверяем дубликаты if _is_duplicate_objitem(coord_tuple, row["freq"], row["f_range"]): skipped_count += 1 continue # Получаем или создаем объект спутника sat_obj, _ = Satellite.objects.get_or_create( name=sat_name, defaults={"norad": row["norad_id"]} ) # Проверяем кэш: ищем подходящий Source среди закэшированных found_in_cache = False for cache_key, cached_source in sources_cache.items(): cached_name, cached_sat, cached_id = cache_key # Проверяем имя и спутник if cached_name != source_name or cached_sat != sat_name: continue # Проверяем расстояние if cached_source.coords_average: source_coord = (cached_source.coords_average.x, cached_source.coords_average.y) _, distance = calculate_mean_coords(source_coord, coord_tuple) if distance <= RANGE_DISTANCE: # Нашли подходящий Source в кэше cached_source.update_coords_average(coord_tuple) cached_source.save() source = cached_source found_in_cache = True break if not found_in_cache: # Ищем в БД или создаем новый Source source = _find_or_create_source_by_name_and_distance( source_name, sat_obj, coord_tuple, user_to_use ) # Проверяем, был ли создан новый Source if source.created_at.timestamp() > (datetime.now().timestamp() - 1): new_sources_count += 1 # Добавляем в кэш sources_cache[(source_name, sat_name, source.id)] = source # Создаем ObjItem и связываем с Source _create_objitem_from_csv_row(row, source, user_to_use) added_count += 1 except Exception as e: print(f"Ошибка при обработке строки {idx}: {e}") continue print(f"Импорт завершен: создано {new_sources_count} новых источников, " f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов") return new_sources_count def _is_duplicate_objitem(coord_tuple, frequency, freq_range, tolerance=0.1): """ Проверяет, существует ли уже ObjItem с такими же координатами и частотой. Args: coord_tuple: кортеж (lon, lat) координат frequency: частота в МГц freq_range: полоса частот в МГц tolerance: допуск для сравнения координат в километрах Returns: bool: True если дубликат найден, False иначе """ # Ищем ObjItems с близкими координатами через geo_obj nearby_objitems = ObjItem.objects.filter( geo_obj__coords__isnull=False ).select_related('parameter_obj', 'geo_obj') for objitem in nearby_objitems: if not objitem.geo_obj or not objitem.geo_obj.coords: continue # Проверяем расстояние между координатами geo_coord = (objitem.geo_obj.coords.x, objitem.geo_obj.coords.y) _, distance = calculate_mean_coords(coord_tuple, geo_coord) if distance <= tolerance: # Координаты совпадают, проверяем частоту if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj: param = objitem.parameter_obj # Проверяем совпадение частоты с небольшим допуском (0.1 МГц) if (abs(param.frequency - frequency) < 0.1 and abs(param.freq_range - freq_range) < 0.1): return True return False def _create_objitem_from_csv_row(row, source, user_to_use): """ Вспомогательная функция для создания ObjItem из строки CSV DataFrame. Args: row: строка DataFrame source: объект Source для связи user_to_use: пользователь для created_by """ # Определяем поляризацию match row["obj"].split(" ")[-1]: case "V": pol = "Вертикальная" case "H": pol = "Горизонтальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" pol_obj, _ = Polarization.objects.get_or_create(name=pol) sat_obj, _ = Satellite.objects.get_or_create( name=row["sat"], defaults={"norad": row["norad_id"]} ) # Обработка зеркал - теперь это спутники mirror_names = [] if not pd.isna(row["mir_1"]) and row["mir_1"].strip() != "-": mirror_names.append(row["mir_1"]) if not pd.isna(row["mir_2"]) and row["mir_2"].strip() != "-": mirror_names.append(row["mir_2"]) if not pd.isna(row["mir_3"]) and row["mir_3"].strip() != "-": mirror_names.append(row["mir_3"]) # Находим спутники-зеркала mirror_satellites = find_mirror_satellites(mirror_names) # Создаем Geo объект geo_obj, _ = Geo.objects.get_or_create( timestamp=row["time"], coords=Point(row["lon"], row["lat"], srid=4326), defaults={ "is_average": False, }, ) # Устанавливаем связи с спутниками-зеркалами if mirror_satellites: geo_obj.mirrors.set(mirror_satellites) # Проверяем, существует ли уже ObjItem с таким же geo existing_obj_item = ObjItem.objects.filter(geo_obj=geo_obj).first() if existing_obj_item: # Проверяем, существует ли parameter с такими же значениями if ( hasattr(existing_obj_item, "parameter_obj") and existing_obj_item.parameter_obj and existing_obj_item.parameter_obj.id_satellite == sat_obj and existing_obj_item.parameter_obj.polarization == pol_obj and existing_obj_item.parameter_obj.frequency == row["freq"] and existing_obj_item.parameter_obj.freq_range == row["f_range"] ): # Пропускаем создание дубликата return # Находим подходящий транспондер transponder = find_matching_transponder(sat_obj, row["freq"], pol_obj) # Находим подходящий источник LyngSat (точность 0.1 МГц) lyngsat_source = find_matching_lyngsat(sat_obj, row["freq"], pol_obj, tolerance_mhz=0.1) # Создаем новый ObjItem и связываем с Source, Transponder и LyngSat obj_item = ObjItem.objects.create( name=row["obj"], source=source, transponder=transponder, lyngsat_source=lyngsat_source, created_by=user_to_use ) # Создаем Parameter Parameter.objects.create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], objitem=obj_item, ) # Связываем geo с objitem geo_obj.objitem = obj_item geo_obj.save() # Обновляем дату подтверждения источника source.update_confirm_at() source.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split("_") transfer = filename[3] match filename[2]: case "H": pol = "Горизонтальная" case "V": pol = "Вертикальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" tables = pd.read_html(file, encoding="windows-1251") df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace("Неизвестно", "-", inplace=True) df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[ ["Частота, МГц", "Полоса, МГц", "Мощность, дБм"] ].apply(pd.to_numeric) df["Время начала измерения"] = df["Время начала измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) df["Время окончания измерения"] = df["Время окончания измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) for stroka in df.iterrows(): value = stroka[1] if value["Полоса, МГц"] < 0.08: continue if "-" in value["Символьная скорость"]: bod_velocity = -1.0 else: bod_velocity = value["Символьная скорость"] if "-" in value["Сигнал/шум, дБ"]: snr = -1.0 else: snr = value["Сигнал/шум, дБ"] if value["Пакетность"] == "да": pack = True elif value["Пакетность"] == "нет": pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create(name=pol) mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"]) standard, _ = Standard.objects.get_or_create(name=value["Стандарт"]) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value["Частота, МГц"], freq_range=value["Полоса, МГц"], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value["Статус"], "power": value["Мощность, дБм"], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value["Время начала измерения"], "datetime_end": value["Время окончания измерения"], }, ) sigma_load.save() def get_frequency_tolerance_percent(freq_range_mhz: float) -> float: """ Определяет процент погрешности центральной частоты в зависимости от полосы частот. Args: freq_range_mhz (float): Полоса частот в МГц Returns: float: Процент погрешности для центральной частоты Диапазоны: - 0 - 0.5 МГц (0 - 500 кГц): 0.1% - 0.5 - 1.5 МГц (500 кГц - 1.5 МГц): 0.5% - 1.5 - 5 МГц: 1% - 5 - 10 МГц: 2% - > 10 МГц: 5% """ if freq_range_mhz < 0.5: return 0.005 elif freq_range_mhz < 1.5: return 0.01 elif freq_range_mhz < 5.0: return 0.02 elif freq_range_mhz < 10.0: return 0.05 else: return 0.1 def compare_and_link_vch_load( sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float ): """ Привязывает SigmaParameter к Parameter на основе совпадения параметров. Погрешность центральной частоты определяется автоматически в зависимости от полосы частот: - 0-500 кГц: 0.1% - 500 кГц-1.5 МГц: 0.5% - 1.5-5 МГц: 1% - 5-10 МГц: 2% - >10 МГц: 5% Args: sat_id (Satellite): Спутник для фильтрации eps_freq (float): Не используется (оставлен для обратной совместимости) eps_frange (float): Погрешность полосы частот в процентах ku_range (float): Не используется (оставлен для обратной совместимости) Returns: tuple: (количество объектов, количество привязок) """ # Получаем все ObjItem с Parameter для данного спутника item_obj = ObjItem.objects.filter( parameter_obj__id_satellite=sat_id ).select_related("parameter_obj", "parameter_obj__polarization") vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id).select_related( "polarization" ) link_count = 0 obj_count = item_obj.count() for obj in item_obj: vch_load = obj.parameter_obj # Пропускаем объекты с некорректной частотой if not vch_load or vch_load.frequency == -1.0: continue # Определяем погрешность частоты на основе полосы freq_tolerance_percent = get_frequency_tolerance_percent(vch_load.freq_range) # Вычисляем допустимое отклонение частоты в МГц freq_tolerance_mhz = vch_load.frequency * freq_tolerance_percent / 100 # Вычисляем допустимое отклонение полосы в МГц frange_tolerance_mhz = vch_load.freq_range * eps_frange / 100 for sigma in vch_sigma: # Проверяем совпадение по всем параметрам freq_match = ( abs(sigma.transfer_frequency - vch_load.frequency) <= freq_tolerance_mhz ) frange_match = ( abs(sigma.freq_range - vch_load.freq_range) <= frange_tolerance_mhz ) pol_match = sigma.polarization == vch_load.polarization if freq_match and frange_match and pol_match: sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame( columns=[ "Дата", "Широта", "Долгота", "Высота", "Населённый пункт", "ИСЗ", "Прямой канал, МГц", "Обратный канал, МГц", "Перенос, МГц", "Полоса, МГц", "Зеркала", ] ) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) isz = value["ИСЗ"] try: lat = float(value["Широта, град"].strip().replace(",", ".")) lon = float(value["Долгота, град"].strip().replace(",", ".")) downlink = float(value["Обратный канал, МГц"].strip().replace(",", ".")) freq_range = float(value["Полоса, МГц"].strip().replace(",", ".")) except Exception as e: lat = value["Широта, град"] lon = value["Долгота, град"] downlink = value["Обратный канал, МГц"] freq_range = value["Полоса, МГц"] print(e) norad = int(re.findall(r"\((\d+)\)", isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value["Поляризация"].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F("frequency_range") / 2, downlink__lte=downlink + F("frequency_range") / 2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # sleep(1) loc_name = "" if transponder: # and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame( [ { "Дата": date, "Широта": lat, "Долгота": lon, "Высота": 0.0, "Населённый пункт": loc_name, "ИСЗ": isz, "Прямой канал, МГц": uplink, "Обратный канал, МГц": downlink, "Перенос, МГц": transfer, "Полоса, МГц": freq_range, "Зеркала": "", } ] ) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df # ============================================================================ # Утилиты для работы с координатами # ============================================================================ def calculate_mean_coords(coord1: tuple, coord2: tuple) -> tuple[tuple, float]: """ Вычисляет среднюю точку между двумя координатами с использованием геодезических вычислений (с учётом эллипсоида). :param lat1: Широта первой точки в градусах. :param lon1: Долгота первой точки в градусах. :param lat2: Широта второй точки в градусах. :param lon2: Долгота второй точки в градусах. :return: Словарь с ключами 'lat' и 'lon' для средней точки, и расстояние(dist) в КМ. """ lon1, lat1 = coord1 lon2, lat2 = coord2 geod_inv = Geodesic.WGS84.Inverse(lat1, lon1, lat2, lon2) azimuth1 = geod_inv['azi1'] distance = geod_inv['s12'] geod_direct = Geodesic.WGS84.Direct(lat1, lon1, azimuth1, distance / 2) return (geod_direct['lon2'], geod_direct['lat2']), distance/1000 def calculate_average_coords_incremental( current_average: tuple, new_coord: tuple ) -> tuple: """ Вычисляет новое среднее между текущим средним и новой координатой. Использует инкрементальное усреднение: каждая новая точка усредняется с текущим средним, а не со всеми точками кластера. Это упрощенный подход, где новое среднее = (текущее_среднее + новая_координата) / 2. Важно: Это НЕ среднее арифметическое всех точек кластера, а инкрементальное усреднение между двумя точками (текущим средним и новой точкой). Args: current_average (tuple): Текущее среднее в формате (longitude, latitude) new_coord (tuple): Новая координата в формате (longitude, latitude) Returns: tuple: Новое среднее в формате (longitude, latitude) Example: >>> avg1 = (37.62, 55.75) # Первая точка >>> avg2 = calculate_average_coords_incremental(avg1, (37.63, 55.76)) >>> print(avg2) (37.625, 55.755) >>> avg3 = calculate_average_coords_incremental(avg2, (37.64, 55.77)) >>> print(avg3) (37.6325, 55.7625) >>> # Проверка: среднее между одинаковыми точками >>> avg = calculate_average_coords_incremental((37.62, 55.75), (37.62, 55.75)) >>> print(avg) (37.62, 55.75) """ current_lon, current_lat = current_average new_lon, new_lat = new_coord # Инкрементальное усреднение: (current + new) / 2 avg_lon = (current_lon + new_lon) / 2 avg_lat = (current_lat + new_lat) / 2 return (avg_lon, avg_lat) # ============================================================================ # Утилиты для форматирования # ============================================================================ def format_coordinates(longitude: float, latitude: float) -> str: """ Форматирует координаты в читаемый вид. Преобразует числовые координаты в формат с указанием направления (N/S для широты, E/W для долготы). Args: longitude (float): Долгота в десятичных градусах. latitude (float): Широта в десятичных градусах. Returns: str: Отформатированная строка координат в формате "XXN/S YYE/W". Example: >>> format_coordinates(37.62, 55.75) '55.75N 37.62E' >>> format_coordinates(-122.42, 37.77) '37.77N 122.42W' """ lon_direction = "E" if longitude > 0 else "W" lat_direction = "N" if latitude > 0 else "S" lon_value = abs(longitude) lat_value = abs(latitude) return f"{lat_value}{lat_direction} {lon_value}{lon_direction}" def parse_pagination_params( request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE ) -> tuple: """ Извлекает и валидирует параметры пагинации из запроса. Args: request: HTTP запрос Django. default_per_page (int): Количество элементов на странице по умолчанию. Returns: tuple: Кортеж (page_number, items_per_page), где: - page_number (int): Номер текущей страницы (по умолчанию 1). - items_per_page (int): Количество элементов на странице. Example: >>> page, per_page = parse_pagination_params(request, default_per_page=100) >>> paginator = Paginator(objects, per_page) >>> page_obj = paginator.get_page(page) """ page_number = request.GET.get("page", 1) items_per_page = request.GET.get("items_per_page", str(default_per_page)) # Валидация page_number try: page_number = int(page_number) if page_number < 1: page_number = 1 except (ValueError, TypeError): page_number = 1 # Валидация items_per_page try: items_per_page = int(items_per_page) if items_per_page < 1: items_per_page = default_per_page # Ограничиваем максимальное значение для предотвращения перегрузки if items_per_page > MAX_ITEMS_PER_PAGE: items_per_page = MAX_ITEMS_PER_PAGE except (ValueError, TypeError): items_per_page = default_per_page return page_number, items_per_page def get_first_param_subquery(field_name: str): """ Возвращает F() выражение для доступа к полю параметра через OneToOne связь. После рефакторинга связи Parameter-ObjItem с ManyToMany на OneToOne, эта функция упрощена для возврата прямого F() выражения вместо подзапроса. Args: field_name (str): Имя поля модели Parameter для извлечения. Может включать связанные поля через __ (например, 'id_satellite__name'). Returns: F: Django F() объект для использования в annotate(). Example: >>> freq_expr = get_first_param_subquery('frequency') >>> objects = ObjItem.objects.annotate(first_freq=freq_expr) >>> for obj in objects: ... print(obj.first_freq) """ return F(f"parameter_obj__{field_name}") # ============================================================================ # Number Formatting Functions # ============================================================================ def format_coordinate(value): """ Format coordinate value to 4 decimal places. Args: value: Numeric coordinate value Returns: str: Formatted coordinate or '-' if None """ if value is None: return '-' try: return f"{float(value):.4f}" except (ValueError, TypeError): return '-' def format_frequency(value): """ Format frequency value to 3 decimal places. Args: value: Numeric frequency value in MHz Returns: str: Formatted frequency or '-' if None """ if value is None: return '-' try: return f"{float(value):.3f}" except (ValueError, TypeError): return '-' def format_symbol_rate(value): """ Format symbol rate (bod_velocity) to integer. Args: value: Numeric symbol rate value Returns: str: Formatted symbol rate or '-' if None """ if value is None: return '-' try: return f"{float(value):.0f}" except (ValueError, TypeError): return '-' def format_coords_display(point): """ Format geographic point coordinates for display. Args: point: GeoDjango Point object Returns: str: Formatted coordinates as "LAT LON" or '-' if None """ if not point: return '-' try: longitude = point.coords[0] latitude = point.coords[1] lon = f"{abs(longitude):.4f}E" if longitude > 0 else f"{abs(longitude):.4f}W" lat = f"{abs(latitude):.4f}N" if latitude > 0 else f"{abs(latitude):.4f}S" return f"{lat} {lon}" except (AttributeError, IndexError, TypeError): return '-'