from .models import ( Satellite, Standard, Polarization, Mirror, Modulation, Geo, Parameter, SigmaParameter, ObjItem, CustomUser ) from mapsapp.models import Transponders from datetime import datetime, time import pandas as pd import numpy as np from django.contrib.gis.geos import Point import json import re import io from django.db.models import F, Count, Exists, OuterRef, Min, Max from geopy.geocoders import Nominatim import reverse_geocoder as rg import time def get_all_constants(): sats = [sat.name for sat in Satellite.objects.all()] standards = [sat.name for sat in Standard.objects.all()] pols = [sat.name for sat in Polarization.objects.all()] mirrors = [sat.name for sat in Mirror.objects.all()] modulations = [sat.name for sat in Modulation.objects.all()] return sats, standards, pols, mirrors, modulations def coords_transform(coords: str): lat_part, lon_part = coords.strip().split() sign_map = {'N': 1, 'E': 1, 'S': -1, 'W': -1} lat_sign_char = lat_part[-1] lat_value = float(lat_part[:-1].replace(",", ".")) latitude = lat_value * sign_map.get(lat_sign_char, 1) lon_sign_char = lon_part[-1] lon_value = float(lon_part[:-1].replace(",", ".")) longitude = lon_value * sign_map.get(lon_sign_char, 1) return (longitude, latitude) def remove_str(s: str): if isinstance(s, str): if s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip(): return -1 return float(s.strip().replace(",", ".")) return s def fill_data_from_df(df: pd.DataFrame, sat: Satellite): try: df.rename(columns={'Модуляция ': 'Модуляция'}, inplace=True) except Exception as e: print(e) consts = get_all_constants() df.fillna(-1, inplace=True) for stroka in df.iterrows(): geo_point = Point(coords_transform(stroka[1]['Координаты']), srid=4326) valid_point = None kupsat_point = None try: if stroka[1]['Координаты объекта'] != -1 and stroka[1]['Координаты Кубсата'] != '+': if 'ИРИ' not in stroka[1]['Координаты объекта'] and 'БЛА' not in stroka[1]['Координаты объекта']: valid_point = list(map(float, stroka[1]['Координаты объекта'].replace(',', '.').split('. '))) valid_point = Point(valid_point[1], valid_point[0], srid=4326) if stroka[1]['Координаты Кубсата'] != -1 and stroka[1]['Координаты Кубсата'] != '+': kupsat_point = list(map(float, stroka[1]['Координаты Кубсата'].replace(',', '.').split('. '))) kupsat_point = Point(kupsat_point[1], kupsat_point[0], srid=4326) except KeyError: print("В таблице нет столбцов с координатами кубсата") try: polarization_obj, _ = Polarization.objects.get_or_create(name=stroka[1]['Поляризация'].strip()) except KeyError: polarization_obj, _ = Polarization.objects.get_or_create(name="-") freq = remove_str(stroka[1]['Частота, МГц']) freq_line = remove_str(stroka[1]['Полоса, МГц']) v = remove_str(stroka[1]['Символьная скорость, БОД']) try: mod_obj, _ = Modulation.objects.get_or_create(name=stroka[1]['Модуляция'].strip()) except AttributeError: mod_obj, _ = Modulation.objects.get_or_create(name='-') snr = remove_str(stroka[1]['ОСШ']) date = stroka[1]['Дата'].date() time_ = stroka[1]['Время'] if isinstance(time_, str): time_ = time(0,0,0) timestamp = datetime.combine(date, time_) current_mirrors = [] mirror_1 = stroka[1]['Зеркало 1'].strip().split("\n") mirror_2 = stroka[1]['Зеркало 2'].strip().split("\n") if len(mirror_1) > 1: for mir in mirror_1: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_1[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_1[0].strip()) current_mirrors.append(mirror_1[0].strip()) if len(mirror_2) > 1: for mir in mirror_2: mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip()) current_mirrors.append(mir.strip()) elif mirror_2[0] not in consts[3]: mir_obj, _ = Mirror.objects.get_or_create(name=mirror_2[0].strip()) current_mirrors.append(mirror_2[0].strip()) location = stroka[1]['Местоопределение'].strip() comment = stroka[1]['Комментарий'] source = stroka[1]['Объект наблюдения'] vch_load_obj, vch_created = Parameter.objects.get_or_create( id_satellite=sat, polarization=polarization_obj, frequency=freq, freq_range=freq_line, bod_velocity=v, modulation=mod_obj, snr=snr, defaults={'id_user_add': CustomUser.objects.get(id=1)} ) geo, _ = Geo.objects.get_or_create( timestamp=timestamp, coords=geo_point, defaults={ 'coords_kupsat': kupsat_point, 'coords_valid': valid_point, 'location': location, 'comment': comment, 'is_average': (comment != -1.0), 'id_user_add': CustomUser.objects.get(id=1) } ) geo.save() geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors)) obj_item, _ = ObjItem.objects.get_or_create( id_geo=geo, id_vch_load=vch_load_obj, defaults={ 'name': source, 'id_user_add': CustomUser.objects.get(id=1), # 'id_satellite': sat } ) obj_item.save() def add_satellite_list(): sats = ['AZERSPACE 2', 'Amos 4', 'Astra 4A', 'ComsatBW-1', 'Eutelsat 16A', 'Eutelsat 21B', 'Eutelsat 7B', 'ExpressAM6', 'Hellas Sat 3', 'Intelsat 39', 'Intelsat 17', 'NSS 12', 'Sicral 2', 'SkyNet 5B', 'SkyNet 5D', 'Syracuse 4A', 'Turksat 3A', 'Turksat 4A', 'WGS 10', 'Yamal 402'] for sat in sats: sat_obj, _ = Satellite.objects.get_or_create( name=sat ) sat_obj.save() def parse_string(s: str): pattern = r'^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$' match = re.match(pattern, s) if match: return list(match.groups()) else: raise ValueError("Некорректный формат строки") def get_point_from_json(filepath: str): with open(filepath, encoding='utf-8-sig') as jf: data = json.load(jf) for obj in data: if not obj.get('bearingBehavior', {}): if obj['tacticObjectType'] == "source": # if not obj['bearingBehavior']: source_id = obj['id'] name = obj['name'] elements = parse_string(name) sat_name = elements[0] freq = elements[1] freq_range = elements[2] pol = elements[4] timestamp = datetime.strptime(elements[-1], '%d.%m.%y %H:%M:%S') lat = None lon = None for pos in data: if pos["id"] == source_id and pos["tacticObjectType"] == "position": lat = pos["latitude"] lon = pos["longitude"] break print(f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} " f"time - {timestamp}, pos - ({lat}, {lon})") def get_points_from_csv(file_content): df = pd.read_csv(io.StringIO(file_content), sep=";", names=['id', 'obj', 'lat', 'lon', 'h', 'time', 'sat', 'norad_id', 'freq', 'f_range', 'et', 'qaul', 'mir_1', 'mir_2', 'mir_3']) df[['lat', 'lon', 'freq', 'f_range']] = df[['lat', 'lon', 'freq', 'f_range']].replace(',', '.', regex=True).astype(float) df['time'] = pd.to_datetime(df['time'], format='%d.%m.%Y %H:%M:%S') for row in df.iterrows(): row = row[1] match row['obj'].split(' ')[-1]: case 'V': pol = 'Вертикальная' case 'H': pol = 'Горизонтальная' case 'R': pol = 'Правая' case 'L': pol = 'Левая' case _: pol = '-' pol_obj, _ = Polarization.objects.get_or_create( name=pol ) sat_obj, _ = Satellite.objects.get_or_create( name=row['sat'], defaults={'norad': row['norad_id']} ) mir_1_obj, _ = Mirror.objects.get_or_create( name=row['mir_1'] ) mir_2_obj, _ = Mirror.objects.get_or_create( name=row['mir_2'] ) mir_lst = [row['mir_1'], row['mir_2']] if not pd.isna(row['mir_3']): mir_3_obj, _ = Mirror.objects.get_or_create( name=row['mir_3'] ) vch_load_obj, _ = Parameter.objects.get_or_create( id_satellite=sat_obj, polarization=pol_obj, frequency=row['freq'], freq_range=row['f_range'], defaults={'id_user_add': CustomUser.objects.get(id=1)} ) geo_obj, _ = Geo.objects.get_or_create( timestamp=row['time'], coords=Point(row['lon'], row['lat'], srid=4326), defaults={ 'is_average': False, 'id_user_add': CustomUser.objects.get(id=1), } ) geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst)) obj_item_obj, _ = ObjItem.objects.get_or_create( name=row['obj'], # id_satellite=sat_obj, id_vch_load=vch_load_obj, id_geo=geo_obj, defaults={ 'id_user_add': CustomUser.objects.get(id=1) } ) obj_item_obj.save() def get_vch_load_from_html(file, sat: Satellite) -> None: filename = file.name.split('_') transfer = filename[3] match filename[2]: case 'H': pol = 'Горизонтальная' case 'V': pol = 'Вертикальная' case 'R': pol = 'Правая' case 'L': pol = 'Левая' case _: pol = '-' tables = pd.read_html(file, encoding='windows-1251') df = tables[0] df = df.drop(0).reset_index(drop=True) df.columns = df.iloc[0] df = df.drop(0).reset_index(drop=True) df.replace('Неизвестно', '-', inplace=True) df[['Частота, МГц', 'Полоса, МГц', 'Мощность, дБм']] = df[['Частота, МГц', 'Полоса, МГц', 'Мощность, дБм']].apply(pd.to_numeric) df['Время начала измерения'] = df['Время начала измерения'].apply(lambda x: datetime.strptime(x, '%d.%m.%Y %H:%M:%S')) df['Время окончания измерения'] = df['Время окончания измерения'].apply(lambda x: datetime.strptime(x, '%d.%m.%Y %H:%M:%S')) for stroka in df.iterrows(): value = stroka[1] if value['Полоса, МГц'] < 0.08: continue if '-' in value['Символьная скорость']: bod_velocity = -1.0 else: bod_velocity = value['Символьная скорость'] if '-' in value['Сигнал/шум, дБ']: snr = - 1.0 else: snr = value['Сигнал/шум, дБ'] if value['Пакетность'] == 'да': pack = True elif value['Пакетность'] == 'нет': pack = False else: pack = None polarization, _ = Polarization.objects.get_or_create( name=pol ) mod, _ = Modulation.objects.get_or_create( name=value['Модуляция'] ) standard, _ = Standard.objects.get_or_create( name=value['Стандарт'] ) sigma_load, _ = SigmaParameter.objects.get_or_create( id_satellite=sat, frequency=value['Частота, МГц'], freq_range=value['Полоса, МГц'], polarization=polarization, defaults={ "transfer": float(transfer), # "polarization": polarization, "status": value['Статус'], "power": value['Мощность, дБм'], "bod_velocity": bod_velocity, "modulation": mod, "snr": snr, "packets": pack, "datetime_begin": value['Время начала измерения'], "datetime_end": value['Время окончания измерения'], } ) sigma_load.save() def compare_and_link_vch_load(sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float): item_obj = ObjItem.objects.filter(id_vch_load__id_satellite=sat_id) vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id) link_count = 0 obj_count = len(item_obj) for idx, obj in enumerate(item_obj): vch_load = obj.id_vch_load if vch_load.frequency == -1.0: continue # if unique_points = Point.objects.order_by('frequency').distinct('frequency') for sigma in vch_sigma: if ( abs(sigma.transfer_frequency - vch_load.frequency) <= vch_load.frequency*eps_freq/100 and abs(sigma.freq_range - vch_load.freq_range) <= vch_load.freq_range*eps_frange/100 and sigma.polarization == vch_load.polarization ): sigma.parameter = vch_load sigma.save() link_count += 1 return obj_count, link_count def kub_report(data_in: io.StringIO) -> pd.DataFrame: df_in = pd.read_excel(data_in) df = pd.DataFrame(columns=['Дата', 'Широта', 'Долгота', 'Высота', 'Населённый пункт', 'ИСЗ', 'Прямой канал, МГц', 'Обратный канал, МГц', 'Перенос, МГц', 'Полоса, МГц', 'Зеркала']) for row in df_in.iterrows(): value = row[1] date = datetime.date(datetime.now()) lat = value['Широта, град'] lon = value['Долгота, град'] isz = value['ИСЗ'] downlink = value['Обратный канал, МГц'] freq_range = value['Полоса, МГц'] norad = int(re.findall(r'\((\d+)\)', isz)[0]) sat_obj = Satellite.objects.get(norad=norad) pol_obj = Polarization.objects.get(name=value['Поляризация'].strip()) transponder = Transponders.objects.filter( sat_id=sat_obj, polarization=pol_obj, downlink__gte=downlink - F('frequency_range')/2, downlink__lte=downlink + F('frequency_range')/2, ).first() # try: # location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address'] # loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '') # except AttributeError: # loc_name = '' # time.sleep(1) loc_name = '' if transponder: #and not (len(transponder) > 1): transfer = transponder.transfer uplink = transfer + downlink new_row = pd.DataFrame([{'Дата': date, 'Широта': lat, 'Долгота': lon, 'Высота': 0.0, 'Населённый пункт': loc_name, 'ИСЗ': isz, 'Прямой канал, МГц': uplink, 'Обратный канал, МГц': downlink, 'Перенос, МГц': transfer, 'Полоса, МГц': freq_range, 'Зеркала': '' }]) df = pd.concat([df, new_row], ignore_index=True) else: print("Ничего не найдено в транспондерах") return df