422 lines
16 KiB
Python
422 lines
16 KiB
Python
"""
|
||
Source related views.
|
||
"""
|
||
from datetime import datetime
|
||
|
||
from django.contrib import messages
|
||
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
|
||
from django.core.paginator import Paginator
|
||
from django.db.models import Count, Q
|
||
from django.http import JsonResponse
|
||
from django.shortcuts import get_object_or_404, redirect, render
|
||
from django.urls import reverse
|
||
from django.views import View
|
||
|
||
from ..forms import SourceForm
|
||
from ..models import Source, Satellite
|
||
from ..utils import parse_pagination_params
|
||
|
||
|
||
class SourceListView(LoginRequiredMixin, View):
    """
    View for displaying a paginated, filterable list of sources (Source).

    Recognised GET parameters (all optional):

    * ``sort`` - one of ``ALLOWED_SORTS``; anything else falls back to
      ``DEFAULT_SORT``.
    * ``search`` - integer source ID; non-numeric values are ignored.
    * ``has_coords_average`` / ``has_coords_kupsat`` / ``has_coords_valid`` /
      ``has_coords_reference`` - ``"1"`` keeps sources where the field is set,
      ``"0"`` keeps sources where it is NULL.
    * ``objitem_count_min`` / ``objitem_count_max`` - inclusive integer bounds
      on the number of related ObjItems.
    * ``date_from`` / ``date_to`` - ``YYYY-MM-DD`` bounds on ``created_at``
      (``date_to`` is inclusive of the whole day).
    * ``satellite_id`` - may be repeated; keeps sources that have at least one
      ObjItem on one of the given satellites.

    Pagination parameters are read by ``parse_pagination_params``.
    """

    # Whitelist of accepted ``sort`` values; each is passed to order_by() as is.
    ALLOWED_SORTS = frozenset({
        "id", "-id",
        "created_at", "-created_at",
        "updated_at", "-updated_at",
        "objitem_count", "-objitem_count",
    })
    DEFAULT_SORT = "id"

    # Coordinate fields that get a ``has_<field>`` presence filter.
    COORD_FIELDS = (
        "coords_average",
        "coords_kupsat",
        "coords_valid",
        "coords_reference",
    )

    def get(self, request):
        """Render ``mainapp/source_list.html`` for the requested filters/page."""
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timedelta

        page_number, items_per_page = parse_pagination_params(request)

        params = request.GET
        sort_param = params.get("sort", "id")
        search_query = params.get("search", "").strip()
        presence_flags = {
            field: params.get(f"has_{field}") for field in self.COORD_FIELDS
        }
        objitem_count_min = params.get("objitem_count_min", "").strip()
        objitem_count_max = params.get("objitem_count_max", "").strip()
        date_from = params.get("date_from", "").strip()
        date_to = params.get("date_to", "").strip()
        # Untrusted input: keep only values that are valid integer IDs so a
        # stray ``satellite_id=abc`` is ignored instead of failing the query.
        selected_satellites = self._parse_int_list(params.getlist("satellite_id"))

        # Satellites offered in the filter: only those reachable from a source.
        satellites = (
            Satellite.objects.filter(parameters__objitem__source__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )

        # prefetch_related avoids per-row queries when satellite names are
        # collected below. distinct=True keeps objitem_count correct when the
        # satellite filter later adds joins over the same relation (annotate
        # followed by filter on a multi-valued relation would otherwise count
        # duplicated rows).
        sources = Source.objects.prefetch_related(
            "source_objitems",
            "source_objitems__parameter_obj",
            "source_objitems__parameter_obj__id_satellite",
            "source_objitems__geo_obj",
        ).annotate(
            objitem_count=Count("source_objitems", distinct=True)
        )

        # Coordinate presence filters ("1" = set, "0" = NULL, else no filter).
        for field, flag in presence_flags.items():
            if flag == "1":
                sources = sources.filter(**{f"{field}__isnull": False})
            elif flag == "0":
                sources = sources.filter(**{f"{field}__isnull": True})

        # ObjItem count range; unparsable bounds are ignored.
        min_count = self._to_int(objitem_count_min)
        if min_count is not None:
            sources = sources.filter(objitem_count__gte=min_count)
        max_count = self._to_int(objitem_count_max)
        if max_count is not None:
            sources = sources.filter(objitem_count__lte=max_count)

        # Creation date range; unparsable dates are ignored.
        # NOTE(review): the bounds are naive datetimes - confirm this matches
        # the project's USE_TZ setting.
        start_day = self._parse_day(date_from)
        if start_day is not None:
            sources = sources.filter(created_at__gte=start_day)
        end_day = self._parse_day(date_to)
        if end_day is not None:
            try:
                # Exclusive bound one day later so the whole end date matches.
                upper_bound = end_day + timedelta(days=1)
            except OverflowError:
                # date_to is the last representable day; skip the bound.
                upper_bound = None
            if upper_bound is not None:
                sources = sources.filter(created_at__lt=upper_bound)

        # Search by exact ID; non-numeric search text is ignored.
        search_id = self._to_int(search_query)
        if search_id is not None:
            sources = sources.filter(id=search_id)

        if selected_satellites:
            sources = sources.filter(
                source_objitems__parameter_obj__id_satellite_id__in=selected_satellites
            ).distinct()

        # Always order explicitly so pagination is deterministic, even when
        # the client sends an unknown ``sort`` value.
        ordering = sort_param if sort_param in self.ALLOWED_SORTS else self.DEFAULT_SORT
        sources = sources.order_by(ordering)

        paginator = Paginator(sources, items_per_page)
        page_obj = paginator.get_page(page_number)

        processed_sources = [self._build_row(source) for source in page_obj]

        context = {
            "page_obj": page_obj,
            "processed_sources": processed_sources,
            "items_per_page": items_per_page,
            "available_items_per_page": [50, 100, 500, 1000],
            "sort": sort_param,
            "search_query": search_query,
            # Adds has_coords_average / _kupsat / _valid / _reference.
            **{f"has_{field}": flag for field, flag in presence_flags.items()},
            "objitem_count_min": objitem_count_min,
            "objitem_count_max": objitem_count_max,
            "date_from": date_from,
            "date_to": date_to,
            "satellites": satellites,
            "selected_satellites": selected_satellites,
            "full_width_page": True,
        }

        return render(request, "mainapp/source_list.html", context)

    @staticmethod
    def _to_int(text):
        """Return ``int(text)``, or None when ``text`` is not an integer."""
        try:
            return int(text)
        except ValueError:
            return None

    @staticmethod
    def _parse_int_list(raw_values):
        """Return the non-negative integers found in ``raw_values``, in order."""
        parsed = []
        for raw in raw_values:
            candidate = raw.strip()
            # isdecimal() only accepts characters that int() can convert.
            if candidate.isdecimal():
                parsed.append(int(candidate))
        return parsed

    @staticmethod
    def _parse_day(text):
        """Parse ``YYYY-MM-DD`` into a datetime at midnight, or return None."""
        try:
            return datetime.strptime(text, "%Y-%m-%d")
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _format_coords(point):
        """Format a point as ``"<lat>N|S <lon>E|W"``; ``"-"`` when missing.

        ``point.coords`` is read as ``(longitude, latitude)``.
        """
        if not point:
            return "-"
        longitude = point.coords[0]
        latitude = point.coords[1]
        lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
        lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
        return f"{lat} {lon}"

    @staticmethod
    def _satellite_names(source):
        """Return the sorted, comma-separated satellite names of a source."""
        names = set()
        for objitem in source.source_objitems.all():
            # getattr with a default covers both "no such relation" and a
            # relation that is present but empty.
            parameter = getattr(objitem, "parameter_obj", None)
            satellite = getattr(parameter, "id_satellite", None) if parameter else None
            if satellite:
                names.add(satellite.name)
        return ", ".join(sorted(names)) if names else "-"

    def _build_row(self, source):
        """Build the display dict for one source row of the list template."""
        return {
            "id": source.id,
            "coords_average": self._format_coords(source.coords_average),
            "coords_kupsat": self._format_coords(source.coords_kupsat),
            "coords_valid": self._format_coords(source.coords_valid),
            "coords_reference": self._format_coords(source.coords_reference),
            "objitem_count": source.objitem_count,
            "satellite": self._satellite_names(source),
            "created_at": source.created_at,
            "updated_at": source.updated_at,
        }
|
||
|
||
|
||
|
||
class AdminModeratorMixin(UserPassesTestMixin):
    """Mixin to restrict access to admin and moderator roles only.

    The role is read from ``request.user.customuser.role``. Authenticated
    users without an allowed role get an error message and are redirected to
    ``mainapp:home``; anonymous users get the default handling inherited from
    the parent class.
    """

    # Roles that pass ``test_func``.
    allowed_roles = ('admin', 'moderator')

    def test_func(self):
        """Return True when the current user has one of ``allowed_roles``."""
        user = self.request.user
        if not user.is_authenticated:
            return False
        # Users without a related ``customuser`` object have no role at all.
        profile = getattr(user, 'customuser', None)
        return profile is not None and profile.role in self.allowed_roles

    def handle_no_permission(self):
        """Redirect users who failed the access check.

        This method is also the one LoginRequiredMixin calls for anonymous
        users when both mixins are combined, so anonymous requests are handed
        to the parent implementation instead of being told they lack rights.
        """
        if not self.request.user.is_authenticated:
            return super().handle_no_permission()
        messages.error(self.request, 'У вас нет прав для выполнения этого действия.')
        return redirect('mainapp:home')
|
||
|
||
|
||
class SourceUpdateView(LoginRequiredMixin, AdminModeratorMixin, View):
    """View for editing Source with 4 coordinate fields and related ObjItems."""

    template_name = 'mainapp/source_form.html'

    def get(self, request, pk):
        """Show the edit form for the source identified by ``pk`` (404 if missing)."""
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(instance=source)
        return self._render_form(request, source, form)

    def post(self, request, pk):
        """Validate and save the submitted form.

        On success the user is redirected back to this edit page, keeping any
        query string of the current request. On validation errors the form is
        re-rendered with its errors.
        """
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(request.POST, instance=source)

        if not form.is_valid():
            return self._render_form(request, source, form)

        source = form.save(commit=False)
        # Record who made the change when the user has a customuser object.
        if hasattr(request.user, 'customuser'):
            source.updated_by = request.user.customuser
        source.save()

        messages.success(request, f'Источник #{source.id} успешно обновлен.')

        # Preserve the query string across the redirect when there is one.
        query_string = request.GET.urlencode()
        if query_string:
            return redirect(f"{reverse('mainapp:source_update', args=[source.id])}?{query_string}")
        return redirect('mainapp:source_update', pk=source.id)

    @staticmethod
    def _related_objitems(source):
        """Return the source's ObjItems, oldest first, with relations joined in."""
        return source.source_objitems.select_related(
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__polarization',
            'parameter_obj__modulation',
            'parameter_obj__standard',
            'geo_obj',
            'created_by__user',
            'updated_by__user',
        ).order_by('created_at')

    def _render_form(self, request, source, form):
        """Render the edit template for ``source`` with ``form`` and its ObjItems."""
        context = {
            'object': source,
            'form': form,
            'objitems': self._related_objitems(source),
            'full_width_page': True,
        }
        return render(request, self.template_name, context)
|
||
|
||
|
||
class SourceDeleteView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Confirm (GET) and perform (POST) the deletion of a single Source."""

    def get(self, request, pk):
        """Render the confirmation page with the number of related ObjItems."""
        source = get_object_or_404(Source, pk=pk)
        related_total = source.source_objitems.count()
        return render(
            request,
            'mainapp/source_confirm_delete.html',
            {'object': source, 'objitems_count': related_total},
        )

    def post(self, request, pk):
        """Delete the source and redirect to ``mainapp:home``.

        If deletion raises, the error is shown as a message and the user is
        sent back to the source's edit page. The current query string, when
        present, is carried over to the home redirect.
        """
        source = get_object_or_404(Source, pk=pk)
        # Remember the ID now; it is needed for the message after deletion.
        removed_id = source.id

        try:
            source.delete()
            messages.success(request, f'Источник #{removed_id} успешно удален.')
        except Exception as exc:
            messages.error(request, f'Ошибка при удалении источника: {str(exc)}')
            return redirect('mainapp:source_update', pk=pk)

        query_string = request.GET.urlencode()
        if not query_string:
            return redirect('mainapp:home')
        return redirect(f"{reverse('mainapp:home')}?{query_string}")
|
||
|
||
|
||
class DeleteSelectedSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
    """View for deleting multiple selected sources with confirmation.

    Both handlers read a comma-separated ``ids`` parameter. GET renders a
    confirmation page; POST deletes and answers with JSON.
    """

    @staticmethod
    def _parse_ids(raw_ids):
        """Return the non-negative integer IDs found in a comma-separated string.

        Tokens are stripped of surrounding whitespace; anything that is not a
        plain decimal number is skipped.
        """
        parsed = []
        for token in raw_ids.split(","):
            token = token.strip()
            # isdecimal() (unlike isdigit()) only accepts what int() can parse.
            if token.isdecimal():
                parsed.append(int(token))
        return parsed

    @staticmethod
    def _satellite_names(source):
        """Return the sorted, comma-separated satellite names of a source."""
        names = set()
        for objitem in source.source_objitems.all():
            parameter = getattr(objitem, 'parameter_obj', None)
            satellite = getattr(parameter, 'id_satellite', None) if parameter else None
            if satellite:
                names.add(satellite.name)
        return ", ".join(sorted(names)) if names else "-"

    def get(self, request):
        """Show confirmation page with details about sources to be deleted."""
        id_list = self._parse_ids(request.GET.get("ids", ""))
        if not id_list:
            # Covers both a missing parameter and one with no usable IDs.
            messages.error(request, "Не выбраны источники для удаления")
            return redirect('mainapp:home')

        try:
            sources = Source.objects.filter(id__in=id_list).prefetch_related(
                'source_objitems',
                'source_objitems__parameter_obj',
                'source_objitems__parameter_obj__id_satellite',
                'source_objitems__geo_obj',
            ).annotate(
                objitem_count=Count('source_objitems')
            )

            sources_info = [
                {
                    'id': source.id,
                    'objitem_count': source.objitem_count,
                    'satellites': self._satellite_names(source),
                }
                for source in sources
            ]

            context = {
                'sources_info': sources_info,
                'total_sources': len(sources_info),
                'total_objitems': sum(info['objitem_count'] for info in sources_info),
                # Hand the template the validated IDs only, never raw input.
                'ids': ",".join(str(source_id) for source_id in id_list),
            }

            return render(request, 'mainapp/source_bulk_delete_confirm.html', context)

        except Exception as e:
            messages.error(request, f'Ошибка при подготовке удаления: {str(e)}')
            return redirect('mainapp:home')

    def post(self, request):
        """Actually delete the selected sources.

        Returns JSON: 400 when no usable ID was submitted, 500 with the error
        text when deletion raises, otherwise a success payload containing the
        number of deleted sources.
        """
        id_list = self._parse_ids(request.POST.get("ids", ""))
        if not id_list:
            return JsonResponse({"error": "Нет ID для удаления"}, status=400)

        try:
            # delete() returns (total, {model_label: count}); take the Source
            # entry so cascaded related rows are not included in the number.
            _, deleted_per_model = Source.objects.filter(id__in=id_list).delete()
            deleted_sources_count = deleted_per_model.get(Source._meta.label, 0)

            messages.success(
                request,
                f'Успешно удалено источников: {deleted_sources_count}'
            )

            return JsonResponse({
                "success": True,
                "message": f"Успешно удалено источников: {deleted_sources_count}",
                "deleted_count": deleted_sources_count,
            })

        except Exception as e:
            return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)
|