# Standard library imports from collections import defaultdict from io import BytesIO # Django imports from django.utils import timezone from django.contrib import messages from django.contrib.admin.views.decorators import staff_member_required from django.contrib.auth import logout from django.contrib.auth.mixins import LoginRequiredMixin from django.core.paginator import Paginator from django.db import models from django.db.models import F from django.http import HttpResponse, JsonResponse from django.shortcuts import redirect, render from django.urls import reverse_lazy from django.utils.decorators import method_decorator from django.views import View from django.views.generic import ( CreateView, DeleteView, FormView, UpdateView, ) # Third-party imports import pandas as pd # Local imports from .clusters import get_clusters from .forms import ( GeoForm, LoadCsvData, LoadExcelData, NewEventForm, ObjItemForm, ParameterForm, UploadFileForm, UploadVchLoad, VchLinkForm, FillLyngsatDataForm, LinkLyngsatForm, ) from .mixins import CoordinateProcessingMixin, FormMessageMixin, RoleRequiredMixin from .models import Geo, Modulation, ObjItem, Polarization, Satellite from .utils import ( add_satellite_list, compare_and_link_vch_load, fill_data_from_df, get_points_from_csv, get_vch_load_from_html, kub_report, parse_pagination_params, ) from mapsapp.utils import parse_transponders_from_xml class AddSatellitesView(LoginRequiredMixin, View): def get(self, request): add_satellite_list() return redirect("mainapp:home") # class AddTranspondersView(View): # def get(self, request): # try: # parse_transponders_from_json(BASE_DIR / "transponders.json") # except FileNotFoundError: # print("Файл не найден") # return redirect('home') class AddTranspondersView(LoginRequiredMixin, FormMessageMixin, FormView): template_name = "mainapp/transponders_upload.html" form_class = UploadFileForm success_message = "Файл успешно обработан" error_message = "Форма заполнена некорректно" def form_valid(self, form): uploaded_file = self.request.FILES["file"] try: content = uploaded_file.read() parse_transponders_from_xml(BytesIO(content)) except ValueError as e: messages.error(self.request, f"Ошибка при чтении таблиц: {e}") return redirect("mainapp:add_trans") except Exception as e: messages.error(self.request, f"Неизвестная ошибка: {e}") return redirect("mainapp:add_trans") return super().form_valid(form) def get_success_url(self): return reverse_lazy("mainapp:add_trans") from django.views.generic import View class ActionsPageView(View): def get(self, request): if request.user.is_authenticated: return render(request, "mainapp/actions.html") else: return render(request, "mainapp/login_required.html") class LoadExcelDataView(LoginRequiredMixin, FormMessageMixin, FormView): template_name = "mainapp/add_data_from_excel.html" form_class = LoadExcelData error_message = "Форма заполнена некорректно" def form_valid(self, form): uploaded_file = self.request.FILES["file"] selected_sat = form.cleaned_data["sat_choice"] number = form.cleaned_data["number_input"] try: import io df = pd.read_excel(io.BytesIO(uploaded_file.read())) if number > 0: df = df.head(number) result = fill_data_from_df(df, selected_sat, self.request.user.customuser) messages.success( self.request, f"Данные успешно загружены! Обработано строк: {result}" ) except Exception as e: messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") return redirect("mainapp:load_excel_data") def get_success_url(self): return reverse_lazy("mainapp:load_excel_data") class GetLocationsView(LoginRequiredMixin, View): def get(self, request, sat_id): locations = ( ObjItem.objects.filter(parameter_obj__id_satellite=sat_id) .select_related( "geo_obj", "parameter_obj", "parameter_obj__polarization", ) ) if not locations.exists(): return JsonResponse({"error": "Объектов не найдено"}, status=404) features = [] for loc in locations: if not hasattr(loc, "geo_obj") or not loc.geo_obj or not loc.geo_obj.coords: continue param = getattr(loc, 'parameter_obj', None) if not param: continue features.append( { "type": "Feature", "geometry": { "type": "Point", "coordinates": [loc.geo_obj.coords[0], loc.geo_obj.coords[1]], }, "properties": { "pol": param.polarization.name if param.polarization else "-", "freq": param.frequency * 1000000 if param.frequency else 0, "name": loc.name or "-", "id": loc.geo_obj.id, }, } ) return JsonResponse({"type": "FeatureCollection", "features": features}) class LoadCsvDataView(LoginRequiredMixin, FormMessageMixin, FormView): template_name = "mainapp/add_data_from_csv.html" form_class = LoadCsvData success_message = "Данные успешно загружены!" error_message = "Форма заполнена некорректно" def form_valid(self, form): uploaded_file = self.request.FILES["file"] try: content = uploaded_file.read() if isinstance(content, bytes): content = content.decode("utf-8") get_points_from_csv(content, self.request.user.customuser) except Exception as e: messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") return redirect("mainapp:load_csv_data") return super().form_valid(form) def get_success_url(self): return reverse_lazy("mainapp:load_csv_data") @method_decorator(staff_member_required, name="dispatch") class ShowMapView(RoleRequiredMixin, View): required_roles = ["admin", "moderator"] def get(self, request): ids = request.GET.get("ids", "") points = [] if ids: id_list = [int(x) for x in ids.split(",") if x.isdigit()] locations = ObjItem.objects.filter(id__in=id_list).select_related( "parameter_obj", "parameter_obj__id_satellite", "parameter_obj__polarization", "parameter_obj__modulation", "parameter_obj__standard", "geo_obj", ) for obj in locations: if ( not hasattr(obj, "geo_obj") or not obj.geo_obj or not obj.geo_obj.coords ): continue param = getattr(obj, 'parameter_obj', None) if not param: continue points.append( { "name": f"{obj.name}", "freq": f"{param.frequency} [{param.freq_range}] МГц", "point": (obj.geo_obj.coords.x, obj.geo_obj.coords.y), } ) else: return redirect("admin") grouped = defaultdict(list) for p in points: grouped[p["name"]].append({"point": p["point"], "frequency": p["freq"]}) groups = [ {"name": name, "points": coords_list} for name, coords_list in grouped.items() ] context = { "groups": groups, } return render(request, "admin/map_custom.html", context) class ShowSelectedObjectsMapView(LoginRequiredMixin, View): def get(self, request): ids = request.GET.get("ids", "") points = [] if ids: id_list = [int(x) for x in ids.split(",") if x.isdigit()] locations = ObjItem.objects.filter(id__in=id_list).select_related( "parameter_obj", "parameter_obj__id_satellite", "parameter_obj__polarization", "parameter_obj__modulation", "parameter_obj__standard", "geo_obj", ) for obj in locations: if ( not hasattr(obj, "geo_obj") or not obj.geo_obj or not obj.geo_obj.coords ): continue param = getattr(obj, 'parameter_obj', None) if not param: continue points.append( { "name": f"{obj.name}", "freq": f"{param.frequency} [{param.freq_range}] МГц", "point": (obj.geo_obj.coords.x, obj.geo_obj.coords.y), } ) else: return redirect("mainapp:objitem_list") # Group points by object name from collections import defaultdict grouped = defaultdict(list) for p in points: grouped[p["name"]].append({"point": p["point"], "frequency": p["freq"]}) groups = [ {"name": name, "points": coords_list} for name, coords_list in grouped.items() ] context = { "groups": groups, } return render(request, "mainapp/objitem_map.html", context) class ClusterTestView(LoginRequiredMixin, View): def get(self, request): objs = ObjItem.objects.filter( name__icontains="! Astra 4A 12654,040 [1,962] МГц H" ) coords = [] for obj in objs: if hasattr(obj, "geo_obj") and obj.geo_obj and obj.geo_obj.coords: coords.append( (obj.geo_obj.coords.coords[1], obj.geo_obj.coords.coords[0]) ) get_clusters(coords) return JsonResponse({"success": "ок"}) def custom_logout(request): logout(request) return redirect("mainapp:home") class UploadVchLoadView(LoginRequiredMixin, FormMessageMixin, FormView): template_name = "mainapp/upload_html.html" form_class = UploadVchLoad success_message = "Файл успешно обработан" error_message = "Форма заполнена некорректно" def form_valid(self, form): selected_sat = form.cleaned_data["sat_choice"] uploaded_file = self.request.FILES["file"] try: get_vch_load_from_html(uploaded_file, selected_sat) except ValueError as e: messages.error(self.request, f"Ошибка при чтении таблиц: {e}") return redirect("mainapp:vch_load") except Exception as e: messages.error(self.request, f"Неизвестная ошибка: {e}") return redirect("mainapp:vch_load") return super().form_valid(form) def get_success_url(self): return reverse_lazy("mainapp:vch_load") class LinkVchSigmaView(LoginRequiredMixin, FormView): template_name = "mainapp/link_vch.html" form_class = VchLinkForm def form_valid(self, form): # value1 больше не используется - погрешность частоты определяется автоматически freq_range = form.cleaned_data["value2"] sat_id = form.cleaned_data["sat_choice"] # Передаём 0 для eps_freq и ku_range, так как они не используются count_all, link_count = compare_and_link_vch_load(sat_id, 0, freq_range, 0) messages.success( self.request, f"Привязано {link_count} из {count_all} объектов" ) return redirect("mainapp:link_vch_sigma") def form_invalid(self, form): return self.render_to_response(self.get_context_data(form=form)) class LinkLyngsatSourcesView(LoginRequiredMixin, FormMessageMixin, FormView): """Представление для привязки источников LyngSat к объектам""" template_name = "mainapp/link_lyngsat.html" form_class = LinkLyngsatForm success_message = "Привязка источников LyngSat завершена" error_message = "Ошибка при привязке источников" def form_valid(self, form): from lyngsatapp.models import LyngSat satellites = form.cleaned_data.get("satellites") frequency_tolerance = form.cleaned_data.get("frequency_tolerance", 0.5) # Если спутники не выбраны, обрабатываем все if satellites: objitems = ObjItem.objects.filter( parameter_obj__id_satellite__in=satellites ).select_related('parameter_obj', 'parameter_obj__polarization') else: objitems = ObjItem.objects.filter( parameter_obj__isnull=False ).select_related('parameter_obj', 'parameter_obj__polarization') linked_count = 0 total_count = objitems.count() for objitem in objitems: if not hasattr(objitem, 'parameter_obj') or not objitem.parameter_obj: continue param = objitem.parameter_obj # Округляем частоту объекта if param.frequency: rounded_freq = round(param.frequency, 0) # Округление до целого # Ищем подходящий источник LyngSat # Сравниваем по округленной частоте и поляризации lyngsat_sources = LyngSat.objects.filter( id_satellite=param.id_satellite, polarization=param.polarization, frequency__gte=rounded_freq - frequency_tolerance, frequency__lte=rounded_freq + frequency_tolerance ).order_by('frequency') if lyngsat_sources.exists(): # Берем первый подходящий источник objitem.lyngsat_source = lyngsat_sources.first() objitem.save(update_fields=['lyngsat_source']) linked_count += 1 messages.success( self.request, f"Привязано {linked_count} из {total_count} объектов к источникам LyngSat" ) return redirect("mainapp:link_lyngsat") def form_invalid(self, form): return self.render_to_response(self.get_context_data(form=form)) class LyngsatDataAPIView(LoginRequiredMixin, View): """API для получения данных LyngSat источника""" def get(self, request, lyngsat_id): from lyngsatapp.models import LyngSat try: lyngsat = LyngSat.objects.select_related( 'id_satellite', 'polarization', 'modulation', 'standard' ).get(id=lyngsat_id) # Форматируем дату с учетом локального времени last_update_str = '-' if lyngsat.last_update: local_time = timezone.localtime(lyngsat.last_update) last_update_str = local_time.strftime("%d.%m.%Y") data = { 'id': lyngsat.id, 'satellite': lyngsat.id_satellite.name if lyngsat.id_satellite else '-', 'frequency': f"{lyngsat.frequency:.3f}" if lyngsat.frequency else '-', 'polarization': lyngsat.polarization.name if lyngsat.polarization else '-', 'modulation': lyngsat.modulation.name if lyngsat.modulation else '-', 'standard': lyngsat.standard.name if lyngsat.standard else '-', 'sym_velocity': f"{lyngsat.sym_velocity:.0f}" if lyngsat.sym_velocity else '-', 'fec': lyngsat.fec or '-', 'channel_info': lyngsat.channel_info or '-', 'last_update': last_update_str, 'url': lyngsat.url or None, } return JsonResponse(data) except LyngSat.DoesNotExist: return JsonResponse({'error': 'Источник LyngSat не найден'}, status=404) except Exception as e: return JsonResponse({'error': str(e)}, status=500) class SigmaParameterDataAPIView(LoginRequiredMixin, View): """API для получения данных SigmaParameter""" def get(self, request, parameter_id): from .models import Parameter try: parameter = Parameter.objects.select_related( 'id_satellite', 'polarization', 'modulation', 'standard' ).prefetch_related( 'sigma_parameter__mark', 'sigma_parameter__id_satellite', 'sigma_parameter__polarization', 'sigma_parameter__modulation', 'sigma_parameter__standard' ).get(id=parameter_id) # Получаем все связанные SigmaParameter sigma_params = parameter.sigma_parameter.all() sigma_data = [] for sigma in sigma_params: # Получаем отметки marks = [] for mark in sigma.mark.all().order_by('-timestamp'): mark_str = '+' if mark.mark else '-' date_str = '-' if mark.timestamp: local_time = timezone.localtime(mark.timestamp) date_str = local_time.strftime("%d.%m.%Y %H:%M") marks.append({ 'mark': mark_str, 'date': date_str }) # Форматируем даты начала и окончания datetime_begin_str = '-' if sigma.datetime_begin: local_time = timezone.localtime(sigma.datetime_begin) datetime_begin_str = local_time.strftime("%d.%m.%Y %H:%M") datetime_end_str = '-' if sigma.datetime_end: local_time = timezone.localtime(sigma.datetime_end) datetime_end_str = local_time.strftime("%d.%m.%Y %H:%M") sigma_data.append({ 'id': sigma.id, 'satellite': sigma.id_satellite.name if sigma.id_satellite else '-', 'frequency': f"{sigma.frequency:.3f}" if sigma.frequency else '-', 'transfer_frequency': f"{sigma.transfer_frequency:.3f}" if sigma.transfer_frequency else '-', 'freq_range': f"{sigma.freq_range:.3f}" if sigma.freq_range else '-', 'polarization': sigma.polarization.name if sigma.polarization else '-', 'modulation': sigma.modulation.name if sigma.modulation else '-', 'standard': sigma.standard.name if sigma.standard else '-', 'bod_velocity': f"{sigma.bod_velocity:.0f}" if sigma.bod_velocity else '-', 'snr': f"{sigma.snr:.1f}" if sigma.snr is not None else '-', 'power': f"{sigma.power:.1f}" if sigma.power is not None else '-', 'status': sigma.status or '-', 'packets': 'Да' if sigma.packets else 'Нет' if sigma.packets is not None else '-', 'datetime_begin': datetime_begin_str, 'datetime_end': datetime_end_str, 'marks': marks }) return JsonResponse({ 'parameter_id': parameter.id, 'sigma_parameters': sigma_data }) except Parameter.DoesNotExist: return JsonResponse({'error': 'Parameter не найден'}, status=404) except Exception as e: return JsonResponse({'error': str(e)}, status=500) class SourceObjItemsAPIView(LoginRequiredMixin, View): """API для получения списка ObjItem, связанных с источником""" def get(self, request, source_id): from .models import Source try: # Загружаем Source с prefetch_related для ObjItem source = Source.objects.prefetch_related( 'source_objitems', 'source_objitems__parameter_obj', 'source_objitems__parameter_obj__id_satellite', 'source_objitems__parameter_obj__polarization', 'source_objitems__parameter_obj__modulation', 'source_objitems__geo_obj' ).get(id=source_id) # Получаем все связанные ObjItem, отсортированные по created_at objitems = source.source_objitems.all().order_by('created_at') objitems_data = [] for objitem in objitems: # Получаем данные параметра param = getattr(objitem, 'parameter_obj', None) satellite_name = '-' frequency = '-' freq_range = '-' polarization = '-' bod_velocity = '-' modulation = '-' snr = '-' if param: if hasattr(param, 'id_satellite') and param.id_satellite: satellite_name = param.id_satellite.name frequency = f"{param.frequency:.3f}" if param.frequency is not None else '-' freq_range = f"{param.freq_range:.3f}" if param.freq_range is not None else '-' if hasattr(param, 'polarization') and param.polarization: polarization = param.polarization.name bod_velocity = f"{param.bod_velocity:.0f}" if param.bod_velocity is not None else '-' if hasattr(param, 'modulation') and param.modulation: modulation = param.modulation.name snr = f"{param.snr:.0f}" if param.snr is not None else '-' # Получаем геоданные geo_timestamp = '-' geo_location = '-' geo_coords = '-' if hasattr(objitem, 'geo_obj') and objitem.geo_obj: if objitem.geo_obj.timestamp: local_time = timezone.localtime(objitem.geo_obj.timestamp) geo_timestamp = local_time.strftime("%d.%m.%Y %H:%M") geo_location = objitem.geo_obj.location or '-' if objitem.geo_obj.coords: longitude = objitem.geo_obj.coords.coords[0] latitude = objitem.geo_obj.coords.coords[1] lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W" lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S" geo_coords = f"{lat} {lon}" objitems_data.append({ 'id': objitem.id, 'name': objitem.name or '-', 'satellite_name': satellite_name, 'frequency': frequency, 'freq_range': freq_range, 'polarization': polarization, 'bod_velocity': bod_velocity, 'modulation': modulation, 'snr': snr, 'geo_timestamp': geo_timestamp, 'geo_location': geo_location, 'geo_coords': geo_coords }) return JsonResponse({ 'source_id': source_id, 'objitems': objitems_data }) except Source.DoesNotExist: return JsonResponse({'error': 'Источник не найден'}, status=404) except Exception as e: return JsonResponse({'error': str(e)}, status=500) class SourceListView(LoginRequiredMixin, View): """ Представление для отображения списка источников (Source). """ def get(self, request): from .models import Source from django.db.models import Count from datetime import datetime # Получаем параметры пагинации page_number, items_per_page = parse_pagination_params(request) # Получаем параметры сортировки sort_param = request.GET.get("sort", "-created_at") # Получаем параметры фильтров search_query = request.GET.get("search", "").strip() has_coords_average = request.GET.get("has_coords_average") has_coords_kupsat = request.GET.get("has_coords_kupsat") has_coords_valid = request.GET.get("has_coords_valid") has_coords_reference = request.GET.get("has_coords_reference") objitem_count_min = request.GET.get("objitem_count_min", "").strip() objitem_count_max = request.GET.get("objitem_count_max", "").strip() date_from = request.GET.get("date_from", "").strip() date_to = request.GET.get("date_to", "").strip() # Получаем все Source объекты с оптимизацией запросов sources = Source.objects.select_related( 'created_by__user', 'updated_by__user' ).prefetch_related( 'source_objitems', 'source_objitems__parameter_obj', 'source_objitems__geo_obj' ).annotate( objitem_count=Count('source_objitems') ) # Применяем фильтры # Фильтр по наличию coords_average if has_coords_average == "1": sources = sources.filter(coords_average__isnull=False) elif has_coords_average == "0": sources = sources.filter(coords_average__isnull=True) # Фильтр по наличию coords_kupsat if has_coords_kupsat == "1": sources = sources.filter(coords_kupsat__isnull=False) elif has_coords_kupsat == "0": sources = sources.filter(coords_kupsat__isnull=True) # Фильтр по наличию coords_valid if has_coords_valid == "1": sources = sources.filter(coords_valid__isnull=False) elif has_coords_valid == "0": sources = sources.filter(coords_valid__isnull=True) # Фильтр по наличию coords_reference if has_coords_reference == "1": sources = sources.filter(coords_reference__isnull=False) elif has_coords_reference == "0": sources = sources.filter(coords_reference__isnull=True) # Фильтр по количеству ObjItem if objitem_count_min: try: min_count = int(objitem_count_min) sources = sources.filter(objitem_count__gte=min_count) except ValueError: pass if objitem_count_max: try: max_count = int(objitem_count_max) sources = sources.filter(objitem_count__lte=max_count) except ValueError: pass # Фильтр по диапазону дат создания if date_from: try: date_from_obj = datetime.strptime(date_from, "%Y-%m-%d") sources = sources.filter(created_at__gte=date_from_obj) except (ValueError, TypeError): pass if date_to: try: from datetime import timedelta date_to_obj = datetime.strptime(date_to, "%Y-%m-%d") # Добавляем один день чтобы включить весь конечный день date_to_obj = date_to_obj + timedelta(days=1) sources = sources.filter(created_at__lt=date_to_obj) except (ValueError, TypeError): pass # Поиск по ID if search_query: try: search_id = int(search_query) sources = sources.filter(id=search_id) except ValueError: # Если не число, игнорируем pass # Применяем сортировку valid_sort_fields = { "id": "id", "-id": "-id", "created_at": "created_at", "-created_at": "-created_at", "updated_at": "updated_at", "-updated_at": "-updated_at", "objitem_count": "objitem_count", "-objitem_count": "-objitem_count", } if sort_param in valid_sort_fields: sources = sources.order_by(valid_sort_fields[sort_param]) # Создаем пагинатор paginator = Paginator(sources, items_per_page) page_obj = paginator.get_page(page_number) # Подготавливаем данные для отображения processed_sources = [] for source in page_obj: # Форматируем координаты def format_coords(point): if point: longitude = point.coords[0] latitude = point.coords[1] lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W" lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S" return f"{lat} {lon}" return "-" coords_average_str = format_coords(source.coords_average) coords_kupsat_str = format_coords(source.coords_kupsat) coords_valid_str = format_coords(source.coords_valid) coords_reference_str = format_coords(source.coords_reference) # Получаем количество связанных ObjItem objitem_count = source.objitem_count processed_sources.append({ 'id': source.id, 'coords_average': coords_average_str, 'coords_kupsat': coords_kupsat_str, 'coords_valid': coords_valid_str, 'coords_reference': coords_reference_str, 'objitem_count': objitem_count, 'created_at': source.created_at, 'updated_at': source.updated_at, 'created_by': source.created_by, 'updated_by': source.updated_by, }) # Подготавливаем контекст для шаблона context = { 'page_obj': page_obj, 'processed_sources': processed_sources, 'items_per_page': items_per_page, 'available_items_per_page': [50, 100, 500, 1000], 'sort': sort_param, 'search_query': search_query, 'has_coords_average': has_coords_average, 'has_coords_kupsat': has_coords_kupsat, 'has_coords_valid': has_coords_valid, 'has_coords_reference': has_coords_reference, 'objitem_count_min': objitem_count_min, 'objitem_count_max': objitem_count_max, 'date_from': date_from, 'date_to': date_to, 'full_width_page': True, } return render(request, "mainapp/source_list.html", context) class ProcessKubsatView(LoginRequiredMixin, FormMessageMixin, FormView): template_name = "mainapp/process_kubsat.html" form_class = NewEventForm error_message = "Форма заполнена некорректно" def form_valid(self, form): uploaded_file = self.request.FILES["file"] try: content = uploaded_file.read() df = kub_report(BytesIO(content)) output = BytesIO() with pd.ExcelWriter(output, engine="openpyxl") as writer: df.to_excel(writer, index=False, sheet_name="Результат") output.seek(0) response = HttpResponse( output.getvalue(), content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) response["Content-Disposition"] = ( 'attachment; filename="kubsat_report.xlsx"' ) messages.success(self.request, "Событие успешно обработано!") return response except Exception as e: messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") return redirect("mainapp:kubsat_excel") class DeleteSelectedObjectsView(RoleRequiredMixin, View): required_roles = ["admin", "moderator"] def post(self, request): ids = request.POST.get("ids", "") if not ids: return JsonResponse({"error": "Нет ID для удаления"}, status=400) try: id_list = [int(x) for x in ids.split(",") if x.isdigit()] deleted_count, _ = ObjItem.objects.filter(id__in=id_list).delete() return JsonResponse( { "success": True, "message": "Объект успешно удалён", "deleted_count": deleted_count, } ) except Exception as e: return JsonResponse({"error": f"Ошибка при удалении: {str(e)}"}, status=500) class ObjItemListView(LoginRequiredMixin, View): def get(self, request): satellites = ( Satellite.objects.filter(parameters__objitem__isnull=False) .distinct() .only("id", "name") .order_by("name") ) selected_sat_id = request.GET.get("satellite_id") page_number, items_per_page = parse_pagination_params(request) sort_param = request.GET.get("sort", "") freq_min = request.GET.get("freq_min") freq_max = request.GET.get("freq_max") range_min = request.GET.get("range_min") range_max = request.GET.get("range_max") snr_min = request.GET.get("snr_min") snr_max = request.GET.get("snr_max") bod_min = request.GET.get("bod_min") bod_max = request.GET.get("bod_max") search_query = request.GET.get("search") selected_modulations = request.GET.getlist("modulation") selected_polarizations = request.GET.getlist("polarization") selected_satellites = request.GET.getlist("satellite_id") has_kupsat = request.GET.get("has_kupsat") has_valid = request.GET.get("has_valid") date_from = request.GET.get("date_from") date_to = request.GET.get("date_to") objects = ObjItem.objects.none() if selected_satellites or selected_sat_id: if selected_sat_id and not selected_satellites: try: selected_sat_id_single = int(selected_sat_id) selected_satellites = [selected_sat_id_single] except ValueError: selected_satellites = [] if selected_satellites: objects = ( ObjItem.objects.select_related( "geo_obj", "source", "updated_by__user", "created_by__user", "lyngsat_source", "parameter_obj", "parameter_obj__id_satellite", "parameter_obj__polarization", "parameter_obj__modulation", "parameter_obj__standard", ) .prefetch_related( "parameter_obj__sigma_parameter", "parameter_obj__sigma_parameter__polarization", ) .filter(parameter_obj__id_satellite_id__in=selected_satellites) ) else: objects = ObjItem.objects.select_related( "geo_obj", "source", "updated_by__user", "created_by__user", "lyngsat_source", "parameter_obj", "parameter_obj__id_satellite", "parameter_obj__polarization", "parameter_obj__modulation", "parameter_obj__standard", ).prefetch_related( "parameter_obj__sigma_parameter", "parameter_obj__sigma_parameter__polarization", ) if freq_min is not None and freq_min.strip() != "": try: freq_min_val = float(freq_min) objects = objects.filter( parameter_obj__frequency__gte=freq_min_val ) except ValueError: pass if freq_max is not None and freq_max.strip() != "": try: freq_max_val = float(freq_max) objects = objects.filter( parameter_obj__frequency__lte=freq_max_val ) except ValueError: pass if range_min is not None and range_min.strip() != "": try: range_min_val = float(range_min) objects = objects.filter( parameter_obj__freq_range__gte=range_min_val ) except ValueError: pass if range_max is not None and range_max.strip() != "": try: range_max_val = float(range_max) objects = objects.filter( parameter_obj__freq_range__lte=range_max_val ) except ValueError: pass if snr_min is not None and snr_min.strip() != "": try: snr_min_val = float(snr_min) objects = objects.filter(parameter_obj__snr__gte=snr_min_val) except ValueError: pass if snr_max is not None and snr_max.strip() != "": try: snr_max_val = float(snr_max) objects = objects.filter(parameter_obj__snr__lte=snr_max_val) except ValueError: pass if bod_min is not None and bod_min.strip() != "": try: bod_min_val = float(bod_min) objects = objects.filter( parameter_obj__bod_velocity__gte=bod_min_val ) except ValueError: pass if bod_max is not None and bod_max.strip() != "": try: bod_max_val = float(bod_max) objects = objects.filter( parameter_obj__bod_velocity__lte=bod_max_val ) except ValueError: pass if selected_modulations: objects = objects.filter( parameter_obj__modulation__id__in=selected_modulations ) if selected_polarizations: objects = objects.filter( parameter_obj__polarization__id__in=selected_polarizations ) if has_kupsat == "1": objects = objects.filter(source__coords_kupsat__isnull=False) elif has_kupsat == "0": objects = objects.filter(source__coords_kupsat__isnull=True) if has_valid == "1": objects = objects.filter(source__coords_valid__isnull=False) elif has_valid == "0": objects = objects.filter(source__coords_valid__isnull=True) # Date filter for geo_obj timestamp date_from = request.GET.get("date_from") date_to = request.GET.get("date_to") if date_from and date_from.strip(): try: from datetime import datetime date_from_obj = datetime.strptime(date_from, "%Y-%m-%d") objects = objects.filter(geo_obj__timestamp__gte=date_from_obj) except (ValueError, TypeError): pass if date_to and date_to.strip(): try: from datetime import datetime, timedelta date_to_obj = datetime.strptime(date_to, "%Y-%m-%d") # Add one day to include the entire end date date_to_obj = date_to_obj + timedelta(days=1) objects = objects.filter(geo_obj__timestamp__lt=date_to_obj) except (ValueError, TypeError): pass # Filter by source type (lyngsat_source) has_source_type = request.GET.get("has_source_type") if has_source_type == "1": objects = objects.filter(lyngsat_source__isnull=False) elif has_source_type == "0": objects = objects.filter(lyngsat_source__isnull=True) # Filter by sigma (sigma parameters) has_sigma = request.GET.get("has_sigma") if has_sigma == "1": objects = objects.filter(parameter_obj__sigma_parameter__isnull=False) elif has_sigma == "0": objects = objects.filter(parameter_obj__sigma_parameter__isnull=True) if search_query: search_query = search_query.strip() if search_query: objects = objects.filter( models.Q(name__icontains=search_query) | models.Q(geo_obj__location__icontains=search_query) ) else: selected_sat_id = None objects = objects.annotate( first_param_freq=F("parameter_obj__frequency"), first_param_range=F("parameter_obj__freq_range"), first_param_snr=F("parameter_obj__snr"), first_param_bod=F("parameter_obj__bod_velocity"), first_param_sat_name=F("parameter_obj__id_satellite__name"), first_param_pol_name=F("parameter_obj__polarization__name"), first_param_mod_name=F("parameter_obj__modulation__name"), ) valid_sort_fields = { "name": "name", "-name": "-name", "updated_at": "updated_at", "-updated_at": "-updated_at", "created_at": "created_at", "-created_at": "-created_at", "updated_by": "updated_by__user__username", "-updated_by": "-updated_by__user__username", "created_by": "created_by__user__username", "-created_by": "-created_by__user__username", "geo_timestamp": "geo_obj__timestamp", "-geo_timestamp": "-geo_obj__timestamp", "frequency": "first_param_freq", "-frequency": "-first_param_freq", "freq_range": "first_param_range", "-freq_range": "-first_param_range", "snr": "first_param_snr", "-snr": "-first_param_snr", "bod_velocity": "first_param_bod", "-bod_velocity": "-first_param_bod", "satellite": "first_param_sat_name", "-satellite": "-first_param_sat_name", "polarization": "first_param_pol_name", "-polarization": "-first_param_pol_name", "modulation": "first_param_mod_name", "-modulation": "-first_param_mod_name", } if sort_param in valid_sort_fields: objects = objects.order_by(valid_sort_fields[sort_param]) paginator = Paginator(objects, items_per_page) page_obj = paginator.get_page(page_number) processed_objects = [] for obj in page_obj: param = getattr(obj, 'parameter_obj', None) geo_coords = "-" geo_timestamp = "-" geo_location = "-" kupsat_coords = "-" valid_coords = "-" distance_geo_kup = "-" distance_geo_valid = "-" distance_kup_valid = "-" if hasattr(obj, "geo_obj") and obj.geo_obj: geo_timestamp = obj.geo_obj.timestamp geo_location = obj.geo_obj.location if obj.geo_obj.coords: longitude = obj.geo_obj.coords.coords[0] latitude = obj.geo_obj.coords.coords[1] lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W" lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S" geo_coords = f"{lat} {lon}" satellite_name = "-" frequency = "-" freq_range = "-" polarization_name = "-" bod_velocity = "-" modulation_name = "-" snr = "-" standard_name = "-" comment = "-" is_average = "-" if param: if hasattr(param, "id_satellite") and param.id_satellite: satellite_name = ( param.id_satellite.name if hasattr(param.id_satellite, "name") else "-" ) frequency = ( f"{param.frequency:.3f}" if param.frequency is not None else "-" ) freq_range = ( f"{param.freq_range:.3f}" if param.freq_range is not None else "-" ) bod_velocity = ( f"{param.bod_velocity:.0f}" if param.bod_velocity is not None else "-" ) snr = f"{param.snr:.0f}" if param.snr is not None else "-" if hasattr(param, "polarization") and param.polarization: polarization_name = ( param.polarization.name if hasattr(param.polarization, "name") else "-" ) if hasattr(param, "modulation") and param.modulation: modulation_name = ( param.modulation.name if hasattr(param.modulation, "name") else "-" ) if hasattr(param, "standard") and param.standard: standard_name = ( param.standard.name if hasattr(param.standard, "name") else "-" ) if hasattr(obj, "geo_obj") and obj.geo_obj: comment = obj.geo_obj.comment or "-" is_average = "Да" if obj.geo_obj.is_average else "Нет" if obj.geo_obj.is_average is not None else "-" source_type = "ТВ" if obj.lyngsat_source else "-" has_sigma = False sigma_info = "-" if param: sigma_count = param.sigma_parameter.count() if sigma_count > 0: has_sigma = True first_sigma = param.sigma_parameter.first() if first_sigma: sigma_freq = f"{first_sigma.transfer_frequency:.3f}" if first_sigma.transfer_frequency else "-" sigma_range = f"{first_sigma.freq_range:.3f}" if first_sigma.freq_range else "-" sigma_pol = first_sigma.polarization.name if first_sigma.polarization else "-" sigma_pol_short = sigma_pol[0] if sigma_pol and sigma_pol != "-" else "-" sigma_info = f"{sigma_freq}/{sigma_range}/{sigma_pol_short}" processed_objects.append( { "id": obj.id, "name": obj.name or "-", "satellite_name": satellite_name, "frequency": frequency, "freq_range": freq_range, "polarization": polarization_name, "bod_velocity": bod_velocity, "modulation": modulation_name, "snr": snr, "geo_timestamp": geo_timestamp, "geo_location": geo_location, "geo_coords": geo_coords, "kupsat_coords": kupsat_coords, "valid_coords": valid_coords, "distance_geo_kup": distance_geo_kup, "distance_geo_valid": distance_geo_valid, "distance_kup_valid": distance_kup_valid, "updated_by": obj.updated_by if obj.updated_by else "-", "comment": comment, "is_average": is_average, "source_type": source_type, "standard": standard_name, "has_sigma": has_sigma, "sigma_info": sigma_info, "obj": obj, } ) modulations = Modulation.objects.all() polarizations = Polarization.objects.all() # Get the new filter values has_source_type = request.GET.get("has_source_type") has_sigma = request.GET.get("has_sigma") context = { "satellites": satellites, "selected_satellite_id": selected_sat_id, "page_obj": page_obj, "processed_objects": processed_objects, "items_per_page": items_per_page, "available_items_per_page": [50, 100, 500, 1000], "freq_min": freq_min, "freq_max": freq_max, "range_min": range_min, "range_max": range_max, "snr_min": snr_min, "snr_max": snr_max, "bod_min": bod_min, "bod_max": bod_max, "search_query": search_query, "selected_modulations": [ int(x) for x in selected_modulations if x.isdigit() ], "selected_polarizations": [ int(x) for x in selected_polarizations if x.isdigit() ], "selected_satellites": [int(x) for x in selected_satellites if x.isdigit()], "has_kupsat": has_kupsat, "has_valid": has_valid, "date_from": date_from, "date_to": date_to, "has_source_type": has_source_type, "has_sigma": has_sigma, "modulations": modulations, "polarizations": polarizations, "full_width_page": True, "sort": sort_param, } return render(request, "mainapp/objitem_list.html", context) class ObjItemFormView( RoleRequiredMixin, CoordinateProcessingMixin, FormMessageMixin, UpdateView ): """ Базовый класс для создания и редактирования ObjItem. Содержит общую логику обработки форм, координат и параметров. """ model = ObjItem form_class = ObjItemForm template_name = "mainapp/objitem_form.html" success_url = reverse_lazy("mainapp:home") required_roles = ["admin", "moderator"] def get_success_url(self): """Возвращает URL с сохраненными параметрами фильтров.""" # Получаем все параметры из GET запроса и сохраняем их в URL if self.request.GET: from urllib.parse import urlencode query_string = urlencode(self.request.GET) return reverse_lazy("mainapp:objitem_list") + '?' + query_string return reverse_lazy("mainapp:objitem_list") def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["LEAFLET_CONFIG"] = { "DEFAULT_CENTER": (55.75, 37.62), "DEFAULT_ZOOM": 5, } # Сохраняем параметры возврата для кнопки "Назад" context["return_params"] = self.request.GET.get('return_params', '') # Работаем с одной формой параметра вместо formset if self.object and hasattr(self.object, "parameter_obj") and self.object.parameter_obj: context["parameter_form"] = ParameterForm( instance=self.object.parameter_obj, prefix="parameter" ) else: context["parameter_form"] = ParameterForm(prefix="parameter") if self.object and hasattr(self.object, "geo_obj") and self.object.geo_obj: context["geo_form"] = GeoForm( instance=self.object.geo_obj, prefix="geo" ) else: context["geo_form"] = GeoForm(prefix="geo") return context def form_valid(self, form): # Получаем форму параметра if self.object and hasattr(self.object, "parameter_obj") and self.object.parameter_obj: parameter_form = ParameterForm( self.request.POST, instance=self.object.parameter_obj, prefix="parameter" ) else: parameter_form = ParameterForm(self.request.POST, prefix="parameter") if self.object and hasattr(self.object, "geo_obj") and self.object.geo_obj: geo_form = GeoForm(self.request.POST, instance=self.object.geo_obj, prefix="geo") else: geo_form = GeoForm(self.request.POST, prefix="geo") # Сохраняем основной объект self.object = form.save(commit=False) self.set_user_fields() self.object.save() # Сохраняем связанный параметр if parameter_form.is_valid(): self.save_parameter(parameter_form) else: context = self.get_context_data() context.update({ 'form': form, 'parameter_form': parameter_form, 'geo_form': geo_form, }) return self.render_to_response(context) # Сохраняем геоданные if geo_form.is_valid(): self.save_geo_data(geo_form) else: context = self.get_context_data() context.update({ 'form': form, 'parameter_form': parameter_form, 'geo_form': geo_form, }) return self.render_to_response(context) return super().form_valid(form) def set_user_fields(self): """Устанавливает поля пользователя для объекта.""" raise NotImplementedError("Subclasses must implement set_user_fields()") def save_parameter(self, parameter_form): """Сохраняет параметр объекта через OneToOne связь.""" if parameter_form.is_valid(): instance = parameter_form.save(commit=False) instance.objitem = self.object instance.save() def save_geo_data(self, geo_form): """Сохраняет геоданные объекта.""" geo_instance = self.get_or_create_geo_instance() # Обновляем поля из geo_form if geo_form.is_valid(): geo_instance.location = geo_form.cleaned_data["location"] geo_instance.comment = geo_form.cleaned_data["comment"] geo_instance.is_average = geo_form.cleaned_data["is_average"] # Обрабатываем координаты self.process_coordinates(geo_instance) # Обрабатываем дату/время self.process_timestamp(geo_instance) geo_instance.save() def get_or_create_geo_instance(self): """Получает или создает экземпляр Geo.""" if hasattr(self.object, "geo_obj") and self.object.geo_obj: return self.object.geo_obj return Geo(objitem=self.object) class ObjItemUpdateView(ObjItemFormView): """Представление для редактирования ObjItem.""" success_message = "Объект успешно сохранён!" def set_user_fields(self): self.object.updated_by = self.request.user.customuser class ObjItemCreateView(ObjItemFormView, CreateView): """Представление для создания ObjItem.""" success_message = "Объект успешно создан!" def set_user_fields(self): self.object.created_by = self.request.user.customuser self.object.updated_by = self.request.user.customuser class ObjItemDeleteView(RoleRequiredMixin, FormMessageMixin, DeleteView): model = ObjItem template_name = "mainapp/objitem_confirm_delete.html" success_url = reverse_lazy("mainapp:objitem_list") success_message = "Объект успешно удалён!" required_roles = ["admin", "moderator"] def get_success_url(self): """Возвращает URL с сохраненными параметрами фильтров.""" # Получаем все параметры из GET запроса и сохраняем их в URL if self.request.GET: from urllib.parse import urlencode query_string = urlencode(self.request.GET) return reverse_lazy("mainapp:objitem_list") + '?' + query_string return reverse_lazy("mainapp:objitem_list") class ObjItemDetailView(LoginRequiredMixin, View): """ Представление для просмотра деталей ObjItem в режиме чтения. Доступно для всех авторизованных пользователей, показывает данные в режиме чтения. """ def get(self, request, pk): obj = ObjItem.objects.filter(pk=pk).select_related( 'geo_obj', 'updated_by__user', 'created_by__user', 'parameter_obj', 'parameter_obj__id_satellite', 'parameter_obj__polarization', 'parameter_obj__modulation', 'parameter_obj__standard', ).first() if not obj: from django.http import Http404 raise Http404("Объект не найден") # Сохраняем параметры возврата для кнопки "Назад" return_params = request.GET.get('return_params', '') context = { 'object': obj, 'return_params': return_params } return render(request, "mainapp/objitem_detail.html", context) class FillLyngsatDataView(LoginRequiredMixin, FormMessageMixin, FormView): """ Представление для заполнения данных из Lyngsat. Позволяет выбрать спутники и регионы для парсинга данных с сайта Lyngsat. Запускает асинхронную задачу Celery для обработки. """ template_name = "mainapp/fill_lyngsat_data.html" form_class = FillLyngsatDataForm success_url = reverse_lazy("mainapp:lyngsat_task_status") error_message = "Форма заполнена некорректно" def form_valid(self, form): satellites = form.cleaned_data["satellites"] regions = form.cleaned_data["regions"] use_cache = form.cleaned_data.get("use_cache", True) force_refresh = form.cleaned_data.get("force_refresh", False) # Получаем названия спутников target_sats = [sat.name for sat in satellites] try: from lyngsatapp.tasks import fill_lyngsat_data_task # Запускаем асинхронную задачу с параметрами кеширования task = fill_lyngsat_data_task.delay( target_sats, regions, force_refresh=force_refresh, use_cache=use_cache ) cache_status = "без кеша" if not use_cache else ("с обновлением кеша" if force_refresh else "с кешированием") messages.success( self.request, f"Задача запущена ({cache_status})! ID задачи: {task.id}. " "Вы будете перенаправлены на страницу отслеживания прогресса." ) # Перенаправляем на страницу статуса задачи return redirect('mainapp:lyngsat_task_status', task_id=task.id) except Exception as e: messages.error(self.request, f"Ошибка при запуске задачи: {str(e)}") return redirect("mainapp:fill_lyngsat_data") class LyngsatTaskStatusView(LoginRequiredMixin, View): """ Представление для отслеживания статуса задачи заполнения данных Lyngsat. """ template_name = "mainapp/lyngsat_task_status.html" def get(self, request, task_id=None): context = { 'task_id': task_id } return render(request, self.template_name, context) class LyngsatTaskStatusAPIView(LoginRequiredMixin, View): """ API для получения статуса задачи Celery. """ def get(self, request, task_id): from celery.result import AsyncResult from django.core.cache import cache task = AsyncResult(task_id) response_data = { 'task_id': task_id, 'state': task.state, 'result': None, 'error': None } if task.state == 'PENDING': response_data['status'] = 'Задача в очереди...' elif task.state == 'PROGRESS': response_data['status'] = task.info.get('status', '') response_data['current'] = task.info.get('current', 0) response_data['total'] = task.info.get('total', 1) response_data['percent'] = int((task.info.get('current', 0) / task.info.get('total', 1)) * 100) elif task.state == 'SUCCESS': # Получаем результат из кеша result = cache.get(f'lyngsat_task_{task_id}') if result: response_data['result'] = result response_data['status'] = 'Задача завершена успешно' else: response_data['result'] = task.result response_data['status'] = 'Задача завершена' elif task.state == 'FAILURE': response_data['status'] = 'Ошибка при выполнении задачи' response_data['error'] = str(task.info) else: response_data['status'] = task.state return JsonResponse(response_data) class ClearLyngsatCacheView(LoginRequiredMixin, View): """ Представление для очистки кеша LyngSat. """ def post(self, request): from lyngsatapp.tasks import clear_cache_task cache_type = request.POST.get('cache_type', 'all') try: # Запускаем задачу очистки кеша task = clear_cache_task.delay(cache_type) messages.success( request, f"Задача очистки кеша ({cache_type}) запущена! ID задачи: {task.id}" ) except Exception as e: messages.error(request, f"Ошибка при запуске задачи очистки кеша: {str(e)}") return redirect(request.META.get('HTTP_REFERER', 'mainapp:home')) def get(self, request): """Страница управления кешем""" return render(request, 'mainapp/clear_lyngsat_cache.html')