Files
dbstorage/dbapp/mainapp/views/api.py

302 lines
13 KiB
Python

"""
API endpoints for AJAX requests and data retrieval.
"""
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import JsonResponse
from django.utils import timezone
from django.views import View
from ..models import ObjItem
class GetLocationsView(LoginRequiredMixin, View):
    """Return the located objects of one satellite as GeoJSON.

    Only authenticated users may call this endpoint (LoginRequiredMixin).
    """

    def get(self, request, sat_id):
        """Build a GeoJSON FeatureCollection for the satellite ``sat_id``.

        Args:
            request: The incoming HTTP request.
            sat_id: Value matched against ``parameter_obj__id_satellite``.

        Returns:
            JsonResponse: status 404 with an ``error`` key when no ObjItem
            matches the satellite; otherwise a FeatureCollection holding one
            Point feature per ObjItem that has both coordinates and a
            parameter. Items lacking either are skipped, so the collection
            may be empty even when matching items exist.
        """
        # Evaluate the queryset once. Checking ``.exists()`` and then
        # iterating the same queryset costs two database round-trips.
        locations = list(
            ObjItem.objects.filter(parameter_obj__id_satellite=sat_id)
            .select_related(
                "geo_obj",
                "parameter_obj",
                "parameter_obj__polarization",
            )
        )
        if not locations:
            return JsonResponse({"error": "Объектов не найдено"}, status=404)

        features = []
        for loc in locations:
            # getattr with a default tolerates a missing related object,
            # exactly like the hasattr() probe it replaces.
            geo = getattr(loc, "geo_obj", None)
            if not geo or not geo.coords:
                continue
            param = getattr(loc, "parameter_obj", None)
            if not param:
                continue

            features.append(
                {
                    "type": "Feature",
                    "geometry": {
                        "type": "Point",
                        "coordinates": [geo.coords[0], geo.coords[1]],
                    },
                    "properties": {
                        "pol": param.polarization.name if param.polarization else "-",
                        # Stored frequency is scaled by 1e6 for the client;
                        # a missing or zero frequency is reported as 0.
                        "freq": param.frequency * 1000000 if param.frequency else 0,
                        "name": loc.name or "-",
                        "id": geo.id,
                    },
                }
            )

        return JsonResponse({"type": "FeatureCollection", "features": features})
class LyngsatDataAPIView(LoginRequiredMixin, View):
    """Serve one LyngSat record as a flat, display-ready JSON object."""

    def get(self, request, lyngsat_id):
        """Look up the LyngSat row ``lyngsat_id`` and serialise it.

        Args:
            request: The incoming HTTP request.
            lyngsat_id: Primary key of the LyngSat record.

        Returns:
            JsonResponse: the formatted record on success, status 404 when
            the record does not exist, or status 500 carrying the exception
            text for any other failure.
        """
        # Imported here rather than at module level, as in the original.
        from lyngsatapp.models import LyngSat

        def name_or_dash(related):
            # Related rows expose ``name``; an absent relation shows as '-'.
            return related.name if related else '-'

        try:
            record = LyngSat.objects.select_related(
                'id_satellite',
                'polarization',
                'modulation',
                'standard',
            ).get(id=lyngsat_id)

            # The update date is rendered in the active local timezone.
            if record.last_update:
                updated = timezone.localtime(record.last_update).strftime("%d.%m.%Y")
            else:
                updated = '-'

            payload = {
                'id': record.id,
                'satellite': name_or_dash(record.id_satellite),
                'frequency': f"{record.frequency:.3f}" if record.frequency else '-',
                'polarization': name_or_dash(record.polarization),
                'modulation': name_or_dash(record.modulation),
                'standard': name_or_dash(record.standard),
                'sym_velocity': f"{record.sym_velocity:.0f}" if record.sym_velocity else '-',
                'fec': record.fec or '-',
                'channel_info': record.channel_info or '-',
                'last_update': updated,
                'url': record.url or None,
            }
            return JsonResponse(payload)
        except LyngSat.DoesNotExist:
            return JsonResponse({'error': 'Источник LyngSat не найден'}, status=404)
        except Exception as exc:
            return JsonResponse({'error': str(exc)}, status=500)
