Files
dbstorage/dbapp/mainapp/views/source.py

859 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Source related views.
"""
from datetime import datetime
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.core.paginator import Paginator
from django.db.models import Count, Q
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views import View
from ..forms import SourceForm
from ..models import Source, Satellite
from ..utils import format_coords_display, parse_pagination_params
class SourceListView(LoginRequiredMixin, View):
    """
    View for displaying a paginated, filterable list of sources (Source).

    All filters are read from GET parameters. Malformed numeric, date or ID
    values are ignored instead of raising. ObjItem-level filters are parsed
    once and then reused in three places so they cannot drift apart:

    * the ``objitem_count`` annotation (counts only matching ObjItems),
    * the Source queryset (keeps only sources that have matching ObjItems),
    * the per-source ObjItem selection used to derive the displayed name,
      satellites and LyngSat link.
    """

    # Values accepted from the ``sort`` GET parameter; passed to order_by() verbatim.
    _VALID_SORT_FIELDS = frozenset({
        "id", "-id",
        "created_at", "-created_at",
        "updated_at", "-updated_at",
        "objitem_count", "-objitem_count",
    })

    # (GET parameter / context key, lookup relative to ObjItem) for float range filters.
    _NUMERIC_RANGE_FILTERS = (
        ("freq_min", "parameter_obj__frequency__gte"),
        ("freq_max", "parameter_obj__frequency__lte"),
        ("freq_range_min", "parameter_obj__freq_range__gte"),
        ("freq_range_max", "parameter_obj__freq_range__lte"),
        ("bod_velocity_min", "parameter_obj__bod_velocity__gte"),
        ("bod_velocity_max", "parameter_obj__bod_velocity__lte"),
        ("snr_min", "parameter_obj__snr__gte"),
        ("snr_max", "parameter_obj__snr__lte"),
    )

    @staticmethod
    def _parse_day(raw, end_of_range=False):
        """Parse a ``YYYY-MM-DD`` string into a datetime, or return None if unusable.

        With ``end_of_range=True`` one day is added so that a ``__lt`` comparison
        against the result includes the whole end date.
        """
        from datetime import timedelta

        if not raw:
            return None
        try:
            day = datetime.strptime(raw, "%Y-%m-%d")
            if end_of_range:
                # 9999-12-31 + 1 day overflows; treat it like any other bad value.
                day += timedelta(days=1)
        except (ValueError, TypeError, OverflowError):
            return None
        return day

    @staticmethod
    def _parse_float(raw):
        """Return ``float(raw)``, or None when ``raw`` is empty or not a number."""
        if not raw:
            return None
        try:
            return float(raw)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _parse_int(raw):
        """Return ``int(raw)``, or None when ``raw`` is empty or not an integer."""
        if not raw:
            return None
        try:
            return int(raw)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _parse_id_list(raw_values):
        """Convert submitted ID strings to ints, dropping anything int() rejects."""
        ids = []
        for raw in raw_values:
            try:
                ids.append(int(raw))
            except (ValueError, TypeError):
                continue
        return ids

    def get(self, request):
        """Render the source list for the filters, sorting and page in ``request.GET``."""
        # Local import kept from the original implementation.
        from ..models import Polarization, Modulation, ObjectInfo

        params = request.GET

        def text_param(name):
            return params.get(name, "").strip()

        # Pagination and sorting (sorting defaults to ID ascending)
        page_number, items_per_page = parse_pagination_params(request)
        sort_param = params.get("sort", "id")

        # Source-level filter parameters
        search_query = text_param("search")
        has_coords_average = params.get("has_coords_average")
        has_coords_kupsat = params.get("has_coords_kupsat")
        has_coords_valid = params.get("has_coords_valid")
        has_coords_reference = params.get("has_coords_reference")
        has_lyngsat = params.get("has_lyngsat")
        selected_info = self._parse_id_list(params.getlist("info_id"))
        objitem_count_min = text_param("objitem_count_min")
        objitem_count_max = text_param("objitem_count_max")
        date_from = text_param("date_from")
        date_to = text_param("date_to")

        # Signal mark filters
        has_signal_mark = params.get("has_signal_mark")
        mark_date_from = text_param("mark_date_from")
        mark_date_to = text_param("mark_date_to")

        # ObjItem-level filter parameters (point parameters)
        geo_date_from = text_param("geo_date_from")
        geo_date_to = text_param("geo_date_to")
        selected_satellites = self._parse_id_list(params.getlist("satellite_id"))
        selected_polarizations = self._parse_id_list(params.getlist("polarization_id"))
        selected_modulations = self._parse_id_list(params.getlist("modulation_id"))
        selected_mirrors = self._parse_id_list(params.getlist("mirror_id"))
        # Raw strings are echoed back to the template under the same names.
        numeric_raw = {name: text_param(name) for name, _ in self._NUMERIC_RANGE_FILTERS}

        # Choices for the filter widgets
        satellites = (
            Satellite.objects.filter(parameters__objitem__source__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )
        polarizations = Polarization.objects.all().order_by("name")
        modulations = Modulation.objects.all().order_by("name")
        object_infos = ObjectInfo.objects.all().order_by("name")
        mirrors = (
            Satellite.objects.filter(geo_mirrors__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )

        # A purely numeric search is an ID lookup; anything else is a name search.
        search_id = None
        search_by_name = False
        if search_query:
            try:
                search_id = int(search_query)
            except ValueError:
                search_by_name = True

        # ObjItem-level conditions as (lookup relative to ObjItem, value) pairs.
        # Geo date bounds are kept separate because they are applied to the
        # Source queryset in a single filter() call (same ObjItem must satisfy
        # both bounds), whereas every other condition gets its own call.
        geo_conditions = []
        geo_from = self._parse_day(geo_date_from)
        if geo_from is not None:
            geo_conditions.append(("geo_obj__timestamp__gte", geo_from))
        geo_to = self._parse_day(geo_date_to, end_of_range=True)
        if geo_to is not None:
            geo_conditions.append(("geo_obj__timestamp__lt", geo_to))

        other_conditions = []
        if selected_satellites:
            other_conditions.append(("parameter_obj__id_satellite_id__in", selected_satellites))
        if selected_polarizations:
            other_conditions.append(("parameter_obj__polarization_id__in", selected_polarizations))
        if selected_modulations:
            other_conditions.append(("parameter_obj__modulation_id__in", selected_modulations))
        for param_name, lookup in self._NUMERIC_RANGE_FILTERS:
            number = self._parse_float(numeric_raw[param_name])
            if number is not None:
                other_conditions.append((lookup, number))
        if selected_mirrors:
            other_conditions.append(("geo_obj__mirrors__id__in", selected_mirrors))

        objitem_conditions = geo_conditions + other_conditions
        if search_by_name:
            objitem_conditions.append(("name__icontains", search_query))

        # Count only the ObjItems that match the active ObjItem-level filters.
        count_filter = None
        if objitem_conditions:
            count_filter = Q()
            for lookup, value in objitem_conditions:
                count_filter &= Q(**{f"source_objitems__{lookup}": value})

        # distinct=True is always required here: the filter() calls below add
        # further joins on multi-valued relations (ObjItems, marks), and those
        # joins would otherwise multiply the counted rows.
        sources = Source.objects.select_related(
            'info'
        ).prefetch_related(
            'source_objitems',
            'source_objitems__parameter_obj',
            'source_objitems__parameter_obj__id_satellite',
            'source_objitems__geo_obj',
            'marks',
            'marks__created_by__user'
        ).annotate(
            objitem_count=Count('source_objitems', filter=count_filter, distinct=True)
        )

        # Coordinate presence: "1" = must be set, "0" = must be empty.
        coords_presence = (
            ("coords_average", has_coords_average),
            ("coords_kupsat", has_coords_kupsat),
            ("coords_valid", has_coords_valid),
            ("coords_reference", has_coords_reference),
        )
        for field_name, flag in coords_presence:
            if flag == "1":
                sources = sources.filter(**{f"{field_name}__isnull": False})
            elif flag == "0":
                sources = sources.filter(**{f"{field_name}__isnull": True})

        # LyngSat presence
        if has_lyngsat == "1":
            sources = sources.filter(source_objitems__lyngsat_source__isnull=False).distinct()
        elif has_lyngsat == "0":
            sources = sources.filter(
                ~Q(source_objitems__lyngsat_source__isnull=False)
            ).distinct()

        # ObjectInfo (info field)
        if selected_info:
            sources = sources.filter(info_id__in=selected_info)

        # Signal marks: value and date range must hold for the same mark,
        # so they are combined into one filter() call.
        mark_filter_q = Q()
        if has_signal_mark == "1":
            mark_filter_q &= Q(marks__mark=True)
        elif has_signal_mark == "0":
            mark_filter_q &= Q(marks__mark=False)
        mark_from = self._parse_day(mark_date_from)
        if mark_from is not None:
            mark_filter_q &= Q(marks__timestamp__gte=mark_from)
        mark_to = self._parse_day(mark_date_to, end_of_range=True)
        if mark_to is not None:
            mark_filter_q &= Q(marks__timestamp__lt=mark_to)
        if mark_filter_q:
            sources = sources.filter(mark_filter_q).distinct()

        # ObjItem count bounds (applied to the annotated, filtered count)
        min_count = self._parse_int(objitem_count_min)
        if min_count is not None:
            sources = sources.filter(objitem_count__gte=min_count)
        max_count = self._parse_int(objitem_count_max)
        if max_count is not None:
            sources = sources.filter(objitem_count__lte=max_count)

        # Creation date range
        created_from = self._parse_day(date_from)
        if created_from is not None:
            sources = sources.filter(created_at__gte=created_from)
        created_to = self._parse_day(date_to, end_of_range=True)
        if created_to is not None:
            sources = sources.filter(created_at__lt=created_to)

        # Geo timestamp range
        if geo_conditions:
            sources = sources.filter(
                **{f"source_objitems__{lookup}": value for lookup, value in geo_conditions}
            ).distinct()

        # Search by ID or by ObjItem name
        if search_id is not None:
            sources = sources.filter(id=search_id)
        elif search_by_name:
            sources = sources.filter(
                source_objitems__name__icontains=search_query
            ).distinct()

        # Remaining ObjItem-level filters, one filter() call each
        for lookup, value in other_conditions:
            sources = sources.filter(**{f"source_objitems__{lookup}": value}).distinct()

        # Sorting: unknown sort values leave the queryset ordering untouched.
        if sort_param in self._VALID_SORT_FIELDS:
            sources = sources.order_by(sort_param)

        paginator = Paginator(sources, items_per_page)
        page_obj = paginator.get_page(page_number)

        # Prepare rows for display
        processed_sources = []
        for source in page_obj:
            # NOTE: any .filter() below bypasses the prefetched ObjItems and
            # issues a query per source; the unfiltered case uses the prefetch.
            objitems_to_display = source.source_objitems.all()
            for lookup, value in objitem_conditions:
                objitems_to_display = objitems_to_display.filter(**{lookup: value})

            satellite_names = set()
            satellite_ids = set()
            # Row-level values; deliberately NOT named has_lyngsat so the
            # request filter flag above reaches the template unchanged.
            source_has_lyngsat = False
            lyngsat_id = None
            source_name = None
            for objitem in objitems_to_display:
                # The displayed name is the first non-empty ObjItem name.
                if source_name is None and objitem.name:
                    source_name = objitem.name
                parameter = getattr(objitem, 'parameter_obj', None)
                if parameter:
                    satellite = getattr(parameter, 'id_satellite', None)
                    if satellite:
                        satellite_names.add(satellite.name)
                        satellite_ids.add(satellite.id)
                lyngsat_source = getattr(objitem, 'lyngsat_source', None)
                if lyngsat_source:
                    source_has_lyngsat = True
                    lyngsat_id = lyngsat_source.id

            marks_data = [
                {
                    'mark': mark.mark,
                    'timestamp': mark.timestamp,
                    'created_by': str(mark.created_by) if mark.created_by else '-',
                }
                for mark in source.marks.all()
            ]

            processed_sources.append({
                'id': source.id,
                'name': source_name if source_name else '-',
                'info': source.info.name if source.info else '-',
                'coords_average': format_coords_display(source.coords_average),
                'coords_kupsat': format_coords_display(source.coords_kupsat),
                'coords_valid': format_coords_display(source.coords_valid),
                'coords_reference': format_coords_display(source.coords_reference),
                # Annotated count, consistent with the active filters
                'objitem_count': source.objitem_count,
                'satellite': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                # Smallest satellite ID is used for the modal link
                'satellite_id': min(satellite_ids) if satellite_ids else None,
                'created_at': source.created_at,
                'updated_at': source.updated_at,
                'has_lyngsat': source_has_lyngsat,
                'lyngsat_id': lyngsat_id,
                'marks': marks_data,
            })

        context = {
            'page_obj': page_obj,
            'processed_sources': processed_sources,
            'items_per_page': items_per_page,
            'available_items_per_page': [50, 100, 500, 1000],
            'sort': sort_param,
            'search_query': search_query,
            # Source-level filters
            'has_coords_average': has_coords_average,
            'has_coords_kupsat': has_coords_kupsat,
            'has_coords_valid': has_coords_valid,
            'has_coords_reference': has_coords_reference,
            'has_lyngsat': has_lyngsat,
            'selected_info': selected_info,
            'objitem_count_min': objitem_count_min,
            'objitem_count_max': objitem_count_max,
            'date_from': date_from,
            'date_to': date_to,
            'has_signal_mark': has_signal_mark,
            'mark_date_from': mark_date_from,
            'mark_date_to': mark_date_to,
            # ObjItem-level filters
            'geo_date_from': geo_date_from,
            'geo_date_to': geo_date_to,
            'satellites': satellites,
            'selected_satellites': selected_satellites,
            'polarizations': polarizations,
            'selected_polarizations': selected_polarizations,
            'modulations': modulations,
            'selected_modulations': selected_modulations,
            'mirrors': mirrors,
            'selected_mirrors': selected_mirrors,
            'object_infos': object_infos,
            'full_width_page': True,
        }
        # freq_min, freq_max, freq_range_*, bod_velocity_*, snr_* (raw strings)
        context.update(numeric_raw)
        return render(request, "mainapp/source_list.html", context)
class AdminModeratorMixin(UserPassesTestMixin):
    """Access mixin that only lets users with the admin or moderator role through."""

    def test_func(self):
        """Return True only for an authenticated user whose customuser role is admin or moderator."""
        user = self.request.user
        if not user.is_authenticated:
            return False
        # Users without a related customuser profile have no role at all.
        if not hasattr(user, 'customuser'):
            return False
        return user.customuser.role in ('admin', 'moderator')

    def handle_no_permission(self):
        """Flash an error message and send the user to the home page."""
        messages.error(self.request, 'У вас нет прав для выполнения этого действия.')
        return redirect('mainapp:home')
class SourceUpdateView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Edit page for a Source: the SourceForm plus a listing of its related ObjItems."""

    def _render_form(self, request, source, form):
        """Render the edit template with ``form`` and the source's ObjItems (oldest first)."""
        related_objitems = source.source_objitems.select_related(
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__polarization',
            'parameter_obj__modulation',
            'parameter_obj__standard',
            'geo_obj',
            'created_by__user',
            'updated_by__user'
        ).order_by('created_at')
        return render(request, 'mainapp/source_form.html', {
            'object': source,
            'form': form,
            'objitems': related_objitems,
            'full_width_page': True,
        })

    def get(self, request, pk):
        """Show the edit form for the source with primary key ``pk`` (404 if missing)."""
        source = get_object_or_404(Source, pk=pk)
        return self._render_form(request, source, SourceForm(instance=source))

    def post(self, request, pk):
        """Validate and save the submitted form; re-render with errors when invalid."""
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(request.POST, instance=source)
        if not form.is_valid():
            return self._render_form(request, source, form)

        # NOTE(review): commit=False without form.save_m2m() skips many-to-many
        # form fields, if SourceForm has any - confirm.
        source = form.save(commit=False)
        # Record who made the change when the user has a customuser profile.
        if hasattr(request.user, 'customuser'):
            source.updated_by = request.user.customuser
        source.save()
        messages.success(request, f'Источник #{source.id} успешно обновлен.')

        # Carry the current query string over to the redirect target.
        query_string = request.GET.urlencode()
        if query_string:
            return redirect(f"{reverse('mainapp:source_update', args=[source.id])}?{query_string}")
        return redirect('mainapp:source_update', pk=source.id)
