Files
dbstorage/dbapp/mainapp/views.py

1273 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard library imports
from collections import defaultdict
from io import BytesIO
# Django imports
from django.utils import timezone
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import Paginator
from django.db import models
from django.db.models import F
from django.http import HttpResponse, JsonResponse
from django.shortcuts import redirect, render
from django.urls import reverse_lazy
from django.utils.decorators import method_decorator
from django.views import View
from django.views.generic import (
CreateView,
DeleteView,
FormView,
UpdateView,
)
# Third-party imports
import pandas as pd
# Local imports
from .clusters import get_clusters
from .forms import (
GeoForm,
LoadCsvData,
LoadExcelData,
NewEventForm,
ObjItemForm,
ParameterForm,
UploadFileForm,
UploadVchLoad,
VchLinkForm,
FillLyngsatDataForm,
LinkLyngsatForm,
)
from .mixins import CoordinateProcessingMixin, FormMessageMixin, RoleRequiredMixin
from .models import Geo, Modulation, ObjItem, Polarization, Satellite
from .utils import (
add_satellite_list,
compare_and_link_vch_load,
fill_data_from_df,
get_points_from_csv,
get_vch_load_from_html,
kub_report,
parse_pagination_params,
)
from mapsapp.utils import parse_transponders_from_xml
class AddSatellitesView(LoginRequiredMixin, View):
def get(self, request):
add_satellite_list()
return redirect("mainapp:home")
# class AddTranspondersView(View):
# def get(self, request):
# try:
# parse_transponders_from_json(BASE_DIR / "transponders.json")
# except FileNotFoundError:
# print("Файл не найден")
# return redirect('home')
class AddTranspondersView(LoginRequiredMixin, FormMessageMixin, FormView):
template_name = "mainapp/transponders_upload.html"
form_class = UploadFileForm
success_message = "Файл успешно обработан"
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
uploaded_file = self.request.FILES["file"]
try:
content = uploaded_file.read()
parse_transponders_from_xml(BytesIO(content))
except ValueError as e:
messages.error(self.request, f"Ошибка при чтении таблиц: {e}")
return redirect("mainapp:add_trans")
except Exception as e:
messages.error(self.request, f"Неизвестная ошибка: {e}")
return redirect("mainapp:add_trans")
return super().form_valid(form)
def get_success_url(self):
return reverse_lazy("mainapp:add_trans")
from django.views.generic import View
class ActionsPageView(View):
def get(self, request):
if request.user.is_authenticated:
return render(request, "mainapp/actions.html")
else:
return render(request, "mainapp/login_required.html")
class HomePageView(View):
def get(self, request):
if request.user.is_authenticated:
# Redirect to objitem list if authenticated
return redirect("mainapp:objitem_list")
else:
return render(request, "mainapp/login_required.html")
class LoadExcelDataView(LoginRequiredMixin, FormMessageMixin, FormView):
template_name = "mainapp/add_data_from_excel.html"
form_class = LoadExcelData
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
uploaded_file = self.request.FILES["file"]
selected_sat = form.cleaned_data["sat_choice"]
number = form.cleaned_data["number_input"]
try:
import io
df = pd.read_excel(io.BytesIO(uploaded_file.read()))
if number > 0:
df = df.head(number)
result = fill_data_from_df(df, selected_sat, self.request.user.customuser)
messages.success(
self.request, f"Данные успешно загружены! Обработано строк: {result}"
)
except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
return redirect("mainapp:load_excel_data")
def get_success_url(self):
return reverse_lazy("mainapp:load_excel_data")
class GetLocationsView(LoginRequiredMixin, View):
def get(self, request, sat_id):
locations = (
ObjItem.objects.filter(parameter_obj__id_satellite=sat_id)
.select_related(
"geo_obj",
"parameter_obj",
"parameter_obj__polarization",
)
)
if not locations.exists():
return JsonResponse({"error": "Объектов не найдено"}, status=404)
features = []
for loc in locations:
if not hasattr(loc, "geo_obj") or not loc.geo_obj or not loc.geo_obj.coords:
continue
param = getattr(loc, 'parameter_obj', None)
if not param:
continue
features.append(
{
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [loc.geo_obj.coords[0], loc.geo_obj.coords[1]],
},
"properties": {
"pol": param.polarization.name if param.polarization else "-",
"freq": param.frequency * 1000000 if param.frequency else 0,
"name": loc.name or "-",
"id": loc.geo_obj.id,
},
}
)
return JsonResponse({"type": "FeatureCollection", "features": features})
class LoadCsvDataView(LoginRequiredMixin, FormMessageMixin, FormView):
template_name = "mainapp/add_data_from_csv.html"
form_class = LoadCsvData
success_message = "Данные успешно загружены!"
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
uploaded_file = self.request.FILES["file"]
try:
content = uploaded_file.read()
if isinstance(content, bytes):
content = content.decode("utf-8")
get_points_from_csv(content, self.request.user.customuser)
except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
return redirect("mainapp:load_csv_data")
return super().form_valid(form)
def get_success_url(self):
return reverse_lazy("mainapp:load_csv_data")
@method_decorator(staff_member_required, name="dispatch")
class ShowMapView(RoleRequiredMixin, View):
required_roles = ["admin", "moderator"]
def get(self, request):
ids = request.GET.get("ids", "")
points = []
if ids:
id_list = [int(x) for x in ids.split(",") if x.isdigit()]
locations = ObjItem.objects.filter(id__in=id_list).select_related(
"parameter_obj",
"parameter_obj__id_satellite",
"parameter_obj__polarization",
"parameter_obj__modulation",
"parameter_obj__standard",
"geo_obj",
)
for obj in locations:
if (
not hasattr(obj, "geo_obj")
or not obj.geo_obj
or not obj.geo_obj.coords
):
continue
param = getattr(obj, 'parameter_obj', None)
if not param:
continue
points.append(
{
"name": f"{obj.name}",
"freq": f"{param.frequency} [{param.freq_range}] МГц",
"point": (obj.geo_obj.coords.x, obj.geo_obj.coords.y),
}
)
else:
return redirect("admin")
grouped = defaultdict(list)
for p in points:
grouped[p["name"]].append({"point": p["point"], "frequency": p["freq"]})
groups = [
{"name": name, "points": coords_list}
for name, coords_list in grouped.items()
]
context = {
"groups": groups,
}
return render(request, "admin/map_custom.html", context)
class ShowSelectedObjectsMapView(LoginRequiredMixin, View):
def get(self, request):
ids = request.GET.get("ids", "")
points = []
if ids:
id_list = [int(x) for x in ids.split(",") if x.isdigit()]
locations = ObjItem.objects.filter(id__in=id_list).select_related(
"parameter_obj",
"parameter_obj__id_satellite",
"parameter_obj__polarization",
"parameter_obj__modulation",
"parameter_obj__standard",
"geo_obj",
)
for obj in locations:
if (
not hasattr(obj, "geo_obj")
or not obj.geo_obj
or not obj.geo_obj.coords
):
continue
param = getattr(obj, 'parameter_obj', None)
if not param:
continue
points.append(
{
"name": f"{obj.name}",
"freq": f"{param.frequency} [{param.freq_range}] МГц",
"point": (obj.geo_obj.coords.x, obj.geo_obj.coords.y),
}
)
else:
return redirect("mainapp:objitem_list")
# Group points by object name
from collections import defaultdict
grouped = defaultdict(list)
for p in points:
grouped[p["name"]].append({"point": p["point"], "frequency": p["freq"]})
groups = [
{"name": name, "points": coords_list}
for name, coords_list in grouped.items()
]
context = {
"groups": groups,
}
return render(request, "mainapp/objitem_map.html", context)
class ClusterTestView(LoginRequiredMixin, View):
def get(self, request):
objs = ObjItem.objects.filter(
name__icontains="! Astra 4A 12654,040 [1,962] МГц H"
)
coords = []
for obj in objs:
if hasattr(obj, "geo_obj") and obj.geo_obj and obj.geo_obj.coords:
coords.append(
(obj.geo_obj.coords.coords[1], obj.geo_obj.coords.coords[0])
)
get_clusters(coords)
return JsonResponse({"success": "ок"})
def custom_logout(request):
logout(request)
return redirect("mainapp:home")
class UploadVchLoadView(LoginRequiredMixin, FormMessageMixin, FormView):
template_name = "mainapp/upload_html.html"
form_class = UploadVchLoad
success_message = "Файл успешно обработан"
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
selected_sat = form.cleaned_data["sat_choice"]
uploaded_file = self.request.FILES["file"]
try:
get_vch_load_from_html(uploaded_file, selected_sat)
except ValueError as e:
messages.error(self.request, f"Ошибка при чтении таблиц: {e}")
return redirect("mainapp:vch_load")
except Exception as e:
messages.error(self.request, f"Неизвестная ошибка: {e}")
return redirect("mainapp:vch_load")
return super().form_valid(form)
def get_success_url(self):
return reverse_lazy("mainapp:vch_load")
class LinkVchSigmaView(LoginRequiredMixin, FormView):
template_name = "mainapp/link_vch.html"
form_class = VchLinkForm
def form_valid(self, form):
freq = form.cleaned_data["value1"]
freq_range = form.cleaned_data["value2"]
# ku_range = float(form.cleaned_data['ku_range'])
sat_id = form.cleaned_data["sat_choice"]
# print(freq, freq_range, ku_range, sat_id.pk)
count_all, link_count = compare_and_link_vch_load(sat_id, freq, freq_range, 1)
messages.success(
self.request, f"Привязано {link_count} из {count_all} объектов"
)
return redirect("mainapp:link_vch_sigma")
def form_invalid(self, form):
return self.render_to_response(self.get_context_data(form=form))
class LinkLyngsatSourcesView(LoginRequiredMixin, FormMessageMixin, FormView):
"""Представление для привязки источников LyngSat к объектам"""
template_name = "mainapp/link_lyngsat.html"
form_class = LinkLyngsatForm
success_message = "Привязка источников LyngSat завершена"
error_message = "Ошибка при привязке источников"
def form_valid(self, form):
from lyngsatapp.models import LyngSat
satellites = form.cleaned_data.get("satellites")
frequency_tolerance = form.cleaned_data.get("frequency_tolerance", 0.5)
# Если спутники не выбраны, обрабатываем все
if satellites:
objitems = ObjItem.objects.filter(
parameter_obj__id_satellite__in=satellites
).select_related('parameter_obj', 'parameter_obj__polarization')
else:
objitems = ObjItem.objects.filter(
parameter_obj__isnull=False
).select_related('parameter_obj', 'parameter_obj__polarization')
linked_count = 0
total_count = objitems.count()
for objitem in objitems:
if not hasattr(objitem, 'parameter_obj') or not objitem.parameter_obj:
continue
param = objitem.parameter_obj
# Округляем частоту объекта
if param.frequency:
rounded_freq = round(param.frequency, 0) # Округление до целого
# Ищем подходящий источник LyngSat
# Сравниваем по округленной частоте и поляризации
lyngsat_sources = LyngSat.objects.filter(
id_satellite=param.id_satellite,
polarization=param.polarization,
frequency__gte=rounded_freq - frequency_tolerance,
frequency__lte=rounded_freq + frequency_tolerance
).order_by('frequency')
if lyngsat_sources.exists():
# Берем первый подходящий источник
objitem.lyngsat_source = lyngsat_sources.first()
objitem.save(update_fields=['lyngsat_source'])
linked_count += 1
messages.success(
self.request,
f"Привязано {linked_count} из {total_count} объектов к источникам LyngSat"
)
return redirect("mainapp:link_lyngsat")
def form_invalid(self, form):
return self.render_to_response(self.get_context_data(form=form))
class LyngsatDataAPIView(LoginRequiredMixin, View):
"""API для получения данных LyngSat источника"""
def get(self, request, lyngsat_id):
from lyngsatapp.models import LyngSat
try:
lyngsat = LyngSat.objects.select_related(
'id_satellite',
'polarization',
'modulation',
'standard'
).get(id=lyngsat_id)
# Форматируем дату с учетом локального времени
last_update_str = '-'
if lyngsat.last_update:
local_time = timezone.localtime(lyngsat.last_update)
last_update_str = local_time.strftime("%d.%m.%Y")
data = {
'id': lyngsat.id,
'satellite': lyngsat.id_satellite.name if lyngsat.id_satellite else '-',
'frequency': f"{lyngsat.frequency:.3f}" if lyngsat.frequency else '-',
'polarization': lyngsat.polarization.name if lyngsat.polarization else '-',
'modulation': lyngsat.modulation.name if lyngsat.modulation else '-',
'standard': lyngsat.standard.name if lyngsat.standard else '-',
'sym_velocity': f"{lyngsat.sym_velocity:.0f}" if lyngsat.sym_velocity else '-',
'fec': lyngsat.fec or '-',
'channel_info': lyngsat.channel_info or '-',
'last_update': last_update_str,
'url': lyngsat.url or None,
}
return JsonResponse(data)
except LyngSat.DoesNotExist:
return JsonResponse({'error': 'Источник LyngSat не найден'}, status=404)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
class ProcessKubsatView(LoginRequiredMixin, FormMessageMixin, FormView):
template_name = "mainapp/process_kubsat.html"
form_class = NewEventForm
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
uploaded_file = self.request.FILES["file"]
try:
content = uploaded_file.read()
df = kub_report(BytesIO(content))
output = BytesIO()
with pd.ExcelWriter(output, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name="Результат")
output.seek(0)
response = HttpResponse(
output.getvalue(),
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
response["Content-Disposition"] = (
'attachment; filename="kubsat_report.xlsx"'
)
messages.success(self.request, "Событие успешно обработано!")
return response
except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
return redirect("mainapp:kubsat_excel")
class DeleteSelectedObjectsView(RoleRequiredMixin, View):
required_roles = ["admin", "moderator"]
def post(self, request):
ids = request.POST.get("ids", "")
if not ids:
return JsonResponse({"error": "Нет ID для удаления"}, status=400)
try:
id_list = [int(x) for x in ids.split(",") if x.isdigit()]
deleted_count, _ = ObjItem.objects.filter(id__in=id_list).delete()
return JsonResponse(
{
"success": True,
"message": "Объект успешно удалён",
"deleted_count": deleted_count,
}
)
except Exception as e:
return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)
class ObjItemListView(LoginRequiredMixin, View):
def get(self, request):
satellites = (
Satellite.objects.filter(parameters__objitem__isnull=False)
.distinct()
.only("id", "name")
.order_by("name")
)
selected_sat_id = request.GET.get("satellite_id")
page_number, items_per_page = parse_pagination_params(request)
sort_param = request.GET.get("sort", "")
freq_min = request.GET.get("freq_min")
freq_max = request.GET.get("freq_max")
range_min = request.GET.get("range_min")
range_max = request.GET.get("range_max")
snr_min = request.GET.get("snr_min")
snr_max = request.GET.get("snr_max")
bod_min = request.GET.get("bod_min")
bod_max = request.GET.get("bod_max")
search_query = request.GET.get("search")
selected_modulations = request.GET.getlist("modulation")
selected_polarizations = request.GET.getlist("polarization")
selected_satellites = request.GET.getlist("satellite_id")
has_kupsat = request.GET.get("has_kupsat")
has_valid = request.GET.get("has_valid")
date_from = request.GET.get("date_from")
date_to = request.GET.get("date_to")
objects = ObjItem.objects.none()
if selected_satellites or selected_sat_id:
if selected_sat_id and not selected_satellites:
try:
selected_sat_id_single = int(selected_sat_id)
selected_satellites = [selected_sat_id_single]
except ValueError:
selected_satellites = []
if selected_satellites:
objects = (
ObjItem.objects.select_related(
"geo_obj",
"updated_by__user",
"created_by__user",
"lyngsat_source",
"parameter_obj",
"parameter_obj__id_satellite",
"parameter_obj__polarization",
"parameter_obj__modulation",
"parameter_obj__standard",
)
.filter(parameter_obj__id_satellite_id__in=selected_satellites)
)
else:
objects = ObjItem.objects.select_related(
"geo_obj",
"updated_by__user",
"created_by__user",
"lyngsat_source",
"parameter_obj",
"parameter_obj__id_satellite",
"parameter_obj__polarization",
"parameter_obj__modulation",
"parameter_obj__standard",
)
if freq_min is not None and freq_min.strip() != "":
try:
freq_min_val = float(freq_min)
objects = objects.filter(
parameter_obj__frequency__gte=freq_min_val
)
except ValueError:
pass
if freq_max is not None and freq_max.strip() != "":
try:
freq_max_val = float(freq_max)
objects = objects.filter(
parameter_obj__frequency__lte=freq_max_val
)
except ValueError:
pass
if range_min is not None and range_min.strip() != "":
try:
range_min_val = float(range_min)
objects = objects.filter(
parameter_obj__freq_range__gte=range_min_val
)
except ValueError:
pass
if range_max is not None and range_max.strip() != "":
try:
range_max_val = float(range_max)
objects = objects.filter(
parameter_obj__freq_range__lte=range_max_val
)
except ValueError:
pass
if snr_min is not None and snr_min.strip() != "":
try:
snr_min_val = float(snr_min)
objects = objects.filter(parameter_obj__snr__gte=snr_min_val)
except ValueError:
pass
if snr_max is not None and snr_max.strip() != "":
try:
snr_max_val = float(snr_max)
objects = objects.filter(parameter_obj__snr__lte=snr_max_val)
except ValueError:
pass
if bod_min is not None and bod_min.strip() != "":
try:
bod_min_val = float(bod_min)
objects = objects.filter(
parameter_obj__bod_velocity__gte=bod_min_val
)
except ValueError:
pass
if bod_max is not None and bod_max.strip() != "":
try:
bod_max_val = float(bod_max)
objects = objects.filter(
parameter_obj__bod_velocity__lte=bod_max_val
)
except ValueError:
pass
if selected_modulations:
objects = objects.filter(
parameter_obj__modulation__id__in=selected_modulations
)
if selected_polarizations:
objects = objects.filter(
parameter_obj__polarization__id__in=selected_polarizations
)
if has_kupsat == "1":
objects = objects.filter(geo_obj__coords_kupsat__isnull=False)
elif has_kupsat == "0":
objects = objects.filter(geo_obj__coords_kupsat__isnull=True)
if has_valid == "1":
objects = objects.filter(geo_obj__coords_valid__isnull=False)
elif has_valid == "0":
objects = objects.filter(geo_obj__coords_valid__isnull=True)
# Date filter for geo_obj timestamp
date_from = request.GET.get("date_from")
date_to = request.GET.get("date_to")
if date_from and date_from.strip():
try:
from datetime import datetime
date_from_obj = datetime.strptime(date_from, "%Y-%m-%d")
objects = objects.filter(geo_obj__timestamp__gte=date_from_obj)
except (ValueError, TypeError):
pass
if date_to and date_to.strip():
try:
from datetime import datetime, timedelta
date_to_obj = datetime.strptime(date_to, "%Y-%m-%d")
# Add one day to include the entire end date
date_to_obj = date_to_obj + timedelta(days=1)
objects = objects.filter(geo_obj__timestamp__lt=date_to_obj)
except (ValueError, TypeError):
pass
if search_query:
search_query = search_query.strip()
if search_query:
objects = objects.filter(
models.Q(name__icontains=search_query)
| models.Q(geo_obj__location__icontains=search_query)
)
else:
selected_sat_id = None
objects = objects.annotate(
first_param_freq=F("parameter_obj__frequency"),
first_param_range=F("parameter_obj__freq_range"),
first_param_snr=F("parameter_obj__snr"),
first_param_bod=F("parameter_obj__bod_velocity"),
first_param_sat_name=F("parameter_obj__id_satellite__name"),
first_param_pol_name=F("parameter_obj__polarization__name"),
first_param_mod_name=F("parameter_obj__modulation__name"),
)
valid_sort_fields = {
"name": "name",
"-name": "-name",
"updated_at": "updated_at",
"-updated_at": "-updated_at",
"created_at": "created_at",
"-created_at": "-created_at",
"updated_by": "updated_by__user__username",
"-updated_by": "-updated_by__user__username",
"created_by": "created_by__user__username",
"-created_by": "-created_by__user__username",
"geo_timestamp": "geo_obj__timestamp",
"-geo_timestamp": "-geo_obj__timestamp",
"frequency": "first_param_freq",
"-frequency": "-first_param_freq",
"freq_range": "first_param_range",
"-freq_range": "-first_param_range",
"snr": "first_param_snr",
"-snr": "-first_param_snr",
"bod_velocity": "first_param_bod",
"-bod_velocity": "-first_param_bod",
"satellite": "first_param_sat_name",
"-satellite": "-first_param_sat_name",
"polarization": "first_param_pol_name",
"-polarization": "-first_param_pol_name",
"modulation": "first_param_mod_name",
"-modulation": "-first_param_mod_name",
}
if sort_param in valid_sort_fields:
objects = objects.order_by(valid_sort_fields[sort_param])
paginator = Paginator(objects, items_per_page)
page_obj = paginator.get_page(page_number)
processed_objects = []
for obj in page_obj:
param = getattr(obj, 'parameter_obj', None)
geo_coords = "-"
geo_timestamp = "-"
geo_location = "-"
kupsat_coords = "-"
valid_coords = "-"
distance_geo_kup = "-"
distance_geo_valid = "-"
distance_kup_valid = "-"
if hasattr(obj, "geo_obj") and obj.geo_obj:
geo_timestamp = obj.geo_obj.timestamp
geo_location = obj.geo_obj.location
if obj.geo_obj.coords:
longitude = obj.geo_obj.coords.coords[0]
latitude = obj.geo_obj.coords.coords[1]
lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
geo_coords = f"{lat} {lon}"
if obj.geo_obj.coords_kupsat:
longitude = obj.geo_obj.coords_kupsat.coords[0]
latitude = obj.geo_obj.coords_kupsat.coords[1]
lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
kupsat_coords = f"{lat} {lon}"
elif obj.geo_obj.coords_kupsat is not None:
kupsat_coords = "-"
if obj.geo_obj.coords_valid:
longitude = obj.geo_obj.coords_valid.coords[0]
latitude = obj.geo_obj.coords_valid.coords[1]
lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
valid_coords = f"{lat} {lon}"
elif obj.geo_obj.coords_valid is not None:
valid_coords = "-"
if obj.geo_obj.distance_coords_kup is not None:
distance_geo_kup = f"{obj.geo_obj.distance_coords_kup:.3f}"
if obj.geo_obj.distance_coords_valid is not None:
distance_geo_valid = f"{obj.geo_obj.distance_coords_valid:.3f}"
if obj.geo_obj.distance_kup_valid is not None:
distance_kup_valid = f"{obj.geo_obj.distance_kup_valid:.3f}"
satellite_name = "-"
frequency = "-"
freq_range = "-"
polarization_name = "-"
bod_velocity = "-"
modulation_name = "-"
snr = "-"
standard_name = "-"
comment = "-"
is_average = "-"
if param:
if hasattr(param, "id_satellite") and param.id_satellite:
satellite_name = (
param.id_satellite.name
if hasattr(param.id_satellite, "name")
else "-"
)
frequency = (
f"{param.frequency:.3f}" if param.frequency is not None else "-"
)
freq_range = (
f"{param.freq_range:.3f}" if param.freq_range is not None else "-"
)
bod_velocity = (
f"{param.bod_velocity:.0f}"
if param.bod_velocity is not None
else "-"
)
snr = f"{param.snr:.0f}" if param.snr is not None else "-"
if hasattr(param, "polarization") and param.polarization:
polarization_name = (
param.polarization.name
if hasattr(param.polarization, "name")
else "-"
)
if hasattr(param, "modulation") and param.modulation:
modulation_name = (
param.modulation.name
if hasattr(param.modulation, "name")
else "-"
)
if hasattr(param, "standard") and param.standard:
standard_name = (
param.standard.name
if hasattr(param.standard, "name")
else "-"
)
if hasattr(obj, "geo_obj") and obj.geo_obj:
comment = obj.geo_obj.comment or "-"
is_average = "Да" if obj.geo_obj.is_average else "Нет" if obj.geo_obj.is_average is not None else "-"
# Check if LyngSat source is linked
source_type = "ТВ" if obj.lyngsat_source else "-"
processed_objects.append(
{
"id": obj.id,
"name": obj.name or "-",
"satellite_name": satellite_name,
"frequency": frequency,
"freq_range": freq_range,
"polarization": polarization_name,
"bod_velocity": bod_velocity,
"modulation": modulation_name,
"snr": snr,
"geo_timestamp": geo_timestamp,
"geo_location": geo_location,
"geo_coords": geo_coords,
"kupsat_coords": kupsat_coords,
"valid_coords": valid_coords,
"distance_geo_kup": distance_geo_kup,
"distance_geo_valid": distance_geo_valid,
"distance_kup_valid": distance_kup_valid,
"updated_by": obj.updated_by if obj.updated_by else "-",
"comment": comment,
"is_average": is_average,
"source_type": source_type,
"standard": standard_name,
"obj": obj,
}
)
modulations = Modulation.objects.all()
polarizations = Polarization.objects.all()
context = {
"satellites": satellites,
"selected_satellite_id": selected_sat_id,
"page_obj": page_obj,
"processed_objects": processed_objects,
"items_per_page": items_per_page,
"available_items_per_page": [50, 100, 500, 1000],
"freq_min": freq_min,
"freq_max": freq_max,
"range_min": range_min,
"range_max": range_max,
"snr_min": snr_min,
"snr_max": snr_max,
"bod_min": bod_min,
"bod_max": bod_max,
"search_query": search_query,
"selected_modulations": [
int(x) for x in selected_modulations if x.isdigit()
],
"selected_polarizations": [
int(x) for x in selected_polarizations if x.isdigit()
],
"selected_satellites": [int(x) for x in selected_satellites if x.isdigit()],
"has_kupsat": has_kupsat,
"has_valid": has_valid,
"date_from": date_from,
"date_to": date_to,
"modulations": modulations,
"polarizations": polarizations,
"full_width_page": True,
"sort": sort_param,
}
return render(request, "mainapp/objitem_list.html", context)
class ObjItemFormView(
RoleRequiredMixin, CoordinateProcessingMixin, FormMessageMixin, UpdateView
):
"""
Базовый класс для создания и редактирования ObjItem.
Содержит общую логику обработки форм, координат и параметров.
"""
model = ObjItem
form_class = ObjItemForm
template_name = "mainapp/objitem_form.html"
success_url = reverse_lazy("mainapp:home")
required_roles = ["admin", "moderator"]
def get_success_url(self):
"""Возвращает URL с сохраненными параметрами фильтров."""
# Получаем все параметры из GET запроса и сохраняем их в URL
if self.request.GET:
from urllib.parse import urlencode
query_string = urlencode(self.request.GET)
return reverse_lazy("mainapp:objitem_list") + '?' + query_string
return reverse_lazy("mainapp:objitem_list")
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["LEAFLET_CONFIG"] = {
"DEFAULT_CENTER": (55.75, 37.62),
"DEFAULT_ZOOM": 5,
}
# Сохраняем параметры возврата для кнопки "Назад"
context["return_params"] = self.request.GET.get('return_params', '')
# Работаем с одной формой параметра вместо formset
if self.object and hasattr(self.object, "parameter_obj") and self.object.parameter_obj:
context["parameter_form"] = ParameterForm(
instance=self.object.parameter_obj, prefix="parameter"
)
else:
context["parameter_form"] = ParameterForm(prefix="parameter")
if self.object and hasattr(self.object, "geo_obj") and self.object.geo_obj:
context["geo_form"] = GeoForm(
instance=self.object.geo_obj, prefix="geo"
)
else:
context["geo_form"] = GeoForm(prefix="geo")
return context
def form_valid(self, form):
# Получаем форму параметра
if self.object and hasattr(self.object, "parameter_obj") and self.object.parameter_obj:
parameter_form = ParameterForm(
self.request.POST,
instance=self.object.parameter_obj,
prefix="parameter"
)
else:
parameter_form = ParameterForm(self.request.POST, prefix="parameter")
if self.object and hasattr(self.object, "geo_obj") and self.object.geo_obj:
geo_form = GeoForm(self.request.POST, instance=self.object.geo_obj, prefix="geo")
else:
geo_form = GeoForm(self.request.POST, prefix="geo")
# Сохраняем основной объект
self.object = form.save(commit=False)
self.set_user_fields()
self.object.save()
# Сохраняем связанный параметр
if parameter_form.is_valid():
self.save_parameter(parameter_form)
else:
context = self.get_context_data()
context.update({
'form': form,
'parameter_form': parameter_form,
'geo_form': geo_form,
})
return self.render_to_response(context)
# Сохраняем геоданные
if geo_form.is_valid():
self.save_geo_data(geo_form)
else:
context = self.get_context_data()
context.update({
'form': form,
'parameter_form': parameter_form,
'geo_form': geo_form,
})
return self.render_to_response(context)
return super().form_valid(form)
def set_user_fields(self):
"""Устанавливает поля пользователя для объекта."""
raise NotImplementedError("Subclasses must implement set_user_fields()")
def save_parameter(self, parameter_form):
"""Сохраняет параметр объекта через OneToOne связь."""
if parameter_form.is_valid():
instance = parameter_form.save(commit=False)
instance.objitem = self.object
instance.save()
def save_geo_data(self, geo_form):
"""Сохраняет геоданные объекта."""
geo_instance = self.get_or_create_geo_instance()
# Обновляем поля из geo_form
if geo_form.is_valid():
geo_instance.location = geo_form.cleaned_data["location"]
geo_instance.comment = geo_form.cleaned_data["comment"]
geo_instance.is_average = geo_form.cleaned_data["is_average"]
# Обрабатываем координаты
self.process_coordinates(geo_instance)
# Обрабатываем дату/время
self.process_timestamp(geo_instance)
geo_instance.save()
def get_or_create_geo_instance(self):
"""Получает или создает экземпляр Geo."""
if hasattr(self.object, "geo_obj") and self.object.geo_obj:
return self.object.geo_obj
return Geo(objitem=self.object)
class ObjItemUpdateView(ObjItemFormView):
"""Представление для редактирования ObjItem."""
success_message = "Объект успешно сохранён!"
def set_user_fields(self):
self.object.updated_by = self.request.user.customuser
class ObjItemCreateView(ObjItemFormView, CreateView):
"""Представление для создания ObjItem."""
success_message = "Объект успешно создан!"
def set_user_fields(self):
self.object.created_by = self.request.user.customuser
self.object.updated_by = self.request.user.customuser
class ObjItemDeleteView(RoleRequiredMixin, FormMessageMixin, DeleteView):
model = ObjItem
template_name = "mainapp/objitem_confirm_delete.html"
success_url = reverse_lazy("mainapp:objitem_list")
success_message = "Объект успешно удалён!"
required_roles = ["admin", "moderator"]
def get_success_url(self):
"""Возвращает URL с сохраненными параметрами фильтров."""
# Получаем все параметры из GET запроса и сохраняем их в URL
if self.request.GET:
from urllib.parse import urlencode
query_string = urlencode(self.request.GET)
return reverse_lazy("mainapp:objitem_list") + '?' + query_string
return reverse_lazy("mainapp:objitem_list")
class ObjItemDetailView(LoginRequiredMixin, View):
"""
Представление для просмотра деталей ObjItem в режиме чтения.
Доступно для всех авторизованных пользователей, показывает данные в режиме чтения.
"""
def get(self, request, pk):
obj = ObjItem.objects.filter(pk=pk).select_related(
'geo_obj',
'updated_by__user',
'created_by__user',
'parameter_obj',
'parameter_obj__id_satellite',
'parameter_obj__polarization',
'parameter_obj__modulation',
'parameter_obj__standard',
).first()
if not obj:
from django.http import Http404
raise Http404("Объект не найден")
# Сохраняем параметры возврата для кнопки "Назад"
return_params = request.GET.get('return_params', '')
context = {
'object': obj,
'return_params': return_params
}
return render(request, "mainapp/objitem_detail.html", context)
class FillLyngsatDataView(LoginRequiredMixin, FormMessageMixin, FormView):
"""
Представление для заполнения данных из Lyngsat.
Позволяет выбрать спутники и регионы для парсинга данных с сайта Lyngsat.
Запускает асинхронную задачу Celery для обработки.
"""
template_name = "mainapp/fill_lyngsat_data.html"
form_class = FillLyngsatDataForm
success_url = reverse_lazy("mainapp:lyngsat_task_status")
error_message = "Форма заполнена некорректно"
def form_valid(self, form):
satellites = form.cleaned_data["satellites"]
regions = form.cleaned_data["regions"]
use_cache = form.cleaned_data.get("use_cache", True)
force_refresh = form.cleaned_data.get("force_refresh", False)
# Получаем названия спутников
target_sats = [sat.name for sat in satellites]
try:
from lyngsatapp.tasks import fill_lyngsat_data_task
# Запускаем асинхронную задачу с параметрами кеширования
task = fill_lyngsat_data_task.delay(
target_sats,
regions,
force_refresh=force_refresh,
use_cache=use_cache
)
cache_status = "без кеша" if not use_cache else ("с обновлением кеша" if force_refresh else "с кешированием")
messages.success(
self.request,
f"Задача запущена ({cache_status})! ID задачи: {task.id}. "
"Вы будете перенаправлены на страницу отслеживания прогресса."
)
# Перенаправляем на страницу статуса задачи
return redirect('mainapp:lyngsat_task_status', task_id=task.id)
except Exception as e:
messages.error(self.request, f"Ошибка при запуске задачи: {str(e)}")
return redirect("mainapp:fill_lyngsat_data")
class LyngsatTaskStatusView(LoginRequiredMixin, View):
"""
Представление для отслеживания статуса задачи заполнения данных Lyngsat.
"""
template_name = "mainapp/lyngsat_task_status.html"
def get(self, request, task_id=None):
context = {
'task_id': task_id
}
return render(request, self.template_name, context)
class LyngsatTaskStatusAPIView(LoginRequiredMixin, View):
"""
API для получения статуса задачи Celery.
"""
def get(self, request, task_id):
from celery.result import AsyncResult
from django.core.cache import cache
task = AsyncResult(task_id)
response_data = {
'task_id': task_id,
'state': task.state,
'result': None,
'error': None
}
if task.state == 'PENDING':
response_data['status'] = 'Задача в очереди...'
elif task.state == 'PROGRESS':
response_data['status'] = task.info.get('status', '')
response_data['current'] = task.info.get('current', 0)
response_data['total'] = task.info.get('total', 1)
response_data['percent'] = int((task.info.get('current', 0) / task.info.get('total', 1)) * 100)
elif task.state == 'SUCCESS':
# Получаем результат из кеша
result = cache.get(f'lyngsat_task_{task_id}')
if result:
response_data['result'] = result
response_data['status'] = 'Задача завершена успешно'
else:
response_data['result'] = task.result
response_data['status'] = 'Задача завершена'
elif task.state == 'FAILURE':
response_data['status'] = 'Ошибка при выполнении задачи'
response_data['error'] = str(task.info)
else:
response_data['status'] = task.state
return JsonResponse(response_data)
class ClearLyngsatCacheView(LoginRequiredMixin, View):
"""
Представление для очистки кеша LyngSat.
"""
def post(self, request):
from lyngsatapp.tasks import clear_cache_task
cache_type = request.POST.get('cache_type', 'all')
try:
# Запускаем задачу очистки кеша
task = clear_cache_task.delay(cache_type)
messages.success(
request,
f"Задача очистки кеша ({cache_type}) запущена! ID задачи: {task.id}"
)
except Exception as e:
messages.error(request, f"Ошибка при запуске задачи очистки кеша: {str(e)}")
return redirect(request.META.get('HTTP_REFERER', 'mainapp:home'))
def get(self, request):
"""Страница управления кешем"""
return render(request, 'mainapp/clear_lyngsat_cache.html')