Files
dbstorage/dbapp/mainapp/views/source.py

458 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Source related views.
"""
from datetime import datetime
from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.core.paginator import Paginator
from django.db.models import Count, Q
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views import View
from ..forms import SourceForm
from ..models import Source, Satellite
from ..utils import parse_pagination_params
class SourceListView(LoginRequiredMixin, View):
    """
    View for displaying a paginated list of sources (Source).

    Filtering, sorting and pagination are driven entirely by query-string
    parameters. Filter values that cannot be parsed (non-numeric counts or
    IDs, malformed dates) are ignored instead of producing an error.
    """

    # Whitelist of values accepted for the ``sort`` query parameter.
    SORT_FIELDS = (
        "id",
        "-id",
        "created_at",
        "-created_at",
        "updated_at",
        "-updated_at",
        "objitem_count",
        "-objitem_count",
    )
    # Ordering used when ``sort`` is missing or not whitelisted.
    DEFAULT_SORT = "id"
    # Source fields that each have a ``has_<field>`` presence filter ("1"/"0").
    COORD_FIELDS = (
        "coords_average",
        "coords_kupsat",
        "coords_valid",
        "coords_reference",
    )
    DATE_FORMAT = "%Y-%m-%d"

    @staticmethod
    def _parse_int(raw):
        """Return ``int(raw)``, or ``None`` when ``raw`` is empty or not an integer."""
        if not raw:
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    @classmethod
    def _parse_date(cls, raw):
        """Return ``raw`` parsed with ``DATE_FORMAT``, or ``None`` if it does not parse."""
        if not raw:
            return None
        try:
            return datetime.strptime(raw, cls.DATE_FORMAT)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _format_coords(point):
        """
        Render a point as ``"<lat>N|S <lon>E|W"``, or ``"-"`` when it is empty.

        ``point.coords`` is read as ``(longitude, latitude)``. Values that are
        not strictly positive (including exactly 0) get the ``W``/``S`` suffix.
        """
        if not point:
            return "-"
        longitude = point.coords[0]
        latitude = point.coords[1]
        lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
        lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
        return f"{lat} {lon}"

    @staticmethod
    def _summarize_objitems(source):
        """
        Collect satellite names and LyngSat info from a source's ObjItems.

        Returns a ``(satellite_names, has_lyngsat, lyngsat_id)`` tuple. When
        several ObjItems have a LyngSat source, the id of the last one
        iterated is reported.
        """
        satellite_names = set()
        has_lyngsat = False
        lyngsat_id = None
        for objitem in source.source_objitems.all():
            parameter = getattr(objitem, 'parameter_obj', None)
            satellite = getattr(parameter, 'id_satellite', None) if parameter else None
            if satellite:
                satellite_names.add(satellite.name)
            lyngsat = getattr(objitem, 'lyngsat_source', None)
            if lyngsat:
                has_lyngsat = True
                lyngsat_id = lyngsat.id
        return satellite_names, has_lyngsat, lyngsat_id

    def get(self, request):
        """Render the source list for the filters given in ``request.GET``."""
        from datetime import timedelta  # only ``datetime`` is imported at file level

        # Pagination parameters
        page_number, items_per_page = parse_pagination_params(request)

        # Sorting: fall back to the default so the paginator always receives
        # an explicitly ordered queryset.
        sort_param = request.GET.get("sort", self.DEFAULT_SORT)
        if sort_param not in self.SORT_FIELDS:
            sort_param = self.DEFAULT_SORT

        # Filter parameters
        search_query = request.GET.get("search", "").strip()
        coord_flags = {
            field: request.GET.get(f"has_{field}") for field in self.COORD_FIELDS
        }
        # Kept under its own name: the per-row LyngSat flag computed further
        # down must not overwrite the filter value echoed back to the template.
        has_lyngsat_filter = request.GET.get("has_lyngsat")
        objitem_count_min = request.GET.get("objitem_count_min", "").strip()
        objitem_count_max = request.GET.get("objitem_count_max", "").strip()
        date_from = request.GET.get("date_from", "").strip()
        date_to = request.GET.get("date_to", "").strip()
        # Keep only numeric satellite IDs so a malformed value cannot make the
        # ``__in`` lookup raise.
        selected_satellite_ids = [
            int(raw_id)
            for raw_id in request.GET.getlist("satellite_id")
            if raw_id.isdecimal()
        ]

        # Satellites offered in the filter widget: those reachable from a source
        satellites = (
            Satellite.objects.filter(parameters__objitem__source__isnull=False)
            .distinct()
            .only("id", "name")
            .order_by("name")
        )

        # Base queryset. Everything read per row below is prefetched to avoid
        # N+1 queries. ``distinct=True`` keeps the ObjItem count correct when
        # later filters join the same relation again.
        sources = Source.objects.prefetch_related(
            'source_objitems',
            'source_objitems__parameter_obj',
            'source_objitems__parameter_obj__id_satellite',
            'source_objitems__geo_obj',
            'source_objitems__lyngsat_source',
            'marks',
            'marks__created_by__user'
        ).annotate(
            objitem_count=Count('source_objitems', distinct=True)
        )

        # Coordinate presence filters: "1" = must be set, "0" = must be empty
        for field, flag in coord_flags.items():
            if flag == "1":
                sources = sources.filter(**{f"{field}__isnull": False})
            elif flag == "0":
                sources = sources.filter(**{f"{field}__isnull": True})

        # LyngSat presence filter
        if has_lyngsat_filter == "1":
            sources = sources.filter(
                source_objitems__lyngsat_source__isnull=False
            ).distinct()
        elif has_lyngsat_filter == "0":
            sources = sources.filter(
                ~Q(source_objitems__lyngsat_source__isnull=False)
            ).distinct()

        # ObjItem count range
        min_count = self._parse_int(objitem_count_min)
        if min_count is not None:
            sources = sources.filter(objitem_count__gte=min_count)
        max_count = self._parse_int(objitem_count_max)
        if max_count is not None:
            sources = sources.filter(objitem_count__lte=max_count)

        # Creation date range
        date_from_obj = self._parse_date(date_from)
        if date_from_obj is not None:
            sources = sources.filter(created_at__gte=date_from_obj)
        date_to_obj = self._parse_date(date_to)
        if date_to_obj is not None:
            # Exclusive upper bound one day later so the whole end date is included
            sources = sources.filter(created_at__lt=date_to_obj + timedelta(days=1))

        # Search by ID (non-numeric input is ignored)
        search_id = self._parse_int(search_query)
        if search_id is not None:
            sources = sources.filter(id=search_id)

        # Filter by satellites
        if selected_satellite_ids:
            sources = sources.filter(
                source_objitems__parameter_obj__id_satellite_id__in=selected_satellite_ids
            ).distinct()

        sources = sources.order_by(sort_param)

        # Pagination
        paginator = Paginator(sources, items_per_page)
        page_obj = paginator.get_page(page_number)

        # Build plain dict rows for the template
        processed_sources = []
        has_any_lyngsat = False  # True if any source on this page has LyngSat data
        for source in page_obj:
            satellite_names, source_has_lyngsat, lyngsat_id = self._summarize_objitems(source)
            has_any_lyngsat = has_any_lyngsat or source_has_lyngsat

            marks_data = [
                {
                    'mark': mark.mark,
                    'timestamp': mark.timestamp,
                    'created_by': str(mark.created_by) if mark.created_by else '-',
                }
                for mark in source.marks.all()
            ]

            processed_sources.append({
                'id': source.id,
                'coords_average': self._format_coords(source.coords_average),
                'coords_kupsat': self._format_coords(source.coords_kupsat),
                'coords_valid': self._format_coords(source.coords_valid),
                'coords_reference': self._format_coords(source.coords_reference),
                'objitem_count': source.objitem_count,
                'satellite': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                'created_at': source.created_at,
                'updated_at': source.updated_at,
                'has_lyngsat': source_has_lyngsat,
                'lyngsat_id': lyngsat_id,
                'marks': marks_data,
            })

        # Template context; filter values are echoed back so the form keeps its state
        context = {
            'page_obj': page_obj,
            'processed_sources': processed_sources,
            'items_per_page': items_per_page,
            'available_items_per_page': [50, 100, 500, 1000],
            'sort': sort_param,
            'search_query': search_query,
            'has_coords_average': coord_flags['coords_average'],
            'has_coords_kupsat': coord_flags['coords_kupsat'],
            'has_coords_valid': coord_flags['coords_valid'],
            'has_coords_reference': coord_flags['coords_reference'],
            'has_lyngsat': has_lyngsat_filter,
            'has_any_lyngsat': has_any_lyngsat,
            'objitem_count_min': objitem_count_min,
            'objitem_count_max': objitem_count_max,
            'date_from': date_from,
            'date_to': date_to,
            'satellites': satellites,
            'selected_satellites': selected_satellite_ids,
            'full_width_page': True,
        }
        return render(request, "mainapp/source_list.html", context)
class AdminModeratorMixin(UserPassesTestMixin):
    """
    Mixin to restrict access to admin and moderator roles only.

    Authenticated users without an allowed role are redirected to the home
    page with an error message. Anonymous users are handed to the default
    ``handle_no_permission`` of the parent class instead.
    """

    # Values of ``customuser.role`` that pass the check; subclasses may override.
    allowed_roles = ('admin', 'moderator')

    def test_func(self):
        """Return True when the current user has a profile with an allowed role."""
        user = self.request.user
        return (
            user.is_authenticated
            and hasattr(user, 'customuser')
            and user.customuser.role in self.allowed_roles
        )

    def handle_no_permission(self):
        """Redirect a user who failed the access check."""
        # This override is also what LoginRequiredMixin calls for anonymous
        # visitors, and a "no rights" message is the wrong response for someone
        # who simply is not logged in, so defer to the parent in that case.
        if not self.request.user.is_authenticated:
            return super().handle_no_permission()
        messages.error(self.request, 'У вас нет прав для выполнения этого действия.')
        return redirect('mainapp:home')
class SourceUpdateView(LoginRequiredMixin, AdminModeratorMixin, View):
    """View for editing Source with 4 coordinate fields and related ObjItems."""

    def _render_form(self, request, source, form):
        """
        Render the edit page for ``source`` using ``form``.

        Shared by ``get`` and by the invalid-form branch of ``post`` so the
        ObjItem queryset and the template context are defined in one place.
        """
        # Related ObjItems, oldest first, with the listed relations joined in
        objitems = source.source_objitems.select_related(
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__polarization',
            'parameter_obj__modulation',
            'parameter_obj__standard',
            'geo_obj',
            'created_by__user',
            'updated_by__user'
        ).order_by('created_at')
        context = {
            'object': source,
            'form': form,
            'objitems': objitems,
            'full_width_page': True,
        }
        return render(request, 'mainapp/source_form.html', context)

    def get(self, request, pk):
        """Show the edit form for the source with primary key ``pk`` (404 if missing)."""
        source = get_object_or_404(Source, pk=pk)
        return self._render_form(request, source, SourceForm(instance=source))

    def post(self, request, pk):
        """
        Validate and save the submitted form.

        On success, redirects back to this edit page and keeps the current
        query string. On validation failure, re-renders the form with errors.
        """
        source = get_object_or_404(Source, pk=pk)
        form = SourceForm(request.POST, instance=source)
        if not form.is_valid():
            return self._render_form(request, source, form)

        source = form.save(commit=False)
        # Record the editor when the user has a ``customuser`` profile
        if hasattr(request.user, 'customuser'):
            source.updated_by = request.user.customuser
        source.save()
        messages.success(request, f'Источник #{source.id} успешно обновлен.')

        # Redirect back with query params if present
        query_string = request.GET.urlencode()
        if query_string:
            return redirect(
                f"{reverse('mainapp:source_update', args=[source.id])}?{query_string}"
            )
        return redirect('mainapp:source_update', pk=source.id)
class SourceDeleteView(LoginRequiredMixin, AdminModeratorMixin, View):
    """Confirmation page and deletion handler for a single Source."""

    def get(self, request, pk):
        """Render the delete confirmation page, including the related ObjItem count."""
        target = get_object_or_404(Source, pk=pk)
        return render(
            request,
            'mainapp/source_confirm_delete.html',
            {
                'object': target,
                'objitems_count': target.source_objitems.count(),
            },
        )

    def post(self, request, pk):
        """
        Delete the source and redirect to the home page.

        If deletion raises, an error message is queued and the user is sent
        back to the source's edit page instead. The current query string is
        carried over to the home page redirect.
        """
        target = get_object_or_404(Source, pk=pk)
        # Remember the id for the success message before deleting
        removed_id = target.id
        try:
            target.delete()
            messages.success(request, f'Источник #{removed_id} успешно удален.')
        except Exception as exc:
            messages.error(request, f'Ошибка при удалении источника: {exc!s}')
            return redirect('mainapp:source_update', pk=pk)

        query_string = request.GET.urlencode()
        if not query_string:
            return redirect('mainapp:home')
        return redirect(f"{reverse('mainapp:home')}?{query_string}")
class DeleteSelectedSourcesView(LoginRequiredMixin, AdminModeratorMixin, View):
    """
    View for deleting multiple selected sources with confirmation.

    ``GET`` renders a confirmation page for the sources listed in the
    comma-separated ``ids`` query parameter. ``POST`` deletes the sources
    listed in the ``ids`` form field and answers with JSON.
    """

    @staticmethod
    def _parse_ids(raw_ids):
        """
        Return the integer IDs found in a comma-separated string.

        Tokens are stripped of surrounding whitespace. Anything that is not a
        plain decimal number is skipped, so the result may be empty.
        """
        tokens = (token.strip() for token in raw_ids.split(","))
        # ``isdecimal`` (unlike ``isdigit``) only accepts characters ``int`` can parse.
        return [int(token) for token in tokens if token.isdecimal()]

    def get(self, request):
        """Show confirmation page with details about sources to be deleted."""
        id_list = self._parse_ids(request.GET.get("ids", ""))
        # Covers both a missing parameter and one with no usable IDs
        if not id_list:
            messages.error(request, "Не выбраны источники для удаления")
            return redirect('mainapp:home')

        try:
            sources = Source.objects.filter(id__in=id_list).prefetch_related(
                'source_objitems',
                'source_objitems__parameter_obj',
                'source_objitems__parameter_obj__id_satellite',
                'source_objitems__geo_obj'
            ).annotate(
                objitem_count=Count('source_objitems')
            )

            # Per-source summary: ObjItem count and the satellites involved
            sources_info = []
            total_objitems = 0
            for source in sources:
                satellite_names = set()
                for objitem in source.source_objitems.all():
                    parameter = getattr(objitem, 'parameter_obj', None)
                    satellite = getattr(parameter, 'id_satellite', None) if parameter else None
                    if satellite:
                        satellite_names.add(satellite.name)

                total_objitems += source.objitem_count
                sources_info.append({
                    'id': source.id,
                    'objitem_count': source.objitem_count,
                    'satellites': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                })

            context = {
                'sources_info': sources_info,
                'total_sources': len(sources_info),
                'total_objitems': total_objitems,
                # Pass on the cleaned list so the follow-up POST acts on
                # exactly the IDs that were summarized here.
                'ids': ",".join(str(source_id) for source_id in id_list),
            }
            return render(request, 'mainapp/source_bulk_delete_confirm.html', context)
        except Exception as e:
            messages.error(request, f'Ошибка при подготовке удаления: {str(e)}')
            return redirect('mainapp:home')

    def post(self, request):
        """
        Actually delete the selected sources.

        Returns a JSON object with ``success``, ``message`` and
        ``deleted_count`` on success, ``{"error": ...}`` with status 400 when
        no usable IDs were submitted, and status 500 when deletion raises.
        """
        id_list = self._parse_ids(request.POST.get("ids", ""))
        if not id_list:
            return JsonResponse({"error": "Нет ID для удаления"}, status=400)

        try:
            # ``delete()`` reports what was actually removed, broken down per
            # model, so no separate ``count()`` query is needed and the number
            # cannot drift from the real outcome.
            _, deleted_per_model = Source.objects.filter(id__in=id_list).delete()
            deleted_sources_count = deleted_per_model.get(Source._meta.label, 0)

            messages.success(
                request,
                f'Успешно удалено источников: {deleted_sources_count}'
            )
            return JsonResponse({
                "success": True,
                "message": f"Успешно удалено источников: {deleted_sources_count}",
                "deleted_count": deleted_sources_count,
            })
        except Exception as e:
            return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500)