Начал с усреднениями

This commit is contained in:
2025-11-28 00:18:04 +03:00
parent 908e11879d
commit d521b6baad
5 changed files with 1345 additions and 0 deletions

View File

@@ -0,0 +1,480 @@
"""
Points averaging view for satellite data grouping by day/night intervals.
"""
from datetime import datetime, timedelta
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import JsonResponse
from django.shortcuts import render
from django.views import View
from django.utils import timezone
from ..models import ObjItem, Satellite
from ..utils import calculate_mean_coords, format_frequency, format_symbol_rate, format_coords_display, RANGE_DISTANCE
class PointsAveragingView(LoginRequiredMixin, View):
"""
View for points averaging form with date range selection and grouping.
"""
def get(self, request):
# Get satellites that have points with geo data
satellites = Satellite.objects.filter(
parameters__objitem__geo_obj__coords__isnull=False
).distinct().order_by('name')
context = {
'satellites': satellites,
'full_width_page': True,
}
return render(request, 'mainapp/points_averaging.html', context)
class PointsAveragingAPIView(LoginRequiredMixin, View):
"""
API endpoint for grouping and averaging points by day/night intervals.
Groups points into:
- Day: 08:00 - 19:00
- Night: 19:00 - 08:00 (next day)
For each group, calculates average coordinates and checks for outliers (>56 km).
"""
def get(self, request):
satellite_id = request.GET.get('satellite_id', '').strip()
date_from = request.GET.get('date_from', '').strip()
date_to = request.GET.get('date_to', '').strip()
if not satellite_id:
return JsonResponse({'error': 'Выберите спутник'}, status=400)
if not date_from or not date_to:
return JsonResponse({'error': 'Укажите диапазон дат'}, status=400)
try:
satellite = Satellite.objects.get(id=int(satellite_id))
except (Satellite.DoesNotExist, ValueError):
return JsonResponse({'error': 'Спутник не найден'}, status=404)
# Parse dates
try:
date_from_obj = datetime.strptime(date_from, "%Y-%m-%d")
date_to_obj = datetime.strptime(date_to, "%Y-%m-%d") + timedelta(days=1)
except ValueError:
return JsonResponse({'error': 'Неверный формат даты'}, status=400)
# Get all points for the satellite in the date range
objitems = ObjItem.objects.filter(
parameter_obj__id_satellite=satellite,
geo_obj__coords__isnull=False,
geo_obj__timestamp__gte=date_from_obj,
geo_obj__timestamp__lt=date_to_obj,
).select_related(
'parameter_obj',
'parameter_obj__id_satellite',
'parameter_obj__polarization',
'parameter_obj__modulation',
'parameter_obj__standard',
'geo_obj',
'source',
).prefetch_related(
'geo_obj__mirrors'
).order_by('geo_obj__timestamp')
if not objitems.exists():
return JsonResponse({'error': 'Точки не найдены в указанном диапазоне'}, status=404)
# Group points by source name and day/night intervals
groups = self._group_points_by_intervals(objitems)
# Process each group: calculate average and check for outliers
result_groups = []
for group_key, points in groups.items():
group_result = self._process_group(group_key, points)
result_groups.append(group_result)
return JsonResponse({
'success': True,
'satellite': satellite.name,
'date_from': date_from,
'date_to': date_to,
'groups': result_groups,
'total_groups': len(result_groups),
})
def _group_points_by_intervals(self, objitems):
"""
Group points by source name and day/night intervals.
Day: 08:00 - 19:00
Night: 19:00 - 08:00 (next day)
"""
groups = {}
for objitem in objitems:
if not objitem.geo_obj or not objitem.geo_obj.timestamp:
continue
timestamp = objitem.geo_obj.timestamp
source_name = objitem.name or f"Объект #{objitem.id}"
# Determine interval
interval_key = self._get_interval_key(timestamp)
# Create group key: (source_name, interval_key)
group_key = (source_name, interval_key)
if group_key not in groups:
groups[group_key] = []
groups[group_key].append(objitem)
return groups
def _get_interval_key(self, timestamp):
"""
Get interval key for a timestamp.
Day: 08:00 - 19:00 -> "YYYY-MM-DD_day"
Night: 19:00 - 08:00 -> "YYYY-MM-DD_night" (date of the start of night)
"""
hour = timestamp.hour
date = timestamp.date()
if 8 <= hour < 19:
# Day interval
return f"{date.strftime('%Y-%m-%d')}_day"
elif hour >= 19:
# Night interval starting this day
return f"{date.strftime('%Y-%m-%d')}_night"
else:
# Night interval (00:00 - 08:00), belongs to previous day's night
prev_date = date - timedelta(days=1)
return f"{prev_date.strftime('%Y-%m-%d')}_night"
def _process_group(self, group_key, points):
"""
Process a group of points: calculate average and check for outliers.
Algorithm:
1. Find first pair of points within 56 km of each other
2. Calculate their average as initial center
3. Iteratively add points within 56 km of current average
4. Points not within 56 km of final average are outliers
"""
source_name, interval_key = group_key
# Parse interval info
date_str, interval_type = interval_key.rsplit('_', 1)
interval_date = datetime.strptime(date_str, '%Y-%m-%d').date()
if interval_type == 'day':
interval_label = f"{interval_date.strftime('%d.%m.%Y')} День (08:00-19:00)"
else:
interval_label = f"{interval_date.strftime('%d.%m.%Y')} Ночь (19:00-08:00)"
# Collect coordinates and build points_data
points_data = []
for objitem in points:
geo = objitem.geo_obj
param = getattr(objitem, 'parameter_obj', None)
coord = (geo.coords.x, geo.coords.y)
# Get mirrors
mirrors = '-'
if geo.mirrors.exists():
mirrors = ', '.join([m.name for m in geo.mirrors.all()])
# Format timestamp
timestamp_str = '-'
if geo.timestamp:
local_time = timezone.localtime(geo.timestamp)
timestamp_str = local_time.strftime("%d.%m.%Y %H:%M")
points_data.append({
'id': objitem.id,
'name': objitem.name or '-',
'frequency': format_frequency(param.frequency) if param else '-',
'freq_range': format_frequency(param.freq_range) if param else '-',
'bod_velocity': format_symbol_rate(param.bod_velocity) if param else '-',
'modulation': param.modulation.name if param and param.modulation else '-',
'snr': f"{param.snr:.0f}" if param and param.snr else '-',
'timestamp': timestamp_str,
'mirrors': mirrors,
'location': geo.location or '-',
'coordinates': format_coords_display(geo.coords),
'coord_tuple': coord,
'is_outlier': False,
'distance_from_avg': 0,
})
# Apply clustering algorithm
avg_coord, valid_indices = self._find_cluster_center(points_data)
# Mark outliers and calculate distances
outliers = []
valid_points = []
for i, point_data in enumerate(points_data):
coord = point_data['coord_tuple']
_, distance = calculate_mean_coords(avg_coord, coord)
point_data['distance_from_avg'] = round(distance, 2)
if i in valid_indices:
point_data['is_outlier'] = False
valid_points.append(point_data)
else:
point_data['is_outlier'] = True
outliers.append(point_data)
# Format average coordinates
avg_lat = avg_coord[1]
avg_lon = avg_coord[0]
lat_str = f"{abs(avg_lat):.4f}N" if avg_lat >= 0 else f"{abs(avg_lat):.4f}S"
lon_str = f"{abs(avg_lon):.4f}E" if avg_lon >= 0 else f"{abs(avg_lon):.4f}W"
avg_coords_str = f"{lat_str} {lon_str}"
# Get common parameters from first valid point (or first point if no valid)
first_point = valid_points[0] if valid_points else (points_data[0] if points_data else {})
return {
'source_name': source_name,
'interval_key': interval_key,
'interval_label': interval_label,
'total_points': len(points_data),
'valid_points_count': len(valid_points),
'outliers_count': len(outliers),
'has_outliers': len(outliers) > 0,
'avg_coordinates': avg_coords_str,
'avg_coord_tuple': avg_coord,
'frequency': first_point.get('frequency', '-'),
'freq_range': first_point.get('freq_range', '-'),
'bod_velocity': first_point.get('bod_velocity', '-'),
'modulation': first_point.get('modulation', '-'),
'snr': first_point.get('snr', '-'),
'mirrors': first_point.get('mirrors', '-'),
'points': points_data,
'outliers': outliers,
'valid_points': valid_points,
}
def _find_cluster_center(self, points_data):
"""
Find cluster center using the following algorithm:
1. Find first pair of points within 56 km of each other
2. Calculate their average as initial center
3. Iteratively add points within 56 km of current average
4. Return final average and indices of valid points
If only 1 point, return it as center.
If no pair found within 56 km, use first point as center.
Returns:
tuple: (avg_coord, set of valid point indices)
"""
if len(points_data) == 0:
return (0, 0), set()
if len(points_data) == 1:
return points_data[0]['coord_tuple'], {0}
# Step 1: Find first pair of points within 56 km
initial_pair = None
for i in range(len(points_data)):
for j in range(i + 1, len(points_data)):
coord_i = points_data[i]['coord_tuple']
coord_j = points_data[j]['coord_tuple']
_, distance = calculate_mean_coords(coord_i, coord_j)
if distance <= RANGE_DISTANCE:
initial_pair = (i, j)
break
if initial_pair:
break
# If no pair found within 56 km, use first point as center
if not initial_pair:
# All points are outliers except the first one
return points_data[0]['coord_tuple'], {0}
# Step 2: Calculate initial average from the pair
i, j = initial_pair
coord_i = points_data[i]['coord_tuple']
coord_j = points_data[j]['coord_tuple']
avg_coord, _ = calculate_mean_coords(coord_i, coord_j)
valid_indices = {i, j}
# Step 3: Iteratively add points within 56 km of current average
# Keep iterating until no new points are added
changed = True
while changed:
changed = False
for k in range(len(points_data)):
if k in valid_indices:
continue
coord_k = points_data[k]['coord_tuple']
_, distance = calculate_mean_coords(avg_coord, coord_k)
if distance <= RANGE_DISTANCE:
# Add point to cluster and recalculate average
valid_indices.add(k)
# Recalculate average with all valid points
avg_coord = self._calculate_average_from_indices(points_data, valid_indices)
changed = True
return avg_coord, valid_indices
def _calculate_average_from_indices(self, points_data, indices):
"""
Calculate average coordinate from points at given indices.
Uses incremental averaging.
"""
indices_list = sorted(indices)
if not indices_list:
return (0, 0)
avg_coord = points_data[indices_list[0]]['coord_tuple']
for idx in indices_list[1:]:
coord = points_data[idx]['coord_tuple']
avg_coord, _ = calculate_mean_coords(avg_coord, coord)
return avg_coord
class RecalculateGroupAPIView(LoginRequiredMixin, View):
"""
API endpoint for recalculating a group after removing outliers or including all points.
"""
def post(self, request):
import json
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
points = data.get('points', [])
include_all = data.get('include_all', False)
if not points:
return JsonResponse({'error': 'No points provided'}, status=400)
# If include_all is True, recalculate with all points using clustering algorithm
# If include_all is False, use only non-outlier points
if not include_all:
points = [p for p in points if not p.get('is_outlier', False)]
if not points:
return JsonResponse({'error': 'No valid points after filtering'}, status=400)
# Apply clustering algorithm
avg_coord, valid_indices = self._find_cluster_center(points)
# Mark outliers and calculate distances
for i, point in enumerate(points):
coord = tuple(point['coord_tuple'])
_, distance = calculate_mean_coords(avg_coord, coord)
point['distance_from_avg'] = round(distance, 2)
point['is_outlier'] = i not in valid_indices
# Format average coordinates
avg_lat = avg_coord[1]
avg_lon = avg_coord[0]
lat_str = f"{abs(avg_lat):.4f}N" if avg_lat >= 0 else f"{abs(avg_lat):.4f}S"
lon_str = f"{abs(avg_lon):.4f}E" if avg_lon >= 0 else f"{abs(avg_lon):.4f}W"
avg_coords_str = f"{lat_str} {lon_str}"
outliers = [p for p in points if p.get('is_outlier', False)]
valid_points = [p for p in points if not p.get('is_outlier', False)]
return JsonResponse({
'success': True,
'avg_coordinates': avg_coords_str,
'avg_coord_tuple': avg_coord,
'total_points': len(points),
'valid_points_count': len(valid_points),
'outliers_count': len(outliers),
'has_outliers': len(outliers) > 0,
'points': points,
})
def _find_cluster_center(self, points):
"""
Find cluster center using the following algorithm:
1. Find first pair of points within 56 km of each other
2. Calculate their average as initial center
3. Iteratively add points within 56 km of current average
4. Return final average and indices of valid points
"""
if len(points) == 0:
return (0, 0), set()
if len(points) == 1:
return tuple(points[0]['coord_tuple']), {0}
# Step 1: Find first pair of points within 56 km
initial_pair = None
for i in range(len(points)):
for j in range(i + 1, len(points)):
coord_i = tuple(points[i]['coord_tuple'])
coord_j = tuple(points[j]['coord_tuple'])
_, distance = calculate_mean_coords(coord_i, coord_j)
if distance <= RANGE_DISTANCE:
initial_pair = (i, j)
break
if initial_pair:
break
# If no pair found within 56 km, use first point as center
if not initial_pair:
return tuple(points[0]['coord_tuple']), {0}
# Step 2: Calculate initial average from the pair
i, j = initial_pair
coord_i = tuple(points[i]['coord_tuple'])
coord_j = tuple(points[j]['coord_tuple'])
avg_coord, _ = calculate_mean_coords(coord_i, coord_j)
valid_indices = {i, j}
# Step 3: Iteratively add points within 56 km of current average
changed = True
while changed:
changed = False
for k in range(len(points)):
if k in valid_indices:
continue
coord_k = tuple(points[k]['coord_tuple'])
_, distance = calculate_mean_coords(avg_coord, coord_k)
if distance <= RANGE_DISTANCE:
valid_indices.add(k)
avg_coord = self._calculate_average_from_indices(points, valid_indices)
changed = True
return avg_coord, valid_indices
def _calculate_average_from_indices(self, points, indices):
"""Calculate average coordinate from points at given indices."""
indices_list = sorted(indices)
if not indices_list:
return (0, 0)
avg_coord = tuple(points[indices_list[0]]['coord_tuple'])
for idx in indices_list[1:]:
coord = tuple(points[idx]['coord_tuple'])
avg_coord, _ = calculate_mean_coords(avg_coord, coord)
return avg_coord