983 lines
40 KiB
Python
983 lines
40 KiB
Python
"""
|
||
Source related views.
|
||
"""
|
||
import json
|
||
from datetime import datetime
|
||
|
||
from django.contrib import messages
|
||
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
|
||
from django.contrib.gis.geos import Point, Polygon as GEOSPolygon
|
||
from django.core.paginator import Paginator
|
||
from django.db.models import Count, Prefetch, Q
|
||
from django.http import JsonResponse
|
||
from django.shortcuts import get_object_or_404, redirect, render
|
||
from django.urls import reverse
|
||
from django.views import View
|
||
|
||
from ..forms import SourceForm
|
||
from ..models import Source, Satellite
|
||
from ..utils import format_coords_display, parse_pagination_params
|
||
|
||
|
||
class SourceListView(LoginRequiredMixin, View):
    """
    List view for Source objects with filtering, sorting and pagination.

    Filters are read from the query string and fall into two groups:

    * Source-level filters: coordinate presence, info, ownership, signal
      marks, ObjItem count, creation date and search by numeric ID.
    * ObjItem-level filters (point parameters): name search, geo timestamp,
      satellite, polarization, modulation, frequency, bandwidth, symbol
      rate, SNR, mirrors and polygon.

    The ObjItem-level conditions are collected once and reused for the
    annotated ``objitem_count``, for the prefetched ``filtered_objitems`` and
    for restricting the list of sources, so all three always agree.
    Malformed filter values are ignored instead of raising.
    """

    # Query parameters read as stripped strings and echoed back to the template.
    _TEXT_PARAMS = (
        "objitem_count_min",
        "objitem_count_max",
        "date_from",
        "date_to",
        "mark_date_from",
        "mark_date_to",
        "geo_date_from",
        "geo_date_to",
        "freq_min",
        "freq_max",
        "freq_range_min",
        "freq_range_max",
        "bod_velocity_min",
        "bod_velocity_max",
        "snr_min",
        "snr_max",
    )

    # Query parameters compared against "1" / "0"; absent values stay None.
    _FLAG_PARAMS = (
        "has_coords_average",
        "has_coords_kupsat",
        "has_coords_valid",
        "has_coords_reference",
        "has_signal_mark",
    )

    # Source coordinate fields that have a matching "has_<field>" flag.
    _COORD_FIELDS = (
        "coords_average",
        "coords_kupsat",
        "coords_valid",
        "coords_reference",
    )

    # (query parameter, ObjItem lookup) pairs for the numeric range filters.
    _FLOAT_RANGE_LOOKUPS = (
        ("freq_min", "parameter_obj__frequency__gte"),
        ("freq_max", "parameter_obj__frequency__lte"),
        ("freq_range_min", "parameter_obj__freq_range__gte"),
        ("freq_range_max", "parameter_obj__freq_range__lte"),
        ("bod_velocity_min", "parameter_obj__bod_velocity__gte"),
        ("bod_velocity_max", "parameter_obj__bod_velocity__lte"),
        ("snr_min", "parameter_obj__snr__gte"),
        ("snr_max", "parameter_obj__snr__lte"),
    )

    # Values of the "sort" parameter that are passed to order_by() as-is.
    _SORT_FIELDS = frozenset({
        "id",
        "-id",
        "created_at",
        "-created_at",
        "updated_at",
        "-updated_at",
        "objitem_count",
        "-objitem_count",
    })

    @staticmethod
    def _parse_day(value, exclusive_end=False):
        """
        Parse a ``YYYY-MM-DD`` string into a datetime.

        With ``exclusive_end=True`` one day is added, so the result can be
        used as an exclusive upper bound that covers the whole end date.
        Returns None for empty or malformed input.
        """
        from datetime import timedelta

        if not value:
            return None
        try:
            parsed = datetime.strptime(value, "%Y-%m-%d")
            if exclusive_end:
                parsed += timedelta(days=1)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: adding a day to the last representable date.
            return None
        return parsed

    @staticmethod
    def _parse_number(value, cast):
        """Convert ``value`` with ``cast`` (int or float); None if empty or invalid."""
        if not value:
            return None
        try:
            return cast(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _selected_ids(values):
        """Return the integer IDs found in ``values``, skipping non-numeric entries."""
        ids = []
        for raw in values:
            try:
                ids.append(int(raw))
            except (ValueError, TypeError):
                continue
        return ids

    @staticmethod
    def _parse_polygon(raw):
        """
        Parse the ``polygon`` query parameter.

        ``raw`` is expected to be a JSON list of ``[lng, lat]`` pairs. Returns
        ``(coords, geom)``: the decoded coordinates and a GEOS polygon in
        SRID 4326. ``geom`` is None when fewer than 4 points are given; both
        are None when the value is empty or cannot be turned into a polygon.
        """
        from django.contrib.gis.geos import GEOSException

        if not raw:
            return None, None
        try:
            coords = json.loads(raw)
            geom = None
            if coords and len(coords) >= 4:
                geom = GEOSPolygon(coords, srid=4326)
        except (json.JSONDecodeError, ValueError, TypeError, GEOSException):
            # Invalid polygon data (bad JSON, wrong shape, unclosed ring): ignore.
            return None, None
        return coords, geom

    @staticmethod
    def _build_row(source):
        """Flatten one Source (with prefetched relations) into a dict for the template."""
        satellite_names = set()
        satellite_ids = set()
        has_lyngsat = False
        lyngsat_id = None
        source_name = None

        # Only the ObjItems that passed the active filters are considered.
        for objitem in source.filtered_objitems:
            # The displayed name is the first non-empty ObjItem name.
            if source_name is None and objitem.name:
                source_name = objitem.name

            # getattr with a default also covers a missing related object.
            parameter = getattr(objitem, 'parameter_obj', None)
            satellite = getattr(parameter, 'id_satellite', None) if parameter else None
            if satellite:
                satellite_names.add(satellite.name)
                satellite_ids.add(satellite.id)

            lyngsat = getattr(objitem, 'lyngsat_source', None)
            if lyngsat:
                has_lyngsat = True
                lyngsat_id = lyngsat.id

        marks_data = [
            {
                'mark': mark.mark,
                'timestamp': mark.timestamp,
                'created_by': str(mark.created_by) if mark.created_by else '-',
            }
            for mark in source.marks.all()
        ]

        return {
            'id': source.id,
            'name': source_name if source_name else '-',
            'info': source.info.name if source.info else '-',
            'ownership': source.ownership.name if source.ownership else '-',
            'coords_average': format_coords_display(source.coords_average),
            'coords_kupsat': format_coords_display(source.coords_kupsat),
            'coords_valid': format_coords_display(source.coords_valid),
            'coords_reference': format_coords_display(source.coords_reference),
            # Annotated count, consistent with the active ObjItem filters.
            'objitem_count': source.objitem_count,
            'satellite': ", ".join(sorted(satellite_names)) if satellite_names else "-",
            # With several satellites the smallest ID is used for the modal link.
            'satellite_id': min(satellite_ids) if satellite_ids else None,
            'created_at': source.created_at,
            'updated_at': source.updated_at,
            'confirm_at': source.confirm_at,
            'last_signal_at': source.last_signal_at,
            'has_lyngsat': has_lyngsat,
            'lyngsat_id': lyngsat_id,
            'marks': marks_data,
            'note': source.note if source.note else '-',
        }

    def get(self, request):
        """Render the source list for the filters given in the query string."""
        from ..models import Modulation, ObjectInfo, ObjectOwnership, ObjItem, Polarization

        query = request.GET

        # --- Read request parameters -------------------------------------
        page_number, items_per_page = parse_pagination_params(request)
        sort_param = query.get("sort", "id")
        search_query = query.get("search", "").strip()
        text = {name: query.get(name, "").strip() for name in self._TEXT_PARAMS}
        flags = {name: query.get(name) for name in self._FLAG_PARAMS}

        selected_info = self._selected_ids(query.getlist("info_id"))
        selected_ownership = self._selected_ids(query.getlist("ownership_id"))
        selected_satellites = self._selected_ids(query.getlist("satellite_id"))
        selected_polarizations = self._selected_ids(query.getlist("polarization_id"))
        selected_modulations = self._selected_ids(query.getlist("modulation_id"))
        selected_mirrors = self._selected_ids(query.getlist("mirror_id"))

        polygon_coords, polygon_geom = self._parse_polygon(query.get("polygon", "").strip())

        # --- Choices shown in the filter form ----------------------------
        satellites = (
            Satellite.objects.filter(parameters__objitem__source__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )
        polarizations = Polarization.objects.all().order_by("name")
        modulations = Modulation.objects.all().order_by("name")
        object_infos = ObjectInfo.objects.all().order_by("name")
        object_ownerships = ObjectOwnership.objects.all().order_by("name")
        # Satellites that are used as mirrors.
        mirrors = (
            Satellite.objects.filter(geo_mirrors__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )

        # --- ObjItem-level conditions, expressed as ObjItem lookups ------
        objitem_lookups = {}

        # A numeric search term is a Source ID; anything else is matched
        # against ObjItem names.
        search_id = None
        if search_query:
            try:
                search_id = int(search_query)
            except ValueError:
                objitem_lookups["name__icontains"] = search_query

        geo_from = self._parse_day(text["geo_date_from"])
        if geo_from is not None:
            objitem_lookups["geo_obj__timestamp__gte"] = geo_from
        geo_to = self._parse_day(text["geo_date_to"], exclusive_end=True)
        if geo_to is not None:
            objitem_lookups["geo_obj__timestamp__lt"] = geo_to

        if selected_satellites:
            objitem_lookups["parameter_obj__id_satellite_id__in"] = selected_satellites
        if selected_polarizations:
            objitem_lookups["parameter_obj__polarization_id__in"] = selected_polarizations
        if selected_modulations:
            objitem_lookups["parameter_obj__modulation_id__in"] = selected_modulations

        for param_name, lookup in self._FLOAT_RANGE_LOOKUPS:
            bound = self._parse_number(text[param_name], float)
            if bound is not None:
                objitem_lookups[lookup] = bound

        if selected_mirrors:
            objitem_lookups["geo_obj__mirrors__id__in"] = selected_mirrors
        if polygon_geom:
            objitem_lookups["geo_obj__coords__within"] = polygon_geom

        # The same conditions seen from Source, through the reverse relation.
        objitem_filter_q = Q(**{
            f"source_objitems__{lookup}": value
            for lookup, value in objitem_lookups.items()
        })

        # --- ObjItems to prefetch for display ----------------------------
        filtered_objitems_qs = ObjItem.objects.select_related(
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__polarization',
            'parameter_obj__modulation',
            'parameter_obj__standard',
            'geo_obj',
            'lyngsat_source',
            'lyngsat_source__id_satellite',
            'lyngsat_source__polarization',
            'lyngsat_source__modulation',
            'lyngsat_source__standard',
            'transponder',
            'created_by',
            'created_by__user',
            'updated_by',
            'updated_by__user',
        ).prefetch_related(
            'geo_obj__mirrors',
        )
        if objitem_lookups:
            filtered_objitems_qs = filtered_objitems_qs.filter(**objitem_lookups)

        # --- Base Source queryset ----------------------------------------
        sources = Source.objects.select_related(
            'info',
            'ownership',  # read for every row in _build_row
            'created_by',
            'created_by__user',
            'updated_by',
            'updated_by__user',
        ).prefetch_related(
            Prefetch('source_objitems', queryset=filtered_objitems_qs, to_attr='filtered_objitems'),
            'marks',
            'marks__created_by',
            'marks__created_by__user',
        ).annotate(
            # distinct=True is required: the filters below add further joins
            # on multi-valued relations (marks, source_objitems), which would
            # otherwise multiply the counted rows. An empty Q is falsy, so no
            # count filter is passed when there are no ObjItem conditions.
            objitem_count=Count(
                'source_objitems',
                filter=objitem_filter_q or None,
                distinct=True,
            )
        )

        # --- Source-level filters ----------------------------------------
        for field in self._COORD_FIELDS:
            flag = flags[f"has_{field}"]
            if flag == "1":
                sources = sources.filter(**{f"{field}__isnull": False})
            elif flag == "0":
                sources = sources.filter(**{f"{field}__isnull": True})

        if selected_info:
            sources = sources.filter(info_id__in=selected_info)
        if selected_ownership:
            sources = sources.filter(ownership_id__in=selected_ownership)

        # Signal marks: all mark conditions go into one filter() call so they
        # apply to the same mark.
        mark_filter_q = Q()
        if flags["has_signal_mark"] == "1":
            mark_filter_q &= Q(marks__mark=True)
        elif flags["has_signal_mark"] == "0":
            mark_filter_q &= Q(marks__mark=False)
        mark_from = self._parse_day(text["mark_date_from"])
        if mark_from is not None:
            mark_filter_q &= Q(marks__timestamp__gte=mark_from)
        mark_to = self._parse_day(text["mark_date_to"], exclusive_end=True)
        if mark_to is not None:
            mark_filter_q &= Q(marks__timestamp__lt=mark_to)
        if mark_filter_q:
            sources = sources.filter(mark_filter_q).distinct()

        min_count = self._parse_number(text["objitem_count_min"], int)
        if min_count is not None:
            sources = sources.filter(objitem_count__gte=min_count)
        max_count = self._parse_number(text["objitem_count_max"], int)
        if max_count is not None:
            sources = sources.filter(objitem_count__lte=max_count)

        created_from = self._parse_day(text["date_from"])
        if created_from is not None:
            sources = sources.filter(created_at__gte=created_from)
        created_to = self._parse_day(text["date_to"], exclusive_end=True)
        if created_to is not None:
            sources = sources.filter(created_at__lt=created_to)

        if search_id is not None:
            sources = sources.filter(id=search_id)

        # ObjItem-level conditions are applied in a single filter() call, so
        # a source is listed only if one of its ObjItems satisfies all of
        # them - the same rule used for objitem_count and filtered_objitems.
        if objitem_lookups:
            sources = sources.filter(objitem_filter_q).distinct()

        # --- Sorting and pagination --------------------------------------
        if sort_param in self._SORT_FIELDS:
            sources = sources.order_by(sort_param)

        paginator = Paginator(sources, items_per_page)
        page_obj = paginator.get_page(page_number)

        # --- Template context --------------------------------------------
        context = {
            'page_obj': page_obj,
            'processed_sources': [self._build_row(source) for source in page_obj],
            'items_per_page': items_per_page,
            'available_items_per_page': [50, 100, 500, 1000],
            'sort': sort_param,
            'search_query': search_query,
            # Raw flag and text filter values, echoed back under their
            # query-parameter names.
            **flags,
            **text,
            # Choices and current selections for the multi-select filters.
            'object_infos': object_infos,
            'selected_info': selected_info,
            'object_ownerships': object_ownerships,
            'selected_ownership': selected_ownership,
            'satellites': satellites,
            'selected_satellites': selected_satellites,
            'polarizations': polarizations,
            'selected_polarizations': selected_polarizations,
            'modulations': modulations,
            'selected_modulations': selected_modulations,
            'mirrors': mirrors,
            'selected_mirrors': selected_mirrors,
            'polygon_coords': json.dumps(polygon_coords) if polygon_coords else None,
            'full_width_page': True,
        }

        return render(request, "mainapp/source_list.html", context)
|
||
|
||
|
||
|
||
class AdminModeratorMixin(UserPassesTestMixin):
    """Allow access only to users whose ``customuser.role`` is admin or moderator."""

    def test_func(self):
        """Return True when the current user may perform the action."""
        user = self.request.user
        if not user.is_authenticated:
            return False
        # Users without a related customuser record have no role at all.
        if not hasattr(user, 'customuser'):
            return False
        return user.customuser.role in ['admin', 'moderator']

    def handle_no_permission(self):
        """Show an error message and send the user back to the source list."""
        messages.error(self.request, 'У вас нет прав для выполнения этого действия.')
        return redirect('mainapp:source_list')
|
||
|
||
|
||
class SourceCreateView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Create a new Source through SourceForm."""

    @staticmethod
    def _render_form(request, form):
        """Render the source form page for a source that does not exist yet."""
        return render(request, 'mainapp/source_form.html', {
            'object': None,
            'form': form,
            'objitems': [],
            'full_width_page': True,
        })

    def get(self, request):
        """Show an empty creation form."""
        return self._render_form(request, SourceForm())

    def post(self, request):
        """Validate the submitted form and create the Source."""
        form = SourceForm(request.POST)

        # Invalid input: show the same page again, now with form errors.
        if not form.is_valid():
            return self._render_form(request, form)

        source = form.save(commit=False)
        # Record the current user as both creator and last editor.
        if hasattr(request.user, 'customuser'):
            author = request.user.customuser
            source.created_by = author
            source.updated_by = author
        source.save()

        messages.success(request, f'Источник #{source.id} успешно создан.')

        # Continue on the edit page of the newly created source.
        return redirect('mainapp:source_update', pk=source.id)
|
||
|
||
|
||
class SourceUpdateView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Edit a Source and list its related ObjItems on the same page."""

    @staticmethod
    def _related_objitems(source):
        """Return the source's ObjItems, oldest first, with related rows joined in."""
        related = (
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__polarization',
            'parameter_obj__modulation',
            'parameter_obj__standard',
            'geo_obj',
            'created_by__user',
            'updated_by__user',
        )
        return source.source_objitems.select_related(*related).order_by('created_at')

    def _render_form(self, request, source, form):
        """Render the source form page for an existing source."""
        return render(request, 'mainapp/source_form.html', {
            'object': source,
            'form': form,
            'objitems': self._related_objitems(source),
            'full_width_page': True,
        })

    def get(self, request, pk):
        """Show the edit form with the average coordinate fields disabled."""
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(instance=source)
        # NOTE(review): these fields are disabled only here; the form built in
        # post() does not set the flag - confirm SourceForm accounts for that.
        for field_name in ('average_latitude', 'average_longitude'):
            form.fields[field_name].disabled = True

        return self._render_form(request, source, form)

    def post(self, request, pk):
        """Validate the submitted form and save the changes to the Source."""
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(request.POST, instance=source)

        # Invalid input: show the same page again, now with form errors.
        if not form.is_valid():
            return self._render_form(request, source, form)

        source = form.save(commit=False)
        # Record the current user as the last editor.
        if hasattr(request.user, 'customuser'):
            source.updated_by = request.user.customuser
        source.save()

        messages.success(request, f'Источник #{source.id} успешно обновлен.')

        # Keep any query parameters of the current URL on the redirect.
        query_string = request.GET.urlencode()
        if query_string:
            return redirect(f"{reverse('mainapp:source_update', args=[source.id])}?{query_string}")
        return redirect('mainapp:source_update', pk=source.id)
|
||
|
||
|
||
class SourceDeleteView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Delete a single Source after a confirmation page."""

    def get(self, request, pk):
        """Show the confirmation page with the number of related ObjItems."""
        source = get_object_or_404(Source, pk=pk)
        objitems_count = source.source_objitems.count()

        return render(
            request,
            'mainapp/source_confirm_delete.html',
            {'object': source, 'objitems_count': objitems_count},
        )

    def post(self, request, pk):
        """Delete the Source and return to the list, or back to the form on error."""
        source = get_object_or_404(Source, pk=pk)
        # Remember the ID for the success message before the row is deleted.
        source_id = source.id

        try:
            source.delete()
            messages.success(request, f'Источник #{source_id} успешно удален.')
        except Exception as e:
            # Report the failure to the user and return to the edit page.
            messages.error(request, f'Ошибка при удалении источника: {str(e)}')
            return redirect('mainapp:source_update', pk=pk)

        # Back to the list, keeping any query parameters of the current URL.
        destination = reverse('mainapp:source_list')
        query_string = request.GET.urlencode()
        if query_string:
            destination = f"{destination}?{query_string}"
        return redirect(destination)
|
||
|
||
|
||
class DeleteSelectedSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
    """
    Delete several selected sources after a confirmation page.

    The selection is passed as a comma-separated ``ids`` value: in the query
    string for the confirmation page (GET) and in the form data for the
    actual deletion (POST). Entries that are not plain decimal numbers are
    skipped; a request without any valid ID is rejected.
    """

    @staticmethod
    def _parse_ids(raw):
        """Return the integer IDs in a comma-separated string, skipping invalid entries."""
        ids = []
        for token in raw.split(","):
            token = token.strip()
            # isdecimal() accepts only characters that int() can convert.
            if not token.isdecimal():
                continue
            try:
                ids.append(int(token))
            except ValueError:
                # e.g. a digit string longer than the interpreter's int limit
                continue
        return ids

    def get(self, request):
        """Show confirmation page with details about sources to be deleted."""
        ids = request.GET.get("ids", "")
        id_list = self._parse_ids(ids)
        if not id_list:
            messages.error(request, "Не выбраны источники для удаления")
            return redirect('mainapp:source_list')

        try:
            sources = Source.objects.filter(id__in=id_list).prefetch_related(
                # Prefetches every level of the path down to the satellite.
                'source_objitems__parameter_obj__id_satellite',
            ).annotate(
                objitem_count=Count('source_objitems')
            )

            # Per-source summary: ObjItem count and satellite names.
            sources_info = []
            total_objitems = 0

            for source in sources:
                satellite_names = set()
                for objitem in source.source_objitems.all():
                    # getattr with a default also covers a missing related object.
                    parameter = getattr(objitem, 'parameter_obj', None)
                    satellite = getattr(parameter, 'id_satellite', None) if parameter else None
                    if satellite:
                        satellite_names.add(satellite.name)

                total_objitems += source.objitem_count
                sources_info.append({
                    'id': source.id,
                    'objitem_count': source.objitem_count,
                    'satellites': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                })

            context = {
                'sources_info': sources_info,
                'total_sources': len(sources_info),
                'total_objitems': total_objitems,
                # The original string is passed through unchanged.
                'ids': ids,
            }

            return render(request, 'mainapp/source_bulk_delete_confirm.html', context)

        except Exception as e:
            # Report the failure to the user and return to the list.
            messages.error(request, f'Ошибка при подготовке удаления: {str(e)}')
            return redirect('mainapp:source_list')

    def post(self, request):
        """Actually delete the selected sources and report the result as JSON."""
        id_list = self._parse_ids(request.POST.get("ids", ""))
        if not id_list:
            return JsonResponse({"error": "Нет ID для удаления"}, status=400)

        try:
            sources = Source.objects.filter(id__in=id_list)
            # Count first: the number of matched sources is what gets reported.
            deleted_sources_count = sources.count()

            # Delete sources (cascade will delete related objitems)
            sources.delete()

            messages.success(
                request,
                f'Успешно удалено источников: {deleted_sources_count}'
            )

            return JsonResponse({
                "success": True,
                "message": f"Успешно удалено источников: {deleted_sources_count}",
                "deleted_count": deleted_sources_count,
            })

        except Exception as e:
            return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)
|