class SigmaParameterDataAPIView(LoginRequiredMixin, View):
    """Serve the SigmaParameter rows attached to one Parameter as JSON."""

    def get(self, request, parameter_id):
        """Serialise every SigmaParameter of ``parameter_id`` with its marks.

        Args:
            request: The incoming HTTP request.
            parameter_id: Primary key of the Parameter to look up.

        Returns:
            JsonResponse: ``{'parameter_id', 'sigma_parameters'}`` on success,
            status 404 when the Parameter does not exist, or status 500
            carrying the exception text for any other failure.
        """
        from ..models import Parameter

        def local_dt(value):
            # Render a datetime in the active local timezone, '-' if unset.
            if not value:
                return '-'
            return timezone.localtime(value).strftime("%d.%m.%Y %H:%M")

        def newest_first_key(mark):
            # The leading flag keeps undated marks out of datetime
            # comparisons; with reverse=True they end up after dated ones.
            return (mark.timestamp is not None, mark.timestamp)

        try:
            parameter = Parameter.objects.select_related(
                'id_satellite',
                'polarization',
                'modulation',
                'standard'
            ).prefetch_related(
                'sigma_parameter__mark',
                'sigma_parameter__id_satellite',
                'sigma_parameter__polarization',
                'sigma_parameter__modulation',
                'sigma_parameter__standard'
            ).get(id=parameter_id)

            sigma_data = []
            for sigma in parameter.sigma_parameter.all():
                # Sort the prefetched marks in Python. Chaining .order_by()
                # onto the related manager builds a fresh queryset, which
                # ignores the prefetch cache and costs one query per sigma.
                ordered_marks = sorted(
                    sigma.mark.all(),
                    key=newest_first_key,
                    reverse=True,
                )
                marks = [
                    {
                        'mark': '+' if mark.mark else '-',
                        'date': local_dt(mark.timestamp),
                    }
                    for mark in ordered_marks
                ]

                sigma_data.append({
                    'id': sigma.id,
                    'satellite': sigma.id_satellite.name if sigma.id_satellite else '-',
                    'frequency': f"{sigma.frequency:.3f}" if sigma.frequency else '-',
                    'transfer_frequency': f"{sigma.transfer_frequency:.3f}" if sigma.transfer_frequency else '-',
                    'freq_range': f"{sigma.freq_range:.3f}" if sigma.freq_range else '-',
                    'polarization': sigma.polarization.name if sigma.polarization else '-',
                    'modulation': sigma.modulation.name if sigma.modulation else '-',
                    'standard': sigma.standard.name if sigma.standard else '-',
                    'bod_velocity': f"{sigma.bod_velocity:.0f}" if sigma.bod_velocity else '-',
                    # snr/power may legitimately be 0, so only None is blank.
                    'snr': f"{sigma.snr:.1f}" if sigma.snr is not None else '-',
                    'power': f"{sigma.power:.1f}" if sigma.power is not None else '-',
                    'status': sigma.status or '-',
                    # Three-state flag: yes / no / unknown (None).
                    'packets': 'Да' if sigma.packets else 'Нет' if sigma.packets is not None else '-',
                    'datetime_begin': local_dt(sigma.datetime_begin),
                    'datetime_end': local_dt(sigma.datetime_end),
                    'marks': marks
                })

            return JsonResponse({
                'parameter_id': parameter.id,
                'sigma_parameters': sigma_data
            })
        except Parameter.DoesNotExist:
            return JsonResponse({'error': 'Parameter не найден'}, status=404)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class SourceObjItemsAPIView(LoginRequiredMixin, View):
    """Serve the ObjItems attached to one Source as display-ready JSON."""

    def get(self, request, source_id):
        """Serialise every ObjItem of ``source_id``, ordered by creation time.

        Args:
            request: The incoming HTTP request.
            source_id: Primary key of the Source to look up.

        Returns:
            JsonResponse: ``{'source_id', 'objitems'}`` on success, status 404
            when the Source does not exist, or status 500 carrying the
            exception text for any other failure.
        """
        from ..models import Source

        def fmt(value, spec):
            # Format a number with ``spec``; only None is shown as '-'.
            return format(value, spec) if value is not None else '-'

        try:
            source = Source.objects.get(id=source_id)

            # Order and prefetch on the same queryset. Prefetching on Source
            # and then calling .all().order_by() builds a new queryset, which
            # discards the prefetch cache and loads each item's relations
            # one query at a time.
            objitems = source.source_objitems.order_by('created_at').prefetch_related(
                'parameter_obj',
                'parameter_obj__id_satellite',
                'parameter_obj__polarization',
                'parameter_obj__modulation',
                'geo_obj',
            )

            objitems_data = []
            for objitem in objitems:
                # Every field defaults to '-' and is overwritten only when
                # the corresponding data is present.
                satellite_name = '-'
                frequency = '-'
                freq_range = '-'
                polarization = '-'
                bod_velocity = '-'
                modulation = '-'
                snr = '-'

                # getattr with a default tolerates a missing related object.
                param = getattr(objitem, 'parameter_obj', None)
                if param:
                    satellite = getattr(param, 'id_satellite', None)
                    if satellite:
                        satellite_name = satellite.name
                    frequency = fmt(param.frequency, '.3f')
                    freq_range = fmt(param.freq_range, '.3f')
                    pol = getattr(param, 'polarization', None)
                    if pol:
                        polarization = pol.name
                    bod_velocity = fmt(param.bod_velocity, '.0f')
                    mod = getattr(param, 'modulation', None)
                    if mod:
                        modulation = mod.name
                    snr = fmt(param.snr, '.0f')

                geo_timestamp = '-'
                geo_location = '-'
                geo_coords = '-'

                geo = getattr(objitem, 'geo_obj', None)
                if geo:
                    if geo.timestamp:
                        local_time = timezone.localtime(geo.timestamp)
                        geo_timestamp = local_time.strftime("%d.%m.%Y %H:%M")
                    geo_location = geo.location or '-'
                    if geo.coords:
                        # The coordinate tuple is read as (longitude, latitude).
                        longitude = geo.coords.coords[0]
                        latitude = geo.coords.coords[1]
                        lon = f"{longitude}E" if longitude > 0 else f"{abs(longitude)}W"
                        lat = f"{latitude}N" if latitude > 0 else f"{abs(latitude)}S"
                        geo_coords = f"{lat} {lon}"

                objitems_data.append({
                    'id': objitem.id,
                    'name': objitem.name or '-',
                    'satellite_name': satellite_name,
                    'frequency': frequency,
                    'freq_range': freq_range,
                    'polarization': polarization,
                    'bod_velocity': bod_velocity,
                    'modulation': modulation,
                    'snr': snr,
                    'geo_timestamp': geo_timestamp,
                    'geo_location': geo_location,
                    'geo_coords': geo_coords
                })

            return JsonResponse({
                'source_id': source_id,
                'objitems': objitems_data
            })
        except Source.DoesNotExist:
            return JsonResponse({'error': 'Источник не найден'}, status=404)
        except Exception as e:
            return JsonResponse({'error': str(e)}, status=500)
