# Standard library imports import io import json import re from datetime import datetime, time # Django imports from django.contrib.gis.geos import Point from django.db.models import F # Third-party imports import pandas as pd # Local imports from mapsapp.models import Transponders from .models import ( CustomUser, Geo, Mirror, Modulation, ObjItem, Parameter, Polarization, Satellite, SigmaParameter, Standard, ) # ============================================================================ # Константы # ============================================================================ # Значения по умолчанию для пагинации DEFAULT_ITEMS_PER_PAGE = 50 MAX_ITEMS_PER_PAGE = 10000 # Значения по умолчанию для данных DEFAULT_NUMERIC_VALUE = -1.0 MINIMUM_BANDWIDTH_MHZ = 0.08 def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, mirrors, modulations def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {"N": 1, "E": 1, "S": -1, "W": -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if ( s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip() ): return -1 return float(s.strip().replace(",", ".")) return s def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None): try: df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) for stroka in df.iterrows(): geo_point = Point(coords_transform(stroka[1]["Координаты"]), srid=4326) valid_point = None kupsat_point = None try: if ( stroka[1]["Координаты объекта"] != -1 and stroka[1]["Координаты Кубсата"] != "+" ): if ( "ИРИ" not in stroka[1]["Координаты объекта"] and "БЛА" not in stroka[1]["Координаты объекта"] ): valid_point = list( map( float, stroka[1]["Координаты объекта"] .replace(",", ".") .split(". "), ) ) valid_point = Point(valid_point[1], valid_point[0], srid=4326) if ( stroka[1]["Координаты Кубсата"] != -1 and stroka[1]["Координаты Кубсата"] != "+" ): kupsat_point = list( map( float, stroka[1]["Координаты Кубсата"].replace(",", ".").split(". "), ) ) kupsat_point = Point(kupsat_point[1], kupsat_point[0], srid=4326) except KeyError: print("В таблице нет столбцов с координатами кубсата") try: polarization_obj, _ = Polarization.objects.get_or_create( name=stroka[1]["Поляризация"].strip() ) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") freq = remove_str(stroka[1]["Частота, МГц"]) freq_line = remove_str(stroka[1]["Полоса, МГц"]) v = remove_str(stroka[1]["Символьная скорость, БОД"]) try: mod_obj, _ = Modulation.objects.get_or_create( name=stroka[1]["Модуляция"].strip() ) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name="-") snr = remove_str(stroka[1]["ОСШ"]) date = stroka[1]["Дата"].date() time_ = stroka[1]["Время"] if isinstance(time_, str): time_ = time_.strip() time_ = time(0, 0, 0) timestamp = datetime.combine(date, time_) current_mirrors = [] mirror_1 = stroka[1]["Зеркало 1"].strip().split("\n") mirror_2 = stroka[1]["Зеркало 2"].strip().split("\n") if len(mirror_1) > 1: for mir in mirror_1: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_1[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_1[0].strip()) current_mirrors.append(mirror_1[0].strip()) if len(mirror_2) > 1: for mir in mirror_2: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_2[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_2[0].strip()) current_mirrors.append(mirror_2[0].strip()) location = stroka[1]["Местоопределение"].strip() comment = stroka[1]["Комментарий"] source = stroka[1]["Объект наблюдения"] user_to_use = current_user if current_user else CustomUser.objects.get(id=1) vch_load_obj, _ = Parameter.objects.get_or_create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, ) geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ "coords_kupsat": kupsat_point, "coords_valid": valid_point, "location": location, "comment": comment, "is_average": (comment != -1.0), }, ) geo.save() geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors)) existing_obj_items = ObjItem.objects.filter( parameters_obj=vch_load_obj, geo_obj=geo ) if not existing_obj_items.exists(): obj_item = ObjItem.objects.create(name=source, created_by=user_to_use) obj_item.parameters_obj.set([vch_load_obj]) geo.objitem = obj_item geo.save() def add_satellite_list(): sats = [ "AZERSPACE 2", "Amos 4", "Astra 4A", "ComsatBW-1", "Eutelsat 16A", "Eutelsat 21B", "Eutelsat 7B", "ExpressAM6", "Hellas Sat 3", "Intelsat 39", "Intelsat 17", "NSS 12", "Sicral 2", "SkyNet 5B", "SkyNet 5D", "Syracuse 4A", "Turksat 3A", "Turksat 4A", "WGS 10", "Yamal 402", ] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create(name=sat) sat_obj.save() def parse_string(s: str): pattern = r"^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$" match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding="utf-8-sig") as jf: data = json.load(jf) for obj in data: if not obj.get("bearingBehavior", {}): if obj["tacticObjectType"] == "source": # if not obj['bearingBehavior']: source_id = obj["id"] name = obj["name"] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], "%d.%m.%y %H:%M:%S") lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print( f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})" ) def get_points_from_csv(file_content, current_user=None): df = pd.read_csv( io.StringIO(file_content), sep=";", names=[ "id", "obj", "lat", "lon", "h", "time", "sat", "norad_id", "freq", "f_range", "et", "qaul", "mir_1", "mir_2", "mir_3", ], ) df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] .replace(",", ".", regex=True) .astype(float) ) df["time"] = pd.to_datetime(df["time"], format="%d.%m.%Y %H:%M:%S") for row in df.iterrows(): row = row[1] match row["obj"].split(" ")[-1]: case "V": pol = "Вертикальная" case "H": pol = "Горизонтальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" pol_obj, _ = Polarization.objects.get_or_create(name=pol) sat_obj, _ = Satellite.objects.get_or_create( name=row["sat"], defaults={"norad": row["norad_id"]} ) mir_1_obj, _ = Mirror.objects.get_or_create(name=row["mir_1"]) mir_2_obj, _ = Mirror.objects.get_or_create(name=row["mir_2"]) mir_lst = [row["mir_1"], row["mir_2"]] if not pd.isna(row["mir_3"]): mir_3_obj, _ = Mirror.objects.get_or_create(name=row["mir_3"]) user_to_use = current_user if current_user else CustomUser.objects.get(id=1) vch_load_obj, _ = Parameter.objects.get_or_create( id_satellite=sat_obj, polarization=pol_obj, frequency=row["freq"], freq_range=row["f_range"], # defaults={'id_user_add': user_to_use} ) geo_obj, _ = Geo.objects.get_or_create( timestamp=row["time"], coords=Point(row["lon"], row["lat"], srid=4326), defaults={ "is_average": False, # 'id_user_add': user_to_use, }, ) geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst)) existing_obj_items = ObjItem.objects.filter( parameters_obj=vch_load_obj, geo_obj=geo_obj ) if not existing_obj_items.exists(): obj_item = ObjItem.objects.create(name=row["obj"], created_by=user_to_use) obj_item.parameters_obj.set([vch_load_obj]) geo_obj.objitem = obj_item geo_obj.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split("_") transfer = filename[3] match filename[2]: case "H": pol = "Горизонтальная" case "V": pol = "Вертикальная" case "R": pol = "Правая" case "L": pol = "Левая" case _: pol = "-" tables = pd.read_html(file, encoding="windows-1251") df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace("Неизвестно", "-", inplace=True) df[["Частота, МГц", "Полоса, МГц", "Мощность, дБм"]] = df[ ["Частота, МГц", "Полоса, МГц", "Мощность, дБм"] ].apply(pd.to_numeric) df["Время начала измерения"] = df["Время начала измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) df["Время окончания измерения"] = df["Время окончания измерения"].apply( lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S") ) for stroka in df.iterrows(): value = stroka[1] if value["Полоса, МГц"] < 0.08: continue if "-" in value["Символьная скорость"]: bod_velocity = -1.0 else: bod_velocity = value["Символьная скорость"] if "-" in value["Сигнал/шум, дБ"]: snr = -1.0 else: snr = value["Сигнал/шум, дБ"] if value["Пакетность"] == "да": pack = True elif value["Пакетность"] == "нет": pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create(name=pol) mod, _ = Modulation.objects.get_or_create(name=value["Модуляция"]) standard, _ = Standard.objects.get_or_create(name=value["Стандарт"]) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value["Частота, МГц"], freq_range=value["Полоса, МГц"], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value["Статус"], "power": value["Мощность, дБм"], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value["Время начала измерения"], "datetime_end": value["Время окончания измерения"], }, ) sigma_load.save() def compare_and_link_vch_load( sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float ): item_obj = ObjItem.objects.filter(parameters_obj__id_satellite=sat_id) vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id) link_count = 0 obj_count = len(item_obj) for idx, obj in enumerate(item_obj): vch_load = obj.parameters_obj.get() if vch_load.frequency == -1.0: continue for sigma in vch_sigma: if ( abs(sigma.transfer_frequency - vch_load.frequency) <= eps_freq and abs(sigma.freq_range - vch_load.freq_range) <= vch_load.freq_range * eps_frange / 100 and sigma.polarization == vch_load.polarization ): sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame( columns=[ "Дата", "Широта", "Долгота", "Высота", "Населённый пункт", "ИСЗ", "Прямой канал, МГц", "Обратный канал, МГц", "Перенос, МГц", "Полоса, МГц", "Зеркала", ] ) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) isz = value["ИСЗ"] try: lat = float(value["Широта, град"].strip().replace(",", ".")) lon = float(value["Долгота, град"].strip().replace(",", ".")) downlink = float(value["Обратный канал, МГц"].strip().replace(",", ".")) freq_range = float(value["Полоса, МГц"].strip().replace(",", ".")) except Exception as e: lat = value["Широта, град"] lon = value["Долгота, град"] downlink = value["Обратный канал, МГц"] freq_range = value["Полоса, МГц"] print(e) norad = int(re.findall(r"\((\d+)\)", isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value["Поляризация"].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F("frequency_range") / 2, downlink__lte=downlink + F("frequency_range") / 2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # sleep(1) loc_name = "" if transponder: # and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame( [ { "Дата": date, "Широта": lat, "Долгота": lon, "Высота": 0.0, "Населённый пункт": loc_name, "ИСЗ": isz, "Прямой канал, МГц": uplink, "Обратный канал, МГц": downlink, "Перенос, МГц": transfer, "Полоса, МГц": freq_range, "Зеркала": "", } ] ) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df # ============================================================================ # Утилиты для форматирования # ============================================================================ def format_coordinates(longitude: float, latitude: float) -> str: """ Форматирует координаты в читаемый вид. Преобразует числовые координаты в формат с указанием направления (N/S для широты, E/W для долготы). Args: longitude (float): Долгота в десятичных градусах. latitude (float): Широта в десятичных градусах. Returns: str: Отформатированная строка координат в формате "XXN/S YYE/W". Example: >>> format_coordinates(37.62, 55.75) '55.75N 37.62E' >>> format_coordinates(-122.42, 37.77) '37.77N 122.42W' """ lon_direction = "E" if longitude > 0 else "W" lat_direction = "N" if latitude > 0 else "S" lon_value = abs(longitude) lat_value = abs(latitude) return f"{lat_value}{lat_direction} {lon_value}{lon_direction}" def parse_pagination_params( request, default_per_page: int = DEFAULT_ITEMS_PER_PAGE ) -> tuple: """ Извлекает и валидирует параметры пагинации из запроса. Args: request: HTTP запрос Django. default_per_page (int): Количество элементов на странице по умолчанию. Returns: tuple: Кортеж (page_number, items_per_page), где: - page_number (int): Номер текущей страницы (по умолчанию 1). - items_per_page (int): Количество элементов на странице. Example: >>> page, per_page = parse_pagination_params(request, default_per_page=100) >>> paginator = Paginator(objects, per_page) >>> page_obj = paginator.get_page(page) """ page_number = request.GET.get("page", 1) items_per_page = request.GET.get("items_per_page", str(default_per_page)) # Валидация page_number try: page_number = int(page_number) if page_number < 1: page_number = 1 except (ValueError, TypeError): page_number = 1 # Валидация items_per_page try: items_per_page = int(items_per_page) if items_per_page < 1: items_per_page = default_per_page # Ограничиваем максимальное значение для предотвращения перегрузки if items_per_page > MAX_ITEMS_PER_PAGE: items_per_page = MAX_ITEMS_PER_PAGE except (ValueError, TypeError): items_per_page = default_per_page return page_number, items_per_page def get_first_param_subquery(field_name: str): """ Создает подзапрос для получения первого параметра объекта. Используется для аннотации queryset с полями из связанной модели Parameter. Возвращает значение указанного поля из первого параметра объекта. Args: field_name (str): Имя поля модели Parameter для извлечения. Может включать связанные поля через __ (например, 'id_satellite__name'). Returns: Subquery: Django Subquery объект для использования в annotate(). Example: >>> from django.db.models import Subquery, OuterRef >>> freq_subq = get_first_param_subquery('frequency') >>> objects = ObjItem.objects.annotate(first_freq=Subquery(freq_subq)) >>> for obj in objects: ... print(obj.first_freq) """ from django.db.models import OuterRef return ( Parameter.objects.filter(objitems=OuterRef("pk")) .order_by("id") .values(field_name)[:1] )