Files
dbstorage/dbapp/mainapp/views/source.py

1143 lines
48 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Source related views.
"""
import json
from datetime import datetime
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.contrib.gis.geos import Point, Polygon as GEOSPolygon
from django.core.paginator import Paginator
from django.db.models import Count, Prefetch, Q
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views import View
from ..forms import SourceForm
from ..models import Source, Satellite
from ..utils import format_coords_display, parse_pagination_params
class SourceListView(LoginRequiredMixin, View):
"""
View for displaying a list of sources (Source).
"""
def get(self, request):
# Get pagination parameters
page_number, items_per_page = parse_pagination_params(request)
# Get sorting parameters (default to ID ascending)
sort_param = request.GET.get("sort", "id")
# Get filter parameters - Source level
search_query = request.GET.get("search", "").strip()
has_coords_average = request.GET.get("has_coords_average")
has_coords_kupsat = request.GET.get("has_coords_kupsat")
has_coords_valid = request.GET.get("has_coords_valid")
has_coords_reference = request.GET.get("has_coords_reference")
selected_info = request.GET.getlist("info_id")
selected_ownership = request.GET.getlist("ownership_id")
objitem_count_min = request.GET.get("objitem_count_min", "").strip()
objitem_count_max = request.GET.get("objitem_count_max", "").strip()
date_from = request.GET.get("date_from", "").strip()
date_to = request.GET.get("date_to", "").strip()
# Source request filters
has_requests = request.GET.get("has_requests")
selected_request_statuses = request.GET.getlist("request_status")
selected_request_priorities = request.GET.getlist("request_priority")
request_gso_success = request.GET.get("request_gso_success")
request_kubsat_success = request.GET.get("request_kubsat_success")
request_planned_from = request.GET.get("request_planned_from", "").strip()
request_planned_to = request.GET.get("request_planned_to", "").strip()
request_date_from = request.GET.get("request_date_from", "").strip()
request_date_to = request.GET.get("request_date_to", "").strip()
# Get filter parameters - ObjItem level (параметры точек)
geo_date_from = request.GET.get("geo_date_from", "").strip()
geo_date_to = request.GET.get("geo_date_to", "").strip()
selected_satellites = request.GET.getlist("satellite_id")
selected_polarizations = request.GET.getlist("polarization_id")
selected_modulations = request.GET.getlist("modulation_id")
selected_mirrors = request.GET.getlist("mirror_id")
freq_min = request.GET.get("freq_min", "").strip()
freq_max = request.GET.get("freq_max", "").strip()
freq_range_min = request.GET.get("freq_range_min", "").strip()
freq_range_max = request.GET.get("freq_range_max", "").strip()
bod_velocity_min = request.GET.get("bod_velocity_min", "").strip()
bod_velocity_max = request.GET.get("bod_velocity_max", "").strip()
snr_min = request.GET.get("snr_min", "").strip()
snr_max = request.GET.get("snr_max", "").strip()
# Get polygon filter
polygon_coords_str = request.GET.get("polygon", "").strip()
polygon_coords = None
polygon_geom = None
if polygon_coords_str:
try:
polygon_coords = json.loads(polygon_coords_str)
if polygon_coords and len(polygon_coords) >= 4:
# Create GEOS Polygon from coordinates
# Coordinates are in [lng, lat] format
polygon_geom = GEOSPolygon(polygon_coords, srid=4326)
except (json.JSONDecodeError, ValueError, TypeError) as e:
# Invalid polygon data, ignore
polygon_coords = None
polygon_geom = None
# Get all satellites for filter
satellites = (
Satellite.objects.filter(parameters__objitem__source__isnull=False)
.distinct()
.only("id", "name")
.order_by("name")
)
# Get all polarizations, modulations for filters
from ..models import Polarization, Modulation, ObjectInfo
polarizations = Polarization.objects.all().order_by("name")
modulations = Modulation.objects.all().order_by("name")
# Get all ObjectInfo for filter
object_infos = ObjectInfo.objects.all().order_by("name")
# Get all ObjectOwnership for filter
from ..models import ObjectOwnership
object_ownerships = ObjectOwnership.objects.all().order_by("name")
# Get all satellites that are used as mirrors
mirrors = (
Satellite.objects.filter(geo_mirrors__isnull=False)
.distinct()
.only("id", "name")
.order_by("name")
)
# Build Q object for filtering objitems in count
# This will be used in the annotate to count only objitems that match filters
objitem_filter_q = Q()
has_objitem_filter = False
# Check if search is by name (not by ID)
search_by_name = False
if search_query:
try:
int(search_query) # Try to parse as ID
except ValueError:
# Not a number, so it's a name search
search_by_name = True
objitem_filter_q &= Q(source_objitems__name__icontains=search_query)
has_objitem_filter = True
# Add geo date filter
if geo_date_from:
try:
geo_date_from_obj = datetime.strptime(geo_date_from, "%Y-%m-%d")
objitem_filter_q &= Q(source_objitems__geo_obj__timestamp__gte=geo_date_from_obj)
has_objitem_filter = True
except (ValueError, TypeError):
pass
if geo_date_to:
try:
from datetime import timedelta
geo_date_to_obj = datetime.strptime(geo_date_to, "%Y-%m-%d")
# Add one day to include entire end date
geo_date_to_obj = geo_date_to_obj + timedelta(days=1)
objitem_filter_q &= Q(source_objitems__geo_obj__timestamp__lt=geo_date_to_obj)
has_objitem_filter = True
except (ValueError, TypeError):
pass
# Add satellite filter to count
if selected_satellites:
objitem_filter_q &= Q(source_objitems__parameter_obj__id_satellite_id__in=selected_satellites)
has_objitem_filter = True
# Add polarization filter
if selected_polarizations:
objitem_filter_q &= Q(source_objitems__parameter_obj__polarization_id__in=selected_polarizations)
has_objitem_filter = True
# Add modulation filter
if selected_modulations:
objitem_filter_q &= Q(source_objitems__parameter_obj__modulation_id__in=selected_modulations)
has_objitem_filter = True
# Add frequency filter
if freq_min:
try:
freq_min_val = float(freq_min)
objitem_filter_q &= Q(source_objitems__parameter_obj__frequency__gte=freq_min_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
if freq_max:
try:
freq_max_val = float(freq_max)
objitem_filter_q &= Q(source_objitems__parameter_obj__frequency__lte=freq_max_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
# Add frequency range (bandwidth) filter
if freq_range_min:
try:
freq_range_min_val = float(freq_range_min)
objitem_filter_q &= Q(source_objitems__parameter_obj__freq_range__gte=freq_range_min_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
if freq_range_max:
try:
freq_range_max_val = float(freq_range_max)
objitem_filter_q &= Q(source_objitems__parameter_obj__freq_range__lte=freq_range_max_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
# Add symbol rate (bod_velocity) filter
if bod_velocity_min:
try:
bod_velocity_min_val = float(bod_velocity_min)
objitem_filter_q &= Q(source_objitems__parameter_obj__bod_velocity__gte=bod_velocity_min_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
if bod_velocity_max:
try:
bod_velocity_max_val = float(bod_velocity_max)
objitem_filter_q &= Q(source_objitems__parameter_obj__bod_velocity__lte=bod_velocity_max_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
# Add SNR filter
if snr_min:
try:
snr_min_val = float(snr_min)
objitem_filter_q &= Q(source_objitems__parameter_obj__snr__gte=snr_min_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
if snr_max:
try:
snr_max_val = float(snr_max)
objitem_filter_q &= Q(source_objitems__parameter_obj__snr__lte=snr_max_val)
has_objitem_filter = True
except (ValueError, TypeError):
pass
# Add mirrors filter
if selected_mirrors:
objitem_filter_q &= Q(source_objitems__geo_obj__mirrors__id__in=selected_mirrors)
has_objitem_filter = True
# Add polygon filter
if polygon_geom:
objitem_filter_q &= Q(source_objitems__geo_obj__coords__within=polygon_geom)
has_objitem_filter = True
# Build filtered objitems queryset for prefetch
from ..models import ObjItem
filtered_objitems_qs = ObjItem.objects.select_related(
'parameter_obj',
'parameter_obj__id_satellite',
'parameter_obj__polarization',
'parameter_obj__modulation',
'parameter_obj__standard',
'geo_obj',
'lyngsat_source',
'lyngsat_source__id_satellite',
'lyngsat_source__polarization',
'lyngsat_source__modulation',
'lyngsat_source__standard',
'transponder',
'created_by',
'created_by__user',
'updated_by',
'updated_by__user',
).prefetch_related(
'geo_obj__mirrors',
)
# Apply the same filters to prefetch queryset
if search_by_name:
filtered_objitems_qs = filtered_objitems_qs.filter(name__icontains=search_query)
if geo_date_from:
try:
geo_date_from_obj = datetime.strptime(geo_date_from, "%Y-%m-%d")
filtered_objitems_qs = filtered_objitems_qs.filter(geo_obj__timestamp__gte=geo_date_from_obj)
except (ValueError, TypeError):
pass
if geo_date_to:
try:
from datetime import timedelta
geo_date_to_obj = datetime.strptime(geo_date_to, "%Y-%m-%d")
geo_date_to_obj = geo_date_to_obj + timedelta(days=1)
filtered_objitems_qs = filtered_objitems_qs.filter(geo_obj__timestamp__lt=geo_date_to_obj)
except (ValueError, TypeError):
pass
if selected_satellites:
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__id_satellite_id__in=selected_satellites)
if selected_polarizations:
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__polarization_id__in=selected_polarizations)
if selected_modulations:
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__modulation_id__in=selected_modulations)
if freq_min:
try:
freq_min_val = float(freq_min)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__frequency__gte=freq_min_val)
except (ValueError, TypeError):
pass
if freq_max:
try:
freq_max_val = float(freq_max)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__frequency__lte=freq_max_val)
except (ValueError, TypeError):
pass
if freq_range_min:
try:
freq_range_min_val = float(freq_range_min)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__freq_range__gte=freq_range_min_val)
except (ValueError, TypeError):
pass
if freq_range_max:
try:
freq_range_max_val = float(freq_range_max)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__freq_range__lte=freq_range_max_val)
except (ValueError, TypeError):
pass
if bod_velocity_min:
try:
bod_velocity_min_val = float(bod_velocity_min)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__bod_velocity__gte=bod_velocity_min_val)
except (ValueError, TypeError):
pass
if bod_velocity_max:
try:
bod_velocity_max_val = float(bod_velocity_max)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__bod_velocity__lte=bod_velocity_max_val)
except (ValueError, TypeError):
pass
if snr_min:
try:
snr_min_val = float(snr_min)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__snr__gte=snr_min_val)
except (ValueError, TypeError):
pass
if snr_max:
try:
snr_max_val = float(snr_max)
filtered_objitems_qs = filtered_objitems_qs.filter(parameter_obj__snr__lte=snr_max_val)
except (ValueError, TypeError):
pass
if selected_mirrors:
filtered_objitems_qs = filtered_objitems_qs.filter(geo_obj__mirrors__id__in=selected_mirrors)
if polygon_geom:
filtered_objitems_qs = filtered_objitems_qs.filter(geo_obj__coords__within=polygon_geom)
# Get all Source objects with query optimization
# Using annotate to count ObjItems efficiently (single query with GROUP BY)
# Using select_related for ForeignKey/OneToOne relationships to avoid N+1 queries
# Using Prefetch with filtered queryset to avoid N+1 queries in display loop
sources = Source.objects.select_related(
'info', # ForeignKey to ObjectInfo
'created_by', # ForeignKey to CustomUser
'created_by__user', # OneToOne to User
'updated_by', # ForeignKey to CustomUser
'updated_by__user', # OneToOne to User
).prefetch_related(
# Use Prefetch with filtered queryset
Prefetch('source_objitems', queryset=filtered_objitems_qs, to_attr='filtered_objitems'),
).annotate(
# Use annotate for efficient counting in a single query
objitem_count=Count('source_objitems', filter=objitem_filter_q, distinct=True) if has_objitem_filter else Count('source_objitems')
)
# Apply filters
# Filter by coords_average presence
if has_coords_average == "1":
sources = sources.filter(coords_average__isnull=False)
elif has_coords_average == "0":
sources = sources.filter(coords_average__isnull=True)
# Filter by coords_kupsat presence
if has_coords_kupsat == "1":
sources = sources.filter(coords_kupsat__isnull=False)
elif has_coords_kupsat == "0":
sources = sources.filter(coords_kupsat__isnull=True)
# Filter by coords_valid presence
if has_coords_valid == "1":
sources = sources.filter(coords_valid__isnull=False)
elif has_coords_valid == "0":
sources = sources.filter(coords_valid__isnull=True)
# Filter by coords_reference presence
if has_coords_reference == "1":
sources = sources.filter(coords_reference__isnull=False)
elif has_coords_reference == "0":
sources = sources.filter(coords_reference__isnull=True)
# Filter by ObjectInfo (info field)
if selected_info:
sources = sources.filter(info_id__in=selected_info)
# Filter by ObjectOwnership (ownership field)
if selected_ownership:
sources = sources.filter(ownership_id__in=selected_ownership)
# NOTE: Фильтры по отметкам сигналов удалены, т.к. ObjectMark теперь связан с TechAnalyze, а не с Source
# Для фильтрации по отметкам используйте страницу "Отметки сигналов"
# Filter by source requests
if has_requests == "1":
# Has requests - apply subfilters
from ..models import SourceRequest
from django.db.models import Exists, OuterRef
# Build subquery for filtering requests
request_subquery = SourceRequest.objects.filter(source=OuterRef('pk'))
# Filter by request status
if selected_request_statuses:
request_subquery = request_subquery.filter(status__in=selected_request_statuses)
# Filter by request priority
if selected_request_priorities:
request_subquery = request_subquery.filter(priority__in=selected_request_priorities)
# Filter by GSO success
if request_gso_success == "true":
request_subquery = request_subquery.filter(gso_success=True)
elif request_gso_success == "false":
request_subquery = request_subquery.filter(gso_success=False)
# Filter by Kubsat success
if request_kubsat_success == "true":
request_subquery = request_subquery.filter(kubsat_success=True)
elif request_kubsat_success == "false":
request_subquery = request_subquery.filter(kubsat_success=False)
# Filter by planned date range
if request_planned_from:
try:
planned_from_obj = datetime.strptime(request_planned_from, "%Y-%m-%d")
request_subquery = request_subquery.filter(planned_at__gte=planned_from_obj)
except (ValueError, TypeError):
pass
if request_planned_to:
try:
from datetime import timedelta
planned_to_obj = datetime.strptime(request_planned_to, "%Y-%m-%d")
planned_to_obj = planned_to_obj + timedelta(days=1)
request_subquery = request_subquery.filter(planned_at__lt=planned_to_obj)
except (ValueError, TypeError):
pass
# Filter by request date range
if request_date_from:
try:
req_date_from_obj = datetime.strptime(request_date_from, "%Y-%m-%d")
request_subquery = request_subquery.filter(request_date__gte=req_date_from_obj)
except (ValueError, TypeError):
pass
if request_date_to:
try:
req_date_to_obj = datetime.strptime(request_date_to, "%Y-%m-%d")
request_subquery = request_subquery.filter(request_date__lte=req_date_to_obj)
except (ValueError, TypeError):
pass
# Apply the subquery filter using Exists
sources = sources.filter(Exists(request_subquery))
elif has_requests == "0":
# No requests
sources = sources.filter(source_requests__isnull=True)
# Filter by ObjItem count
if objitem_count_min:
try:
min_count = int(objitem_count_min)
sources = sources.filter(objitem_count__gte=min_count)
except ValueError:
pass
if objitem_count_max:
try:
max_count = int(objitem_count_max)
sources = sources.filter(objitem_count__lte=max_count)
except ValueError:
pass
# Filter by creation date range
if date_from:
try:
date_from_obj = datetime.strptime(date_from, "%Y-%m-%d")
sources = sources.filter(created_at__gte=date_from_obj)
except (ValueError, TypeError):
pass
if date_to:
try:
from datetime import timedelta
date_to_obj = datetime.strptime(date_to, "%Y-%m-%d")
# Add one day to include entire end date
date_to_obj = date_to_obj + timedelta(days=1)
sources = sources.filter(created_at__lt=date_to_obj)
except (ValueError, TypeError):
pass
# Filter by Geo timestamp range (only filter sources that have matching objitems)
if geo_date_from or geo_date_to:
geo_filter_q = Q()
if geo_date_from:
try:
geo_date_from_obj = datetime.strptime(geo_date_from, "%Y-%m-%d")
geo_filter_q &= Q(source_objitems__geo_obj__timestamp__gte=geo_date_from_obj)
except (ValueError, TypeError):
pass
if geo_date_to:
try:
from datetime import timedelta
geo_date_to_obj = datetime.strptime(geo_date_to, "%Y-%m-%d")
geo_date_to_obj = geo_date_to_obj + timedelta(days=1)
geo_filter_q &= Q(source_objitems__geo_obj__timestamp__lt=geo_date_to_obj)
except (ValueError, TypeError):
pass
if geo_filter_q:
sources = sources.filter(geo_filter_q).distinct()
# Search by ID or name
if search_query:
try:
# Try to search by ID first
search_id = int(search_query)
sources = sources.filter(id=search_id)
except ValueError:
# If not a number, search by name in related objitems
sources = sources.filter(
source_objitems__name__icontains=search_query
).distinct()
# Filter by satellites
if selected_satellites:
sources = sources.filter(
source_objitems__parameter_obj__id_satellite_id__in=selected_satellites
).distinct()
# Filter by polarizations
if selected_polarizations:
sources = sources.filter(
source_objitems__parameter_obj__polarization_id__in=selected_polarizations
).distinct()
# Filter by modulations
if selected_modulations:
sources = sources.filter(
source_objitems__parameter_obj__modulation_id__in=selected_modulations
).distinct()
# Filter by frequency range
if freq_min:
try:
freq_min_val = float(freq_min)
sources = sources.filter(source_objitems__parameter_obj__frequency__gte=freq_min_val).distinct()
except (ValueError, TypeError):
pass
if freq_max:
try:
freq_max_val = float(freq_max)
sources = sources.filter(source_objitems__parameter_obj__frequency__lte=freq_max_val).distinct()
except (ValueError, TypeError):
pass
# Filter by frequency range (bandwidth)
if freq_range_min:
try:
freq_range_min_val = float(freq_range_min)
sources = sources.filter(source_objitems__parameter_obj__freq_range__gte=freq_range_min_val).distinct()
except (ValueError, TypeError):
pass
if freq_range_max:
try:
freq_range_max_val = float(freq_range_max)
sources = sources.filter(source_objitems__parameter_obj__freq_range__lte=freq_range_max_val).distinct()
except (ValueError, TypeError):
pass
# Filter by symbol rate
if bod_velocity_min:
try:
bod_velocity_min_val = float(bod_velocity_min)
sources = sources.filter(source_objitems__parameter_obj__bod_velocity__gte=bod_velocity_min_val).distinct()
except (ValueError, TypeError):
pass
if bod_velocity_max:
try:
bod_velocity_max_val = float(bod_velocity_max)
sources = sources.filter(source_objitems__parameter_obj__bod_velocity__lte=bod_velocity_max_val).distinct()
except (ValueError, TypeError):
pass
# Filter by SNR
if snr_min:
try:
snr_min_val = float(snr_min)
sources = sources.filter(source_objitems__parameter_obj__snr__gte=snr_min_val).distinct()
except (ValueError, TypeError):
pass
if snr_max:
try:
snr_max_val = float(snr_max)
sources = sources.filter(source_objitems__parameter_obj__snr__lte=snr_max_val).distinct()
except (ValueError, TypeError):
pass
# Filter by mirrors
if selected_mirrors:
sources = sources.filter(
source_objitems__geo_obj__mirrors__id__in=selected_mirrors
).distinct()
# Filter by polygon
if polygon_geom:
sources = sources.filter(
source_objitems__geo_obj__coords__within=polygon_geom
).distinct()
# Apply sorting
valid_sort_fields = {
"id": "id",
"-id": "-id",
"created_at": "created_at",
"-created_at": "-created_at",
"updated_at": "updated_at",
"-updated_at": "-updated_at",
"objitem_count": "objitem_count",
"-objitem_count": "-objitem_count",
}
if sort_param in valid_sort_fields:
sources = sources.order_by(valid_sort_fields[sort_param])
# Create paginator
paginator = Paginator(sources, items_per_page)
page_obj = paginator.get_page(page_number)
# Prepare data for display
processed_sources = []
for source in page_obj:
# Format coordinates
coords_average_str = format_coords_display(source.coords_average)
coords_kupsat_str = format_coords_display(source.coords_kupsat)
coords_valid_str = format_coords_display(source.coords_valid)
coords_reference_str = format_coords_display(source.coords_reference)
# Use pre-filtered objitems from Prefetch
objitems_to_display = source.filtered_objitems
# Use annotated count (consistent with filtering)
objitem_count = source.objitem_count
# Get satellites, name and check for LyngSat
satellite_names = set()
satellite_ids = set()
has_lyngsat = False
lyngsat_id = None
source_name = None
for objitem in objitems_to_display:
# Get name from first objitem
if source_name is None and objitem.name:
source_name = objitem.name
if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj:
if hasattr(objitem.parameter_obj, 'id_satellite') and objitem.parameter_obj.id_satellite:
satellite_names.add(objitem.parameter_obj.id_satellite.name)
satellite_ids.add(objitem.parameter_obj.id_satellite.id)
# Check if any objitem has LyngSat
if hasattr(objitem, 'lyngsat_source') and objitem.lyngsat_source:
has_lyngsat = True
lyngsat_id = objitem.lyngsat_source.id
satellite_str = ", ".join(sorted(satellite_names)) if satellite_names else "-"
# Get first satellite ID for modal link (if multiple satellites, use first one)
first_satellite_id = min(satellite_ids) if satellite_ids else None
# Отметки теперь привязаны к TechAnalyze, а не к Source
marks_data = []
# Get info name and ownership
info_name = source.info.name if source.info else '-'
ownership_name = source.ownership.name if source.ownership else '-'
processed_sources.append({
'id': source.id,
'name': source_name if source_name else '-',
'info': info_name,
'ownership': ownership_name,
'coords_average': coords_average_str,
'coords_kupsat': coords_kupsat_str,
'coords_valid': coords_valid_str,
'coords_reference': coords_reference_str,
'objitem_count': objitem_count,
'satellite': satellite_str,
'satellite_id': first_satellite_id,
'created_at': source.created_at,
'updated_at': source.updated_at,
'confirm_at': source.confirm_at,
'last_signal_at': source.last_signal_at,
'has_lyngsat': has_lyngsat,
'lyngsat_id': lyngsat_id,
'marks': marks_data,
'note': source.note if source.note else '-'
})
# Prepare context for template
context = {
'page_obj': page_obj,
'processed_sources': processed_sources,
'items_per_page': items_per_page,
'available_items_per_page': [50, 100, 500, 1000],
'sort': sort_param,
'search_query': search_query,
# Source-level filters
'has_coords_average': has_coords_average,
'has_coords_kupsat': has_coords_kupsat,
'has_coords_valid': has_coords_valid,
'has_coords_reference': has_coords_reference,
'selected_info': [
int(x) if isinstance(x, str) else x for x in selected_info if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'selected_ownership': [
int(x) if isinstance(x, str) else x for x in selected_ownership if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'objitem_count_min': objitem_count_min,
'objitem_count_max': objitem_count_max,
'date_from': date_from,
'date_to': date_to,
# Source request filters
'has_requests': has_requests,
'selected_request_statuses': selected_request_statuses,
'selected_request_priorities': selected_request_priorities,
'request_gso_success': request_gso_success,
'request_kubsat_success': request_kubsat_success,
'request_planned_from': request_planned_from,
'request_planned_to': request_planned_to,
'request_date_from': request_date_from,
'request_date_to': request_date_to,
# ObjItem-level filters
'geo_date_from': geo_date_from,
'geo_date_to': geo_date_to,
'object_infos': object_infos,
'object_ownerships': object_ownerships,
'satellites': satellites,
'selected_satellites': [
int(x) if isinstance(x, str) else x for x in selected_satellites if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'polarizations': polarizations,
'selected_polarizations': [
int(x) if isinstance(x, str) else x for x in selected_polarizations if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'modulations': modulations,
'selected_modulations': [
int(x) if isinstance(x, str) else x for x in selected_modulations if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'freq_min': freq_min,
'freq_max': freq_max,
'freq_range_min': freq_range_min,
'freq_range_max': freq_range_max,
'bod_velocity_min': bod_velocity_min,
'bod_velocity_max': bod_velocity_max,
'snr_min': snr_min,
'snr_max': snr_max,
'mirrors': mirrors,
'selected_mirrors': [
int(x) if isinstance(x, str) else x for x in selected_mirrors if (isinstance(x, int) or (isinstance(x, str) and x.isdigit()))
],
'object_infos': object_infos,
'polygon_coords': json.dumps(polygon_coords) if polygon_coords else None,
'full_width_page': True,
}
return render(request, "mainapp/source_list.html", context)
class AdminModeratorMixin(UserPassesTestMixin):
"""Mixin to restrict access to admin and moderator roles only."""
def test_func(self):
return (
self.request.user.is_authenticated and
hasattr(self.request.user, 'customuser') and
self.request.user.customuser.role in ['admin', 'moderator']
)
def handle_no_permission(self):
messages.error(self.request, 'У вас нет прав для выполнения этого действия.')
return redirect('mainapp:source_list')
class SourceCreateView(LoginRequiredMixin, AdminModeratorMixin, View):
"""View for creating new Source."""
def get(self, request):
form = SourceForm()
context = {
'object': None,
'form': form,
'objitems': [],
'full_width_page': True,
}
return render(request, 'mainapp/source_form.html', context)
def post(self, request):
form = SourceForm(request.POST)
if form.is_valid():
source = form.save(commit=False)
# Set created_by and updated_by to current user
if hasattr(request.user, 'customuser'):
source.created_by = request.user.customuser
source.updated_by = request.user.customuser
source.save()
messages.success(request, f'Источник #{source.id} успешно создан.')
# Redirect to edit page
return redirect('mainapp:source_update', pk=source.id)
# If form is invalid, re-render with errors
context = {
'object': None,
'form': form,
'objitems': [],
'full_width_page': True,
}
return render(request, 'mainapp/source_form.html', context)
class SourceUpdateView(LoginRequiredMixin, AdminModeratorMixin, View):
"""View for editing Source with 4 coordinate fields and related ObjItems."""
def get(self, request, pk):
source = get_object_or_404(Source, pk=pk)
form = SourceForm(instance=source)
form.fields['average_latitude'].disabled = True
form.fields['average_longitude'].disabled = True
# Get related ObjItems ordered by creation date
objitems = source.source_objitems.select_related(
'parameter_obj',
'parameter_obj__id_satellite',
'parameter_obj__polarization',
'parameter_obj__modulation',
'parameter_obj__standard',
'geo_obj',
'created_by__user',
'updated_by__user'
).order_by('created_at')
context = {
'object': source,
'form': form,
'objitems': objitems,
'full_width_page': True,
}
return render(request, 'mainapp/source_form.html', context)
def post(self, request, pk):
source = get_object_or_404(Source, pk=pk)
form = SourceForm(request.POST, instance=source)
if form.is_valid():
source = form.save(commit=False)
# Set updated_by to current user
if hasattr(request.user, 'customuser'):
source.updated_by = request.user.customuser
source.save()
messages.success(request, f'Источник #{source.id} успешно обновлен.')
# Redirect back with query params if present
if request.GET.urlencode():
return redirect(f"{reverse('mainapp:source_update', args=[source.id])}?{request.GET.urlencode()}")
return redirect('mainapp:source_update', pk=source.id)
# If form is invalid, re-render with errors
objitems = source.source_objitems.select_related(
'parameter_obj',
'parameter_obj__id_satellite',
'parameter_obj__polarization',
'parameter_obj__modulation',
'parameter_obj__standard',
'geo_obj',
'created_by__user',
'updated_by__user'
).order_by('created_at')
context = {
'object': source,
'form': form,
'objitems': objitems,
'full_width_page': True,
}
return render(request, 'mainapp/source_form.html', context)
class SourceDeleteView(LoginRequiredMixin, AdminModeratorMixin, View):
"""View for deleting Source."""
def get(self, request, pk):
source = get_object_or_404(Source, pk=pk)
context = {
'object': source,
'objitems_count': source.source_objitems.count(),
}
return render(request, 'mainapp/source_confirm_delete.html', context)
def post(self, request, pk):
source = get_object_or_404(Source, pk=pk)
source_id = source.id
try:
source.delete()
messages.success(request, f'Источник #{source_id} успешно удален.')
except Exception as e:
messages.error(request, f'Ошибка при удалении источника: {str(e)}')
return redirect('mainapp:source_update', pk=pk)
# Redirect to source list
if request.GET.urlencode():
return redirect(f"{reverse('mainapp:source_list')}?{request.GET.urlencode()}")
return redirect('mainapp:source_list')
class DeleteSelectedSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
"""View for deleting multiple selected sources with confirmation."""
def get(self, request):
"""Show confirmation page with details about sources to be deleted."""
ids = request.GET.get("ids", "")
if not ids:
messages.error(request, "Не выбраны источники для удаления")
return redirect('mainapp:source_list')
try:
id_list = [int(x) for x in ids.split(",") if x.isdigit()]
sources = Source.objects.filter(id__in=id_list).prefetch_related(
'source_objitems',
'source_objitems__parameter_obj',
'source_objitems__parameter_obj__id_satellite',
'source_objitems__geo_obj'
).annotate(
objitem_count=Count('source_objitems')
)
# Prepare detailed information about sources
sources_info = []
total_objitems = 0
for source in sources:
# Get satellites for this source
satellite_names = set()
for objitem in source.source_objitems.all():
if hasattr(objitem, 'parameter_obj') and objitem.parameter_obj:
if hasattr(objitem.parameter_obj, 'id_satellite') and objitem.parameter_obj.id_satellite:
satellite_names.add(objitem.parameter_obj.id_satellite.name)
objitem_count = source.objitem_count
total_objitems += objitem_count
sources_info.append({
'id': source.id,
'objitem_count': objitem_count,
'satellites': ", ".join(sorted(satellite_names)) if satellite_names else "-",
})
context = {
'sources_info': sources_info,
'total_sources': len(sources_info),
'total_objitems': total_objitems,
'ids': ids,
}
return render(request, 'mainapp/source_bulk_delete_confirm.html', context)
except Exception as e:
messages.error(request, f'Ошибка при подготовке удаления: {str(e)}')
return redirect('mainapp:source_list')
def post(self, request):
"""Actually delete the selected sources."""
ids = request.POST.get("ids", "")
if not ids:
return JsonResponse({"error": "Нет ID для удаления"}, status=400)
try:
id_list = [int(x) for x in ids.split(",") if x.isdigit()]
# Get count before deletion
sources = Source.objects.filter(id__in=id_list)
deleted_sources_count = sources.count()
# Delete sources (cascade will delete related objitems)
sources.delete()
messages.success(
request,
f'Успешно удалено источников: {deleted_sources_count}'
)
return JsonResponse({
"success": True,
"message": f"Успешно удалено источников: {deleted_sources_count}",
"deleted_count": deleted_sources_count,
})
except Exception as e:
return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)
class MergeSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
"""View for merging multiple sources into one."""
def post(self, request):
"""Merge selected sources into the first one."""
try:
# Parse JSON body
import json
data = json.loads(request.body)
source_ids = data.get('source_ids', [])
info_id = data.get('info_id')
ownership_id = data.get('ownership_id')
note = data.get('note', '')
# Validate input
if not source_ids or len(source_ids) < 2:
return JsonResponse({
'success': False,
'error': 'Необходимо выбрать минимум 2 источника для объединения'
}, status=400)
if not info_id:
return JsonResponse({
'success': False,
'error': 'Необходимо выбрать тип объекта'
}, status=400)
if not ownership_id:
return JsonResponse({
'success': False,
'error': 'Необходимо выбрать принадлежность объекта'
}, status=400)
# Get all sources
sources = Source.objects.filter(id__in=source_ids).order_by('id')
if sources.count() != len(source_ids):
return JsonResponse({
'success': False,
'error': 'Некоторые источники не найдены'
}, status=404)
# First source is the target
target_source = sources.first()
sources_to_merge = sources.exclude(id=target_source.id)
# Get ObjectInfo and ObjectOwnership
from ..models import ObjectInfo, ObjectOwnership
try:
info = ObjectInfo.objects.get(id=info_id)
ownership = ObjectOwnership.objects.get(id=ownership_id)
except (ObjectInfo.DoesNotExist, ObjectOwnership.DoesNotExist):
return JsonResponse({
'success': False,
'error': 'Тип объекта или принадлежность не найдены'
}, status=404)
# Start transaction
from django.db import transaction
with transaction.atomic():
# Update target source
target_source.info = info
target_source.ownership = ownership
target_source.note = note
if hasattr(request.user, 'customuser'):
target_source.updated_by = request.user.customuser
target_source.save()
# Move all ObjItems from sources_to_merge to target_source
total_moved = 0
for source in sources_to_merge:
# Get all objitems for this source
objitems = source.source_objitems.all()
objitem_count = objitems.count()
# Update source field for all objitems
objitems.update(source=target_source)
total_moved += objitem_count
# Recalculate coords_average for target source
target_source._recalculate_average_coords()
target_source.update_confirm_at()
target_source.save()
# Delete sources_to_merge (without cascade deleting objitems since we moved them)
# We need to delete marks first (they have CASCADE)
from ..models import ObjectMark
ObjectMark.objects.filter(source__in=sources_to_merge).delete()
# Now delete the sources
deleted_count = sources_to_merge.count()
sources_to_merge.delete()
return JsonResponse({
'success': True,
'message': f'Успешно объединено {deleted_count + 1} источников. Перемещено {total_moved} точек в источник #{target_source.id}',
'target_source_id': target_source.id,
'moved_objitems': total_moved,
'deleted_sources': deleted_count
})
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Неверный формат данных'
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': f'Ошибка при объединении источников: {str(e)}'
}, status=500)