class SourceDeleteView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Confirmation page and delete action for a single Source."""

    def get(self, request, pk):
        """Show the delete confirmation, including how many ObjItems the source has."""
        source = get_object_or_404(Source, pk=pk)
        return render(request, 'mainapp/source_confirm_delete.html', {
            'object': source,
            'objitems_count': source.source_objitems.count(),
        })

    def post(self, request, pk):
        """Delete the source; on failure flash the error and return to its edit page."""
        source = get_object_or_404(Source, pk=pk)
        # Remember the ID for the success message before the row is deleted.
        deleted_id = source.id
        try:
            source.delete()
            messages.success(request, f'Источник #{deleted_id} успешно удален.')
        except Exception as e:
            messages.error(request, f'Ошибка при удалении источника: {e!s}')
            return redirect('mainapp:source_update', pk=pk)

        # Back to the home page, keeping the query string if there is one.
        query_string = request.GET.urlencode()
        if query_string:
            return redirect(f"{reverse('mainapp:home')}?{query_string}")
        return redirect('mainapp:home')
class DeleteSelectedSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
    """View for deleting multiple selected sources with confirmation.

    Both handlers take a comma-separated ``ids`` value (GET for the
    confirmation page, POST for the deletion itself).
    """

    @staticmethod
    def _parse_ids(raw_ids):
        """Split a comma-separated string into a list of integer IDs.

        Surrounding whitespace is tolerated and non-numeric parts are dropped.
        ``str.isdecimal`` is used instead of ``str.isdigit`` because the latter
        accepts characters (e.g. superscript digits) that ``int()`` rejects.
        """
        parts = (part.strip() for part in raw_ids.split(","))
        return [int(part) for part in parts if part.isdecimal()]

    def get(self, request):
        """Show confirmation page with details about sources to be deleted."""
        ids = request.GET.get("ids", "")
        id_list = self._parse_ids(ids)
        # Covers both a missing parameter and one with no usable ID in it.
        if not id_list:
            messages.error(request, "Не выбраны источники для удаления")
            return redirect('mainapp:home')
        try:
            sources = Source.objects.filter(id__in=id_list).prefetch_related(
                'source_objitems',
                'source_objitems__parameter_obj',
                'source_objitems__parameter_obj__id_satellite',
                'source_objitems__geo_obj'
            ).annotate(
                objitem_count=Count('source_objitems')
            )

            # Per-source summary: ID, ObjItem count and satellite names
            sources_info = []
            total_objitems = 0
            for source in sources:
                satellite_names = set()
                for objitem in source.source_objitems.all():
                    parameter = getattr(objitem, 'parameter_obj', None)
                    if parameter:
                        satellite = getattr(parameter, 'id_satellite', None)
                        if satellite:
                            satellite_names.add(satellite.name)
                total_objitems += source.objitem_count
                sources_info.append({
                    'id': source.id,
                    'objitem_count': source.objitem_count,
                    'satellites': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                })

            context = {
                'sources_info': sources_info,
                'total_sources': len(sources_info),
                'total_objitems': total_objitems,
                # The raw value is passed through unchanged for the template.
                'ids': ids,
            }
            return render(request, 'mainapp/source_bulk_delete_confirm.html', context)
        except Exception as e:
            messages.error(request, f'Ошибка при подготовке удаления: {str(e)}')
            return redirect('mainapp:home')

    def post(self, request):
        """Actually delete the selected sources and report the result as JSON."""
        ids = request.POST.get("ids", "")
        id_list = self._parse_ids(ids)
        # Reject requests with no usable ID instead of reporting a
        # successful deletion of zero sources.
        if not id_list:
            return JsonResponse({"error": "Нет ID для удаления"}, status=400)
        try:
            sources = Source.objects.filter(id__in=id_list)
            # Counted before deletion so only Source rows are reported.
            deleted_sources_count = sources.count()
            # NOTE(review): the original comment says related ObjItems are
            # removed by cascade; that depends on the model's on_delete - confirm.
            sources.delete()
            messages.success(
                request,
                f'Успешно удалено источников: {deleted_sources_count}'
            )
            return JsonResponse({
                "success": True,
                "message": f"Успешно удалено источников: {deleted_sources_count}",
                "deleted_count": deleted_sources_count,
            })
        except Exception as e:
            return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)