class LyngsatTaskStatusAPIView(LoginRequiredMixin, View):
    """Report the state of a Celery task as JSON for client-side polling."""

    def get(self, request, task_id):
        """Describe the current state of the Celery task ``task_id``.

        Args:
            request: The incoming HTTP request.
            task_id: Identifier of the Celery task to inspect.

        Returns:
            JsonResponse: always contains ``task_id``, ``state``, ``result``,
            ``error`` and ``status``; a task in the ``PROGRESS`` state also
            carries ``current``, ``total`` and ``percent``.
        """
        from celery.result import AsyncResult
        from django.core.cache import cache

        task = AsyncResult(task_id)
        # Read the state once so the reported state and the branch taken
        # below always agree, even if the task advances meanwhile.
        state = task.state

        response_data = {
            'task_id': task_id,
            'state': state,
            'result': None,
            'error': None
        }

        if state == 'PENDING':
            response_data['status'] = 'Задача в очереди...'
        elif state == 'PROGRESS':
            # Progress metadata is expected to be a dict. Anything else is
            # treated as "no details yet" instead of raising AttributeError.
            raw_info = task.info
            info = raw_info if isinstance(raw_info, dict) else {}
            current = info.get('current', 0)
            total = info.get('total', 1)

            response_data['status'] = info.get('status', '')
            response_data['current'] = current
            response_data['total'] = total
            # A task may report total == 0 before it knows its workload;
            # show 0% rather than dividing by zero.
            response_data['percent'] = int((current / total) * 100) if total else 0
        elif state == 'SUCCESS':
            # Prefer the result stored in the cache under the task id; fall
            # back to the task's own result when the cache has nothing.
            result = cache.get(f'lyngsat_task_{task_id}')
            if result:
                response_data['result'] = result
                response_data['status'] = 'Задача завершена успешно'
            else:
                response_data['result'] = task.result
                response_data['status'] = 'Задача завершена'
        elif state == 'FAILURE':
            response_data['status'] = 'Ошибка при выполнении задачи'
            response_data['error'] = str(task.info)
        else:
            # Any other state is passed through verbatim.
            response_data['status'] = state

        return JsonResponse(response_data)