Files
dbstorage/dbapp/mainapp/utils.py

426 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from .models import (
Satellite,
Standard,
Polarization,
Mirror,
Modulation,
Geo,
Parameter,
SigmaParameter,
ObjItem,
CustomUser
)
from mapsapp.models import Transponders
from datetime import datetime, time
import pandas as pd
import numpy as np
from django.contrib.gis.geos import Point
import json
import re
import io
from django.db.models import F, Count, Exists, OuterRef, Min, Max
from geopy.geocoders import Nominatim
import reverse_geocoder as rg
from time import sleep
def get_all_constants():
sats = [sat.name for sat in Satellite.objects.all()]
standards = [sat.name for sat in Standard.objects.all()]
pols = [sat.name for sat in Polarization.objects.all()]
mirrors = [sat.name for sat in Mirror.objects.all()]
modulations = [sat.name for sat in Modulation.objects.all()]
return sats, standards, pols, mirrors, modulations
def coords_transform(coords: str):
lat_part, lon_part = coords.strip().split()
sign_map = {'N': 1, 'E': 1, 'S': -1, 'W': -1}
lat_sign_char = lat_part[-1]
lat_value = float(lat_part[:-1].replace(",", "."))
latitude = lat_value * sign_map.get(lat_sign_char, 1)
lon_sign_char = lon_part[-1]
lon_value = float(lon_part[:-1].replace(",", "."))
longitude = lon_value * sign_map.get(lon_sign_char, 1)
return (longitude, latitude)
def remove_str(s: str):
if isinstance(s, str):
if s.strip() == "-" or s.strip() == "" or s.strip() == " " or "неизв" in s.strip():
return -1
return float(s.strip().replace(",", "."))
return s
def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None):
try:
df.rename(columns={'Модуляция ': 'Модуляция'}, inplace=True)
except Exception as e:
print(e)
consts = get_all_constants()
df.fillna(-1, inplace=True)
for stroka in df.iterrows():
geo_point = Point(coords_transform(stroka[1]['Координаты']), srid=4326)
valid_point = None
kupsat_point = None
try:
if stroka[1]['Координаты объекта'] != -1 and stroka[1]['Координаты Кубсата'] != '+':
if 'ИРИ' not in stroka[1]['Координаты объекта'] and 'БЛА' not in stroka[1]['Координаты объекта']:
valid_point = list(map(float, stroka[1]['Координаты объекта'].replace(',', '.').split('. ')))
valid_point = Point(valid_point[1], valid_point[0], srid=4326)
if stroka[1]['Координаты Кубсата'] != -1 and stroka[1]['Координаты Кубсата'] != '+':
kupsat_point = list(map(float, stroka[1]['Координаты Кубсата'].replace(',', '.').split('. ')))
kupsat_point = Point(kupsat_point[1], kupsat_point[0], srid=4326)
except KeyError:
print("В таблице нет столбцов с координатами кубсата")
try:
polarization_obj, _ = Polarization.objects.get_or_create(name=stroka[1]['Поляризация'].strip())
except KeyError:
polarization_obj, _ = Polarization.objects.get_or_create(name="-")
freq = remove_str(stroka[1]['Частота, МГц'])
freq_line = remove_str(stroka[1]['Полоса, МГц'])
v = remove_str(stroka[1]['Символьная скорость, БОД'])
try:
mod_obj, _ = Modulation.objects.get_or_create(name=stroka[1]['Модуляция'].strip())
except AttributeError:
mod_obj, _ = Modulation.objects.get_or_create(name='-')
snr = remove_str(stroka[1]['ОСШ'])
date = stroka[1]['Дата'].date()
time_ = stroka[1]['Время']
if isinstance(time_, str):
time_ = time_.strip()
time_ = time(0,0,0)
timestamp = datetime.combine(date, time_)
current_mirrors = []
mirror_1 = stroka[1]['Зеркало 1'].strip().split("\n")
mirror_2 = stroka[1]['Зеркало 2'].strip().split("\n")
if len(mirror_1) > 1:
for mir in mirror_1:
mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip())
current_mirrors.append(mir.strip())
elif mirror_1[0] not in consts[3]:
mir_obj, _ = Mirror.objects.get_or_create(name=mirror_1[0].strip())
current_mirrors.append(mirror_1[0].strip())
if len(mirror_2) > 1:
for mir in mirror_2:
mir_obj, _ = Mirror.objects.get_or_create(name=mir.strip())
current_mirrors.append(mir.strip())
elif mirror_2[0] not in consts[3]:
mir_obj, _ = Mirror.objects.get_or_create(name=mirror_2[0].strip())
current_mirrors.append(mirror_2[0].strip())
location = stroka[1]['Местоопределение'].strip()
comment = stroka[1]['Комментарий']
source = stroka[1]['Объект наблюдения']
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
vch_load_obj, _ = Parameter.objects.get_or_create(
id_satellite=sat,
polarization=polarization_obj,
frequency=freq,
freq_range=freq_line,
bod_velocity=v,
modulation=mod_obj,
snr=snr,
)
geo, _ = Geo.objects.get_or_create(
timestamp=timestamp,
coords=geo_point,
defaults={
'coords_kupsat': kupsat_point,
'coords_valid': valid_point,
'location': location,
'comment': comment,
'is_average': (comment != -1.0),
}
)
geo.save()
geo.mirrors.set(Mirror.objects.filter(name__in=current_mirrors))
existing_obj_items = ObjItem.objects.filter(
parameters_obj=vch_load_obj,
geo_obj=geo
)
if not existing_obj_items.exists():
obj_item = ObjItem.objects.create(
name=source,
created_by=user_to_use
)
obj_item.parameters_obj.set([vch_load_obj])
geo.objitem = obj_item
geo.save()
def add_satellite_list():
sats = ['AZERSPACE 2', 'Amos 4', 'Astra 4A', 'ComsatBW-1', 'Eutelsat 16A',
'Eutelsat 21B', 'Eutelsat 7B', 'ExpressAM6', 'Hellas Sat 3',
'Intelsat 39', 'Intelsat 17',
'NSS 12', 'Sicral 2', 'SkyNet 5B', 'SkyNet 5D', 'Syracuse 4A',
'Turksat 3A', 'Turksat 4A', 'WGS 10', 'Yamal 402']
for sat in sats:
sat_obj, _ = Satellite.objects.get_or_create(
name=sat
)
sat_obj.save()
def parse_string(s: str):
pattern = r'^(.+?) (-?\d+\,\d+) \[(-?\d+\,\d+)\] ([^\s]+) ([A-Za-z]) - (\d{1,2}\.\d{1,2}\.\d{1,4} \d{1,2}:\d{1,2}:\d{1,2})$'
match = re.match(pattern, s)
if match:
return list(match.groups())
else:
raise ValueError("Некорректный формат строки")
def get_point_from_json(filepath: str):
with open(filepath, encoding='utf-8-sig') as jf:
data = json.load(jf)
for obj in data:
if not obj.get('bearingBehavior', {}):
if obj['tacticObjectType'] == "source":
# if not obj['bearingBehavior']:
source_id = obj['id']
name = obj['name']
elements = parse_string(name)
sat_name = elements[0]
freq = elements[1]
freq_range = elements[2]
pol = elements[4]
timestamp = datetime.strptime(elements[-1], '%d.%m.%y %H:%M:%S')
lat = None
lon = None
for pos in data:
if pos["id"] == source_id and pos["tacticObjectType"] == "position":
lat = pos["latitude"]
lon = pos["longitude"]
break
print(f"Name - {sat_name}, f - {freq}, f range - {freq_range}, pol - {pol} "
f"time - {timestamp}, pos - ({lat}, {lon})")
def get_points_from_csv(file_content, current_user=None):
df = pd.read_csv(io.StringIO(file_content), sep=";",
names=['id', 'obj', 'lat', 'lon', 'h', 'time', 'sat', 'norad_id', 'freq', 'f_range', 'et', 'qaul', 'mir_1', 'mir_2', 'mir_3'])
df[['lat', 'lon', 'freq', 'f_range']] = df[['lat', 'lon', 'freq', 'f_range']].replace(',', '.', regex=True).astype(float)
df['time'] = pd.to_datetime(df['time'], format='%d.%m.%Y %H:%M:%S')
for row in df.iterrows():
row = row[1]
match row['obj'].split(' ')[-1]:
case 'V':
pol = 'Вертикальная'
case 'H':
pol = 'Горизонтальная'
case 'R':
pol = 'Правая'
case 'L':
pol = 'Левая'
case _:
pol = '-'
pol_obj, _ = Polarization.objects.get_or_create(
name=pol
)
sat_obj, _ = Satellite.objects.get_or_create(
name=row['sat'],
defaults={'norad': row['norad_id']}
)
mir_1_obj, _ = Mirror.objects.get_or_create(
name=row['mir_1']
)
mir_2_obj, _ = Mirror.objects.get_or_create(
name=row['mir_2']
)
mir_lst = [row['mir_1'], row['mir_2']]
if not pd.isna(row['mir_3']):
mir_3_obj, _ = Mirror.objects.get_or_create(
name=row['mir_3']
)
user_to_use = current_user if current_user else CustomUser.objects.get(id=1)
vch_load_obj, _ = Parameter.objects.get_or_create(
id_satellite=sat_obj,
polarization=pol_obj,
frequency=row['freq'],
freq_range=row['f_range'],
# defaults={'id_user_add': user_to_use}
)
geo_obj, _ = Geo.objects.get_or_create(
timestamp=row['time'],
coords=Point(row['lon'], row['lat'], srid=4326),
defaults={
'is_average': False,
# 'id_user_add': user_to_use,
}
)
geo_obj.mirrors.set(Mirror.objects.filter(name__in=mir_lst))
existing_obj_items = ObjItem.objects.filter(
parameters_obj=vch_load_obj,
geo_obj=geo_obj
)
if not existing_obj_items.exists():
obj_item = ObjItem.objects.create(
name=row['obj'],
created_by=user_to_use
)
obj_item.parameters_obj.set([vch_load_obj])
geo_obj.objitem = obj_item
geo_obj.save()
def get_vch_load_from_html(file, sat: Satellite) -> None:
filename = file.name.split('_')
transfer = filename[3]
match filename[2]:
case 'H':
pol = 'Горизонтальная'
case 'V':
pol = 'Вертикальная'
case 'R':
pol = 'Правая'
case 'L':
pol = 'Левая'
case _:
pol = '-'
tables = pd.read_html(file, encoding='windows-1251')
df = tables[0]
df = df.drop(0).reset_index(drop=True)
df.columns = df.iloc[0]
df = df.drop(0).reset_index(drop=True)
df.replace('Неизвестно', '-', inplace=True)
df[['Частота, МГц', 'Полоса, МГц', 'Мощность, дБм']] = df[['Частота, МГц', 'Полоса, МГц', 'Мощность, дБм']].apply(pd.to_numeric)
df['Время начала измерения'] = df['Время начала измерения'].apply(lambda x: datetime.strptime(x, '%d.%m.%Y %H:%M:%S'))
df['Время окончания измерения'] = df['Время окончания измерения'].apply(lambda x: datetime.strptime(x, '%d.%m.%Y %H:%M:%S'))
for stroka in df.iterrows():
value = stroka[1]
if value['Полоса, МГц'] < 0.08:
continue
if '-' in value['Символьная скорость']:
bod_velocity = -1.0
else:
bod_velocity = value['Символьная скорость']
if '-' in value['Сигнал/шум, дБ']:
snr = - 1.0
else:
snr = value['Сигнал/шум, дБ']
if value['Пакетность'] == 'да':
pack = True
elif value['Пакетность'] == 'нет':
pack = False
else:
pack = None
polarization, _ = Polarization.objects.get_or_create(
name=pol
)
mod, _ = Modulation.objects.get_or_create(
name=value['Модуляция']
)
standard, _ = Standard.objects.get_or_create(
name=value['Стандарт']
)
sigma_load, _ = SigmaParameter.objects.get_or_create(
id_satellite=sat,
frequency=value['Частота, МГц'],
freq_range=value['Полоса, МГц'],
polarization=polarization,
defaults={
"transfer": float(transfer),
# "polarization": polarization,
"status": value['Статус'],
"power": value['Мощность, дБм'],
"bod_velocity": bod_velocity,
"modulation": mod,
"snr": snr,
"packets": pack,
"datetime_begin": value['Время начала измерения'],
"datetime_end": value['Время окончания измерения'],
}
)
sigma_load.save()
def compare_and_link_vch_load(sat_id: Satellite, eps_freq: float, eps_frange: float, ku_range: float):
item_obj = ObjItem.objects.filter(parameters_obj__id_satellite=sat_id)
vch_sigma = SigmaParameter.objects.filter(id_satellite=sat_id)
link_count = 0
obj_count = len(item_obj)
for idx, obj in enumerate(item_obj):
vch_load = obj.parameters_obj.get()
if vch_load.frequency == -1.0:
continue
for sigma in vch_sigma:
if (
abs(sigma.transfer_frequency - vch_load.frequency) <= eps_freq and
abs(sigma.freq_range - vch_load.freq_range) <= vch_load.freq_range*eps_frange/100 and
sigma.polarization == vch_load.polarization
):
sigma.parameter = vch_load
sigma.save()
link_count += 1
return obj_count, link_count
def kub_report(data_in: io.StringIO) -> pd.DataFrame:
df_in = pd.read_excel(data_in)
df = pd.DataFrame(columns=['Дата', 'Широта', 'Долгота',
'Высота', 'Населённый пункт', 'ИСЗ',
'Прямой канал, МГц', 'Обратный канал, МГц', 'Перенос, МГц', 'Полоса, МГц', 'Зеркала'])
for row in df_in.iterrows():
value = row[1]
date = datetime.date(datetime.now())
isz = value['ИСЗ']
try:
lat = float(value['Широта, град'].strip().replace(',', '.'))
lon = float(value['Долгота, град'].strip().replace(',', '.'))
downlink = float(value['Обратный канал, МГц'].strip().replace(',', '.'))
freq_range = float(value['Полоса, МГц'].strip().replace(',', '.'))
except Exception as e:
lat = value['Широта, град']
lon = value['Долгота, град']
downlink = value['Обратный канал, МГц']
freq_range = value['Полоса, МГц']
print(e)
norad = int(re.findall(r'\((\d+)\)', isz)[0])
sat_obj = Satellite.objects.get(norad=norad)
pol_obj = Polarization.objects.get(name=value['Поляризация'].strip())
transponder = Transponders.objects.filter(
sat_id=sat_obj,
polarization=pol_obj,
downlink__gte=downlink - F('frequency_range')/2,
downlink__lte=downlink + F('frequency_range')/2,
).first()
# try:
# location = geolocator.reverse(f"{lat}, {lon}", language="ru").raw['address']
# loc_name = location.get('city', '') or location.get('town', '') or location.get('province', '') or location.get('country', '')
# except AttributeError:
# loc_name = ''
# sleep(1)
loc_name = ''
if transponder: #and not (len(transponder) > 1):
transfer = transponder.transfer
uplink = transfer + downlink
new_row = pd.DataFrame([{'Дата': date,
'Широта': lat,
'Долгота': lon,
'Высота': 0.0,
'Населённый пункт': loc_name,
'ИСЗ': isz,
'Прямой канал, МГц': uplink,
'Обратный канал, МГц': downlink,
'Перенос, МГц': transfer,
'Полоса, МГц': freq_range,
'Зеркала': ''
}])
df = pd.concat([df, new_row], ignore_index=True)
else:
print("Ничего не найдено в транспондерах")
return df