Files
dbstorage/dbapp/mainapp/views/tech_analyze.py

486 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.contrib import messages
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models import Q
from django.http import JsonResponse
from django.shortcuts import render
from django.views import View
from django.views.decorators.http import require_http_methods
import json
from ..models import (
TechAnalyze,
Satellite,
Polarization,
Modulation,
Standard,
ObjItem,
Parameter,
)
from ..mixins import RoleRequiredMixin
from ..utils import parse_pagination_params, find_matching_transponder, find_matching_lyngsat
class TechAnalyzeEntryView(LoginRequiredMixin, View):
"""
Представление для ввода данных технического анализа.
"""
def get(self, request):
satellites = Satellite.objects.all().order_by('name')
context = {
'satellites': satellites,
}
return render(request, 'mainapp/tech_analyze_entry.html', context)
class TechAnalyzeSaveView(LoginRequiredMixin, View):
"""
API endpoint для сохранения данных технического анализа.
"""
def post(self, request):
try:
data = json.loads(request.body)
satellite_id = data.get('satellite_id')
rows = data.get('rows', [])
if not satellite_id:
return JsonResponse({
'success': False,
'error': 'Не выбран спутник'
}, status=400)
if not rows:
return JsonResponse({
'success': False,
'error': 'Нет данных для сохранения'
}, status=400)
try:
satellite = Satellite.objects.get(id=satellite_id)
except Satellite.DoesNotExist:
return JsonResponse({
'success': False,
'error': 'Спутник не найден'
}, status=404)
created_count = 0
updated_count = 0
errors = []
with transaction.atomic():
for idx, row in enumerate(rows, start=1):
try:
name = row.get('name', '').strip()
if not name:
errors.append(f"Строка {idx}: отсутствует имя")
continue
# Обработка поляризации
polarization_name = row.get('polarization', '').strip() or '-'
polarization, _ = Polarization.objects.get_or_create(name=polarization_name)
# Обработка модуляции
modulation_name = row.get('modulation', '').strip() or '-'
modulation, _ = Modulation.objects.get_or_create(name=modulation_name)
# Обработка стандарта
standard_name = row.get('standard', '').strip()
if standard_name.lower() == 'unknown':
standard_name = '-'
if not standard_name:
standard_name = '-'
standard, _ = Standard.objects.get_or_create(name=standard_name)
# Обработка числовых полей
frequency = row.get('frequency')
if frequency:
try:
frequency = float(str(frequency).replace(',', '.'))
except (ValueError, TypeError):
frequency = 0
else:
frequency = 0
freq_range = row.get('freq_range')
if freq_range:
try:
freq_range = float(str(freq_range).replace(',', '.'))
except (ValueError, TypeError):
freq_range = 0
else:
freq_range = 0
bod_velocity = row.get('bod_velocity')
if bod_velocity:
try:
bod_velocity = float(str(bod_velocity).replace(',', '.'))
except (ValueError, TypeError):
bod_velocity = 0
else:
bod_velocity = 0
note = row.get('note', '').strip()
# Создание или обновление записи
tech_analyze, created = TechAnalyze.objects.update_or_create(
name=name,
defaults={
'satellite': satellite,
'polarization': polarization,
'frequency': frequency,
'freq_range': freq_range,
'bod_velocity': bod_velocity,
'modulation': modulation,
'standard': standard,
'note': note,
'updated_by': request.user.customuser if hasattr(request.user, 'customuser') else None,
}
)
if created:
tech_analyze.created_by = request.user.customuser if hasattr(request.user, 'customuser') else None
tech_analyze.save()
created_count += 1
else:
updated_count += 1
except Exception as e:
errors.append(f"Строка {idx}: {str(e)}")
response_data = {
'success': True,
'created': created_count,
'updated': updated_count,
'total': created_count + updated_count,
}
if errors:
response_data['errors'] = errors
return JsonResponse(response_data)
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Неверный формат данных'
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
class LinkExistingPointsView(LoginRequiredMixin, View):
"""
API endpoint для привязки существующих точек к данным теханализа.
Алгоритм:
1. Получить все ObjItem для выбранного спутника
2. Для каждого ObjItem:
- Извлечь имя источника
- Найти соответствующую запись TechAnalyze по имени и спутнику
- Если найдена и данные отсутствуют в Parameter:
* Обновить модуляцию (если "-")
* Обновить символьную скорость (если -1.0 или None)
* Обновить стандарт (если "-")
* Обновить частоту (если 0 или None)
* Обновить полосу частот (если 0 или None)
* Подобрать подходящий транспондер
"""
def post(self, request):
try:
data = json.loads(request.body)
satellite_id = data.get('satellite_id')
if not satellite_id:
return JsonResponse({
'success': False,
'error': 'Не выбран спутник'
}, status=400)
try:
satellite = Satellite.objects.get(id=satellite_id)
except Satellite.DoesNotExist:
return JsonResponse({
'success': False,
'error': 'Спутник не найден'
}, status=404)
# Получаем все ObjItem для данного спутника
objitems = ObjItem.objects.filter(
parameter_obj__id_satellite=satellite
).select_related('parameter_obj', 'parameter_obj__modulation', 'parameter_obj__standard', 'parameter_obj__polarization')
updated_count = 0
skipped_count = 0
errors = []
with transaction.atomic():
for objitem in objitems:
try:
if not objitem.parameter_obj:
skipped_count += 1
continue
parameter = objitem.parameter_obj
source_name = objitem.name
# Проверяем, нужно ли обновлять данные
needs_update = (
(parameter.modulation and parameter.modulation.name == "-") or
parameter.bod_velocity is None or
parameter.bod_velocity == -1.0 or
parameter.bod_velocity == 0 or
(parameter.standard and parameter.standard.name == "-") or
parameter.frequency is None or
parameter.frequency == 0 or
parameter.frequency == -1.0 or
parameter.freq_range is None or
parameter.freq_range == 0 or
parameter.freq_range == -1.0 or
objitem.transponder is None
)
if not needs_update:
skipped_count += 1
continue
# Ищем данные в TechAnalyze по имени и спутнику
tech_analyze = TechAnalyze.objects.filter(
name=source_name,
satellite=satellite
).select_related('modulation', 'standard', 'polarization').first()
if not tech_analyze:
skipped_count += 1
continue
# Обновляем данные
updated = False
# Обновляем модуляцию
if parameter.modulation and parameter.modulation.name == "-" and tech_analyze.modulation:
parameter.modulation = tech_analyze.modulation
updated = True
# Обновляем символьную скорость
if (parameter.bod_velocity is None or parameter.bod_velocity == -1.0 or parameter.bod_velocity == 0) and \
tech_analyze.bod_velocity and tech_analyze.bod_velocity > 0:
parameter.bod_velocity = tech_analyze.bod_velocity
updated = True
# Обновляем стандарт
if parameter.standard and parameter.standard.name == "-" and tech_analyze.standard:
parameter.standard = tech_analyze.standard
updated = True
# Обновляем частоту
if (parameter.frequency is None or parameter.frequency == 0 or parameter.frequency == -1.0) and \
tech_analyze.frequency and tech_analyze.frequency > 0:
parameter.frequency = tech_analyze.frequency
updated = True
# Обновляем полосу частот
if (parameter.freq_range is None or parameter.freq_range == 0 or parameter.freq_range == -1.0) and \
tech_analyze.freq_range and tech_analyze.freq_range > 0:
parameter.freq_range = tech_analyze.freq_range
updated = True
# Обновляем поляризацию если нужно
if parameter.polarization and parameter.polarization.name == "-" and tech_analyze.polarization:
parameter.polarization = tech_analyze.polarization
updated = True
# Сохраняем parameter перед поиском транспондера (чтобы использовать обновленные данные)
if updated:
parameter.save()
# Подбираем транспондер если его нет (используем функцию из utils)
if objitem.transponder is None and parameter.frequency and parameter.frequency > 0:
transponder = find_matching_transponder(
satellite,
parameter.frequency,
parameter.polarization
)
if transponder:
objitem.transponder = transponder
updated = True
# Подбираем источник LyngSat если его нет (используем функцию из utils)
if objitem.lyngsat_source is None and parameter.frequency and parameter.frequency > 0:
lyngsat_source = find_matching_lyngsat(
satellite,
parameter.frequency,
parameter.polarization,
tolerance_mhz=0.1
)
if lyngsat_source:
objitem.lyngsat_source = lyngsat_source
updated = True
# Сохраняем objitem если были изменения транспондера или lyngsat
if objitem.transponder or objitem.lyngsat_source:
objitem.save()
if updated:
updated_count += 1
else:
skipped_count += 1
except Exception as e:
errors.append(f"ObjItem {objitem.id}: {str(e)}")
response_data = {
'success': True,
'updated': updated_count,
'skipped': skipped_count,
'total': objitems.count(),
}
if errors:
response_data['errors'] = errors
return JsonResponse(response_data)
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Неверный формат данных'
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
class TechAnalyzeListView(LoginRequiredMixin, View):
"""
Представление для отображения списка данных технического анализа.
"""
def get(self, request):
# Получаем список спутников для фильтра
satellites = Satellite.objects.all().order_by('name')
# Получаем параметры из URL для передачи в шаблон
search_query = request.GET.get('search', '').strip()
satellite_ids = request.GET.getlist('satellite_id')
items_per_page = int(request.GET.get('items_per_page', 50))
context = {
'satellites': satellites,
'selected_satellites': [int(sid) for sid in satellite_ids if sid],
'search_query': search_query,
'items_per_page': items_per_page,
'available_items_per_page': [25, 50, 100, 200, 500],
'full_width_page': True,
}
return render(request, 'mainapp/tech_analyze_list.html', context)
class TechAnalyzeDeleteView(LoginRequiredMixin, RoleRequiredMixin, View):
"""
API endpoint для удаления выбранных записей теханализа.
"""
allowed_roles = ['admin', 'moderator']
def post(self, request):
try:
data = json.loads(request.body)
ids = data.get('ids', [])
if not ids:
return JsonResponse({
'success': False,
'error': 'Не выбраны записи для удаления'
}, status=400)
# Удаляем записи
deleted_count, _ = TechAnalyze.objects.filter(id__in=ids).delete()
return JsonResponse({
'success': True,
'deleted': deleted_count,
'message': f'Удалено записей: {deleted_count}'
})
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Неверный формат данных'
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
class TechAnalyzeAPIView(LoginRequiredMixin, View):
"""
API endpoint для получения данных теханализа в формате для Tabulator.
"""
def get(self, request):
# Получаем параметры фильтрации
search_query = request.GET.get('search', '').strip()
satellite_ids = request.GET.getlist('satellite_id')
# Получаем параметры пагинации от Tabulator
page = int(request.GET.get('page', 1))
size = int(request.GET.get('size', 50))
# Базовый queryset
tech_analyzes = TechAnalyze.objects.select_related(
'satellite', 'polarization', 'modulation', 'standard', 'created_by', 'updated_by'
).order_by('-created_at')
# Применяем фильтры
if search_query:
tech_analyzes = tech_analyzes.filter(
Q(name__icontains=search_query) |
Q(id__icontains=search_query)
)
if satellite_ids:
tech_analyzes = tech_analyzes.filter(satellite_id__in=satellite_ids)
# Пагинация
paginator = Paginator(tech_analyzes, size)
page_obj = paginator.get_page(page)
# Формируем данные для Tabulator
results = []
for item in page_obj:
results.append({
'id': item.id,
'name': item.name or '',
'satellite_id': item.satellite.id if item.satellite else None,
'satellite_name': item.satellite.name if item.satellite else '-',
'frequency': float(item.frequency) if item.frequency else 0,
'freq_range': float(item.freq_range) if item.freq_range else 0,
'bod_velocity': float(item.bod_velocity) if item.bod_velocity else 0,
'polarization_name': item.polarization.name if item.polarization else '-',
'modulation_name': item.modulation.name if item.modulation else '-',
'standard_name': item.standard.name if item.standard else '-',
'note': item.note or '',
'created_at': item.created_at.isoformat() if item.created_at else None,
'updated_at': item.updated_at.isoformat() if item.updated_at else None,
})
return JsonResponse({
'last_page': paginator.num_pages,
'data': results,
})