# Source: dbstorage/dbapp/mainapp/views/base.py (Python; listed as 504 lines, 20 KiB)

"""
Base views and utilities.
"""
from datetime import datetime, timedelta
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import Paginator
from django.db.models import Count, Q
from django.shortcuts import redirect, render
from django.views import View
from ..models import Source, ObjItem, Satellite, Modulation, Polarization, ObjectMark
from ..utils import parse_pagination_params
class HomeView(LoginRequiredMixin, View):
    """
    Main page with filters for displaying sources or objitems.

    The view reads its filters from the GET query string, builds either a
    ``Source`` or an ``ObjItem`` queryset (depending on ``display_mode``),
    paginates it and converts the current page into plain dictionaries that
    are handed to the ``mainapp/home.html`` template.
    """

    def get(self, request):
        """Render the home page for the filters found in ``request.GET``."""
        params = request.GET

        # Get pagination parameters
        page_number, items_per_page = parse_pagination_params(request)

        # Get display mode: 'sources' or 'objitems'
        display_mode = params.get("display_mode", "sources")

        # Identifier filters are sanitised once; the same integer lists are
        # echoed back to the template and used for the database lookups, so a
        # non-numeric value can never reach an ``__in`` filter.
        satellite_ids = self._parse_id_list(params.getlist("satellite_id"))
        modulation_ids = self._parse_id_list(params.getlist("modulation"))
        polarization_ids = self._parse_id_list(params.getlist("polarization"))

        date_from = params.get("date_from", "").strip()
        date_to = params.get("date_to", "").strip()
        freq_min = params.get("freq_min", "").strip()
        freq_max = params.get("freq_max", "").strip()
        range_min = params.get("range_min", "").strip()
        range_max = params.get("range_max", "").strip()

        # Source-specific filters
        has_coords_average = params.get("has_coords_average")
        has_kupsat = params.get("has_kupsat")
        has_valid = params.get("has_valid")
        has_reference = params.get("has_reference")

        # ObjItem-specific filters
        has_geo = params.get("has_geo")
        has_lyngsat = params.get("has_lyngsat")

        # Marks filters
        show_marks = params.get("show_marks", "0")
        marks_date_from = params.get("marks_date_from", "").strip()
        marks_date_to = params.get("marks_date_to", "").strip()
        marks_status = params.get("marks_status", "")  # all, present, absent

        # Prepare context (includes every option list needed by the filters)
        context = {
            'display_mode': display_mode,
            'satellites': Satellite.objects.all().order_by("name"),
            'modulations': Modulation.objects.all().order_by("name"),
            'polarizations': Polarization.objects.all().order_by("name"),
            'selected_satellites': satellite_ids,
            'selected_modulations': modulation_ids,
            'selected_polarizations': polarization_ids,
            'date_from': date_from,
            'date_to': date_to,
            'freq_min': freq_min,
            'freq_max': freq_max,
            'range_min': range_min,
            'range_max': range_max,
            'has_coords_average': has_coords_average,
            'has_kupsat': has_kupsat,
            'has_valid': has_valid,
            'has_reference': has_reference,
            'has_geo': has_geo,
            'has_lyngsat': has_lyngsat,
            'show_marks': show_marks,
            'marks_date_from': marks_date_from,
            'marks_date_to': marks_date_to,
            'marks_status': marks_status,
            'items_per_page': items_per_page,
            'available_items_per_page': [50, 100, 500, 1000],
            'full_width_page': True,
        }

        if display_mode == "objitems":
            # Display ObjItems
            queryset = self._get_objitems_queryset(
                satellite_ids, date_from, date_to,
                freq_min, freq_max, range_min, range_max,
                modulation_ids, polarization_ids,
                has_geo, has_lyngsat
            )
            page_obj = Paginator(queryset, items_per_page).get_page(page_number)
            context.update({
                'page_obj': page_obj,
                'processed_objitems': self._process_objitems(
                    page_obj, show_marks, marks_date_from, marks_date_to, marks_status
                ),
            })
        else:
            # Display Sources
            queryset = self._get_sources_queryset(
                satellite_ids, date_from, date_to,
                freq_min, freq_max, range_min, range_max,
                modulation_ids, polarization_ids,
                has_coords_average, has_kupsat, has_valid, has_reference
            )
            page_obj = Paginator(queryset, items_per_page).get_page(page_number)
            context.update({
                'page_obj': page_obj,
                'processed_sources': self._process_sources(
                    page_obj, show_marks, marks_date_from, marks_date_to, marks_status
                ),
            })

        return render(request, "mainapp/home.html", context)

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_id_list(values):
        """Return the entries of ``values`` that are plain decimal numbers, as ints.

        ``str.isdecimal`` is used instead of ``str.isdigit`` because the latter
        also accepts characters such as superscript digits that ``int()``
        rejects with ``ValueError``. Integers are accepted as well, so the
        helper is safe to apply to an already sanitised list.
        """
        ids = []
        for value in values or ():
            text = str(value)
            if text.isdecimal():
                ids.append(int(text))
        return ids

    @staticmethod
    def _parse_day(value):
        """Parse a ``YYYY-MM-DD`` string; return ``None`` when empty or malformed."""
        if not value:
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d")
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _parse_float(value):
        """Parse a float from a string; return ``None`` when empty or malformed."""
        if not value:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _parse_marks_bound(value):
        """Parse a marks date bound.

        Accepts ``YYYY-MM-DDTHH:MM`` or ``YYYY-MM-DD``. Returns a
        ``(datetime, date_only)`` tuple, or ``None`` when ``value`` is empty
        or matches neither format.
        """
        if not value:
            return None
        for fmt, date_only in (("%Y-%m-%dT%H:%M", False), ("%Y-%m-%d", True)):
            try:
                return datetime.strptime(value, fmt), date_only
            except (ValueError, TypeError):
                continue
        return None

    @staticmethod
    def _format_coords(point):
        """Format a point as ``"<lat>N|S <lon>E|W"``, or ``"-"`` when it is falsy.

        ``point.coords`` is read as ``(lon, lat)``. A zero coordinate is
        labelled ``E``/``N`` rather than ``W``/``S``.
        """
        if not point:
            return "-"
        lon, lat = point.coords[0], point.coords[1]
        lon_str = f"{lon}E" if lon >= 0 else f"{abs(lon)}W"
        lat_str = f"{lat}N" if lat >= 0 else f"{abs(lat)}S"
        return f"{lat_str} {lon_str}"

    # ------------------------------------------------------------------
    # Queryset builders
    # ------------------------------------------------------------------

    def _build_objitem_conditions(self, prefix, selected_satellites, date_from, date_to,
                                  freq_min, freq_max, range_min, range_max,
                                  selected_modulations, selected_polarizations):
        """Build one ``Q`` holding every objitem-level filter.

        ``prefix`` is prepended to each lookup: ``""`` when filtering
        ``ObjItem`` directly, ``"source_objitems__"`` when filtering ``Source``
        through its objitems. Empty or unparsable values contribute nothing,
        so the returned ``Q`` may be empty (falsy).
        """
        lookups = {}

        # Filter by satellites
        satellite_ids = self._parse_id_list(selected_satellites)
        if satellite_ids:
            lookups["parameter_obj__id_satellite_id__in"] = satellite_ids

        # Filter by date range (using Geo timestamps)
        day_from = self._parse_day(date_from)
        if day_from is not None:
            lookups["geo_obj__timestamp__gte"] = day_from
        day_to = self._parse_day(date_to)
        if day_to is not None:
            # Exclusive upper bound one day later keeps the whole end day.
            lookups["geo_obj__timestamp__lt"] = day_to + timedelta(days=1)

        # Filter by frequency and frequency range
        numeric_filters = (
            ("parameter_obj__frequency__gte", freq_min),
            ("parameter_obj__frequency__lte", freq_max),
            ("parameter_obj__freq_range__gte", range_min),
            ("parameter_obj__freq_range__lte", range_max),
        )
        for lookup, raw in numeric_filters:
            number = self._parse_float(raw)
            # ``is not None`` so that an explicit 0 is still applied.
            if number is not None:
                lookups[lookup] = number

        # Filter by modulation
        modulation_ids = self._parse_id_list(selected_modulations)
        if modulation_ids:
            lookups["parameter_obj__modulation_id__in"] = modulation_ids

        # Filter by polarization
        polarization_ids = self._parse_id_list(selected_polarizations)
        if polarization_ids:
            lookups["parameter_obj__polarization_id__in"] = polarization_ids

        return Q(**{f"{prefix}{name}": value for name, value in lookups.items()})

    def _get_sources_queryset(self, selected_satellites, date_from, date_to,
                              freq_min, freq_max, range_min, range_max,
                              selected_modulations, selected_polarizations,
                              has_coords_average, has_kupsat, has_valid, has_reference):
        """Build queryset for sources with filters.

        Objitem-level criteria are applied in a single ``filter()`` call so
        that one and the same objitem has to satisfy all of them; chaining
        separate ``filter()`` calls over a multi-valued relation would let
        each criterion be satisfied by a different objitem.

        The ``has_*`` flags accept ``"1"`` (coordinate present) or ``"0"``
        (coordinate missing); any other value leaves the field unfiltered.
        Results are ordered by descending id.
        """
        sources = Source.objects.prefetch_related(
            'source_objitems',
            'source_objitems__parameter_obj',
            'source_objitems__parameter_obj__id_satellite',
            'source_objitems__geo_obj'
        ).annotate(
            # distinct=True: the filter below joins source_objitems a second
            # time, which would otherwise multiply the counted rows.
            objitem_count=Count('source_objitems', distinct=True)
        )

        conditions = self._build_objitem_conditions(
            "source_objitems__", selected_satellites, date_from, date_to,
            freq_min, freq_max, range_min, range_max,
            selected_modulations, selected_polarizations
        )
        if conditions:
            sources = sources.filter(conditions).distinct()

        # Filter by coordinates presence
        presence_flags = (
            ("coords_average", has_coords_average),
            ("coords_kupsat", has_kupsat),
            ("coords_valid", has_valid),
            ("coords_reference", has_reference),
        )
        for field_name, flag in presence_flags:
            if flag == "1":
                sources = sources.filter(**{f"{field_name}__isnull": False})
            elif flag == "0":
                sources = sources.filter(**{f"{field_name}__isnull": True})

        return sources.order_by('-id')

    def _get_objitems_queryset(self, selected_satellites, date_from, date_to,
                               freq_min, freq_max, range_min, range_max,
                               selected_modulations, selected_polarizations,
                               has_geo, has_lyngsat):
        """Build queryset for objitems with filters.

        ``has_geo`` / ``has_lyngsat`` accept ``"1"`` (relation present) or
        ``"0"`` (relation missing); any other value leaves them unfiltered.
        Results are ordered by descending id.
        """
        objitems = ObjItem.objects.select_related(
            'parameter_obj',
            'parameter_obj__id_satellite',
            'parameter_obj__modulation',
            'parameter_obj__polarization',
            'geo_obj',
            'source',
            'lyngsat_source'
        )

        conditions = self._build_objitem_conditions(
            "", selected_satellites, date_from, date_to,
            freq_min, freq_max, range_min, range_max,
            selected_modulations, selected_polarizations
        )
        if conditions:
            objitems = objitems.filter(conditions)

        # Filter by coordinates presence
        if has_geo == "1":
            objitems = objitems.filter(geo_obj__isnull=False)
        elif has_geo == "0":
            objitems = objitems.filter(geo_obj__isnull=True)

        # Filter by LyngSat connection
        if has_lyngsat == "1":
            objitems = objitems.filter(lyngsat_source__isnull=False)
        elif has_lyngsat == "0":
            objitems = objitems.filter(lyngsat_source__isnull=True)

        return objitems.order_by('-id')

    # ------------------------------------------------------------------
    # Row processing
    # ------------------------------------------------------------------

    def _collect_marks(self, related_marks, marks_date_from, marks_date_to, marks_status):
        """Return the marks reachable through ``related_marks`` as dictionaries.

        ``related_marks`` is the ``marks`` related manager of a source or an
        objitem. Date bounds accept ``YYYY-MM-DDTHH:MM`` or ``YYYY-MM-DD``;
        a date-only upper bound includes the whole day. ``marks_status`` may
        be ``"present"`` or ``"absent"``; other values do not filter.
        """
        marks_qs = related_marks.select_related('created_by__user').all()

        # Filter marks by date
        lower = self._parse_marks_bound(marks_date_from)
        if lower is not None:
            marks_qs = marks_qs.filter(timestamp__gte=lower[0])

        upper = self._parse_marks_bound(marks_date_to)
        if upper is not None:
            moment, date_only = upper
            if date_only:
                marks_qs = marks_qs.filter(timestamp__lt=moment + timedelta(days=1))
            else:
                marks_qs = marks_qs.filter(timestamp__lte=moment)

        # Filter marks by status
        if marks_status == "present":
            marks_qs = marks_qs.filter(mark=True)
        elif marks_status == "absent":
            marks_qs = marks_qs.filter(mark=False)

        # Process marks
        return [
            {
                'id': mark.id,
                'mark': mark.mark,
                'timestamp': mark.timestamp,
                'created_by': str(mark.created_by) if mark.created_by else "-",
                'can_edit': mark.can_edit(),
            }
            for mark in marks_qs
        ]

    def _process_sources(self, page_obj, show_marks="0", marks_date_from="", marks_date_to="", marks_status=""):
        """Process sources for display.

        Returns one dictionary per source on ``page_obj``. Marks are only
        loaded when ``show_marks`` equals ``"1"``; otherwise ``'marks'`` is an
        empty list.
        """
        processed = []
        for source in page_obj:
            # Get satellites
            satellite_names = set()
            for objitem in source.source_objitems.all():
                if objitem.parameter_obj and objitem.parameter_obj.id_satellite:
                    satellite_names.add(objitem.parameter_obj.id_satellite.name)

            # Get marks if requested
            marks_data = []
            if show_marks == "1":
                marks_data = self._collect_marks(
                    source.marks, marks_date_from, marks_date_to, marks_status
                )

            processed.append({
                'id': source.id,
                'satellites': ", ".join(sorted(satellite_names)) if satellite_names else "-",
                'objitem_count': source.objitem_count,
                'coords_average': self._format_coords(source.coords_average),
                'coords_kupsat': self._format_coords(source.coords_kupsat),
                'coords_valid': self._format_coords(source.coords_valid),
                'coords_reference': self._format_coords(source.coords_reference),
                'created_at': source.created_at,
                'marks': marks_data,
            })
        return processed

    def _process_objitems(self, page_obj, show_marks="0", marks_date_from="", marks_date_to="", marks_status=""):
        """Process objitems for display.

        Returns one dictionary per objitem on ``page_obj``. Missing related
        objects are rendered as ``"-"`` (or ``None`` for the id fields).
        Marks are only loaded when ``show_marks`` equals ``"1"``.
        """
        processed = []
        for objitem in page_obj:
            param = objitem.parameter_obj
            geo = objitem.geo_obj
            source = objitem.source

            # Format geo coordinates; the date is only shown alongside them
            geo_coords = "-"
            geo_date = "-"
            if geo and geo.coords:
                geo_coords = self._format_coords(geo.coords)
                if geo.timestamp:
                    geo_date = geo.timestamp.strftime("%Y-%m-%d")

            # Format source coordinates
            kupsat_coords = self._format_coords(source.coords_kupsat) if source else "-"
            valid_coords = self._format_coords(source.coords_valid) if source else "-"

            # Get marks if requested
            marks_data = []
            if show_marks == "1":
                marks_data = self._collect_marks(
                    objitem.marks, marks_date_from, marks_date_to, marks_status
                )

            processed.append({
                'id': objitem.id,
                'name': objitem.name or "-",
                'satellite': param.id_satellite.name if param and param.id_satellite else "-",
                'frequency': param.frequency if param else "-",
                'freq_range': param.freq_range if param else "-",
                'polarization': param.polarization.name if param and param.polarization else "-",
                'modulation': param.modulation.name if param and param.modulation else "-",
                'bod_velocity': param.bod_velocity if param else "-",
                'snr': param.snr if param else "-",
                'geo_coords': geo_coords,
                'geo_date': geo_date,
                'kupsat_coords': kupsat_coords,
                'valid_coords': valid_coords,
                'source_id': source.id if source else None,
                'lyngsat_id': objitem.lyngsat_source.id if objitem.lyngsat_source else None,
                'marks': marks_data,
            })
        return processed
class ActionsPageView(View):
    """Serve the actions page, or a login prompt to anonymous visitors."""

    def get(self, request):
        """Render the template matching the visitor's authentication state."""
        template_name = (
            "mainapp/actions.html"
            if request.user.is_authenticated
            else "mainapp/login_required.html"
        )
        return render(request, template_name)
def custom_logout(request):
    """Log the current user out, then redirect to the ``mainapp:home`` URL."""
    logout(request)
    home_redirect = redirect("mainapp:home")
    return home_redirect