Compare commits

...

2 Commits

8 changed files with 906 additions and 608 deletions

3
.gitignore vendored
View File

@@ -33,4 +33,5 @@ tiles
# Docker # Docker
# docker-* # docker-*
maplibre-gl-js-5.10.0.zip maplibre-gl-js-5.10.0.zip
cert.pem cert.pem
templ.json

View File

@@ -1,422 +1,430 @@
from django.test import TestCase, RequestFactory from django.test import TestCase, RequestFactory
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.contrib.gis.geos import Point from django.contrib.gis.geos import Point
from .models import CustomUser, Geo, ObjItem from .models import CustomUser, Geo, ObjItem
from .utils import format_coordinates, parse_pagination_params from .utils import format_coordinates, parse_pagination_params
from .mixins import RoleRequiredMixin, CoordinateProcessingMixin from .mixins import RoleRequiredMixin, CoordinateProcessingMixin
from django.views import View from django.views import View
class FormatCoordinatesTestCase(TestCase): class FormatCoordinatesTestCase(TestCase):
"""Тесты для функции format_coordinates""" """Тесты для функции format_coordinates"""
def test_format_positive_coordinates(self): def test_format_positive_coordinates(self):
"""Тест форматирования положительных координат""" """Тест форматирования положительных координат"""
result = format_coordinates(37.62, 55.75) result = format_coordinates(37.62, 55.75)
self.assertEqual(result, "55.75N 37.62E") self.assertEqual(result, "55.75N 37.62E")
def test_format_negative_longitude(self): def test_format_negative_longitude(self):
"""Тест форматирования с отрицательной долготой""" """Тест форматирования с отрицательной долготой"""
result = format_coordinates(-122.42, 37.77) result = format_coordinates(-122.42, 37.77)
self.assertEqual(result, "37.77N 122.42W") self.assertEqual(result, "37.77N 122.42W")
def test_format_negative_latitude(self): def test_format_negative_latitude(self):
"""Тест форматирования с отрицательной широтой""" """Тест форматирования с отрицательной широтой"""
result = format_coordinates(151.21, -33.87) result = format_coordinates(151.21, -33.87)
self.assertEqual(result, "33.87S 151.21E") self.assertEqual(result, "33.87S 151.21E")
def test_format_both_negative(self): def test_format_both_negative(self):
"""Тест форматирования с обеими отрицательными координатами""" """Тест форматирования с обеими отрицательными координатами"""
result = format_coordinates(-58.38, -34.60) result = format_coordinates(-58.38, -34.60)
self.assertEqual(result, "34.6S 58.38W") self.assertEqual(result, "34.6S 58.38W")
class ParsePaginationParamsTestCase(TestCase): class ParsePaginationParamsTestCase(TestCase):
"""Тесты для функции parse_pagination_params""" """Тесты для функции parse_pagination_params"""
def setUp(self): def setUp(self):
self.factory = RequestFactory() self.factory = RequestFactory()
def test_default_values(self): def test_default_values(self):
"""Тест значений по умолчанию""" """Тест значений по умолчанию"""
request = self.factory.get("/") request = self.factory.get("/")
page, per_page = parse_pagination_params(request) page, per_page = parse_pagination_params(request)
self.assertEqual(page, 1) self.assertEqual(page, 1)
self.assertEqual(per_page, 50) self.assertEqual(per_page, 50)
def test_custom_values(self): def test_custom_values(self):
"""Тест пользовательских значений""" """Тест пользовательских значений"""
request = self.factory.get("/?page=3&items_per_page=100") request = self.factory.get("/?page=3&items_per_page=100")
page, per_page = parse_pagination_params(request) page, per_page = parse_pagination_params(request)
self.assertEqual(page, 3) self.assertEqual(page, 3)
self.assertEqual(per_page, 100) self.assertEqual(per_page, 100)
def test_invalid_page_number(self): def test_invalid_page_number(self):
"""Тест невалидного номера страницы""" """Тест невалидного номера страницы"""
request = self.factory.get("/?page=invalid") request = self.factory.get("/?page=invalid")
page, per_page = parse_pagination_params(request) page, per_page = parse_pagination_params(request)
self.assertEqual(page, 1) self.assertEqual(page, 1)
def test_negative_page_number(self): def test_negative_page_number(self):
"""Тест отрицательного номера страницы""" """Тест отрицательного номера страницы"""
request = self.factory.get("/?page=-5") request = self.factory.get("/?page=-5")
page, per_page = parse_pagination_params(request) page, per_page = parse_pagination_params(request)
self.assertEqual(page, 1) self.assertEqual(page, 1)
def test_max_items_per_page_limit(self): def test_max_items_per_page_limit(self):
"""Тест ограничения максимального количества элементов""" """Тест ограничения максимального количества элементов"""
request = self.factory.get("/?items_per_page=20000") request = self.factory.get("/?items_per_page=20000")
page, per_page = parse_pagination_params(request) page, per_page = parse_pagination_params(request)
self.assertEqual(per_page, 10000) self.assertEqual(per_page, 10000)
class RoleRequiredMixinTestCase(TestCase): class RoleRequiredMixinTestCase(TestCase):
"""Тесты для RoleRequiredMixin""" """Тесты для RoleRequiredMixin"""
def setUp(self): def setUp(self):
self.factory = RequestFactory() self.factory = RequestFactory()
def test_admin_has_access(self): def test_admin_has_access(self):
"""Тест что администратор имеет доступ""" """Тест что администратор имеет доступ"""
user = User.objects.create_user(username="testuser", password="12345") user = User.objects.create_user(username="testuser", password="12345")
# Get the automatically created CustomUser and set role to 'admin' # Get the automatically created CustomUser and set role to 'admin'
custom_user = CustomUser.objects.get(user=user) custom_user = CustomUser.objects.get(user=user)
custom_user.role = "admin" custom_user.role = "admin"
custom_user.save() custom_user.save()
# Refresh user to get updated customuser # Refresh user to get updated customuser
user.refresh_from_db() user.refresh_from_db()
class TestView(RoleRequiredMixin, View): class TestView(RoleRequiredMixin, View):
required_roles = ["admin", "moderator"] required_roles = ["admin", "moderator"]
view = TestView() view = TestView()
request = self.factory.get("/") request = self.factory.get("/")
request.user = user request.user = user
view.request = request view.request = request
self.assertTrue(view.test_func()) self.assertTrue(view.test_func())
def test_user_without_role_denied(self): def test_user_without_role_denied(self):
"""Тест что пользователь без роли не имеет доступа""" """Тест что пользователь без роли не имеет доступа"""
user_no_role = User.objects.create_user(username="norole", password="12345") user_no_role = User.objects.create_user(username="norole", password="12345")
# Get the automatically created CustomUser - default role is 'user' # Get the automatically created CustomUser - default role is 'user'
custom_user_no_role = CustomUser.objects.get(user=user_no_role) custom_user_no_role = CustomUser.objects.get(user=user_no_role)
self.assertEqual(custom_user_no_role.role, "user") self.assertEqual(custom_user_no_role.role, "user")
class TestView(RoleRequiredMixin, View): class TestView(RoleRequiredMixin, View):
required_roles = ["admin", "moderator"] required_roles = ["admin", "moderator"]
view = TestView() view = TestView()
request = self.factory.get("/") request = self.factory.get("/")
request.user = user_no_role request.user = user_no_role
view.request = request view.request = request
self.assertFalse(view.test_func()) self.assertFalse(view.test_func())
class CoordinateProcessingMixinTestCase(TestCase): class CoordinateProcessingMixinTestCase(TestCase):
"""Тесты для CoordinateProcessingMixin""" """Тесты для CoordinateProcessingMixin"""
def setUp(self): def setUp(self):
self.factory = RequestFactory() self.factory = RequestFactory()
def test_extract_geo_coordinates(self): def test_extract_geo_coordinates(self):
"""Тест извлечения координат геолокации""" """Тест извлечения координат геолокации"""
class TestView(CoordinateProcessingMixin, View): class TestView(CoordinateProcessingMixin, View):
pass pass
view = TestView() view = TestView()
request = self.factory.post( request = self.factory.post(
"/", {"geo_longitude": "37.62", "geo_latitude": "55.75"} "/", {"geo_longitude": "37.62", "geo_latitude": "55.75"}
) )
view.request = request view.request = request
coords = view._extract_coordinates("geo") coords = view._extract_coordinates("geo")
self.assertIsNotNone(coords) self.assertIsNotNone(coords)
self.assertEqual(coords, (37.62, 55.75)) self.assertEqual(coords, (37.62, 55.75))
def test_extract_invalid_coordinates(self): def test_extract_invalid_coordinates(self):
"""Тест извлечения невалидных координат""" """Тест извлечения невалидных координат"""
class TestView(CoordinateProcessingMixin, View): class TestView(CoordinateProcessingMixin, View):
pass pass
view = TestView() view = TestView()
request = self.factory.post( request = self.factory.post(
"/", {"geo_longitude": "invalid", "geo_latitude": "55.75"} "/", {"geo_longitude": "invalid", "geo_latitude": "55.75"}
) )
view.request = request view.request = request
coords = view._extract_coordinates("geo") coords = view._extract_coordinates("geo")
self.assertIsNone(coords) self.assertIsNone(coords)
def test_process_coordinates(self): def test_process_coordinates(self):
"""Тест обработки координат и применения к объекту Geo""" """Тест обработки координат и применения к объекту Geo"""
class TestView(CoordinateProcessingMixin, View): class TestView(CoordinateProcessingMixin, View):
pass pass
view = TestView() view = TestView()
request = self.factory.post( request = self.factory.post(
"/", "/",
{ {
"geo_longitude": "37.62", "geo_longitude": "37.62",
"geo_latitude": "55.75", "geo_latitude": "55.75",
}, },
) )
view.request = request view.request = request
geo_instance = Geo() geo_instance = Geo()
view.process_coordinates(geo_instance) view.process_coordinates(geo_instance)
self.assertIsNotNone(geo_instance.coords) self.assertIsNotNone(geo_instance.coords)
self.assertEqual(geo_instance.coords.coords, (37.62, 55.75)) self.assertEqual(geo_instance.coords.coords, (37.62, 55.75))
class CSVImportTestCase(TestCase): class CSVImportTestCase(TestCase):
"""Тесты для функции get_points_from_csv""" """Тесты для функции get_points_from_csv"""
def setUp(self): def setUp(self):
"""Подготовка тестовых данных""" """Подготовка тестовых данных"""
from .models import CustomUser, Satellite, Polarization from .models import CustomUser, Satellite, Polarization
from django.contrib.auth.models import User from django.contrib.auth.models import User
# Создаем пользователя # Создаем пользователя
user = User.objects.create_user(username="testuser", password="12345") user = User.objects.create_user(username="testuser", password="12345")
self.custom_user = CustomUser.objects.get(user=user) self.custom_user = CustomUser.objects.get(user=user)
# Создаем спутник # Создаем спутник
self.satellite = Satellite.objects.create(name="Test Satellite", norad=12345) self.satellite = Satellite.objects.create(name="Test Satellite", norad=12345)
# Создаем поляризации # Создаем поляризации
Polarization.objects.get_or_create(name="Вертикальная") Polarization.objects.get_or_create(name="Вертикальная")
Polarization.objects.get_or_create(name="Горизонтальная") Polarization.objects.get_or_create(name="Горизонтальная")
Polarization.objects.get_or_create(name="Правая") Polarization.objects.get_or_create(name="Правая")
Polarization.objects.get_or_create(name="Левая") Polarization.objects.get_or_create(name="Левая")
Polarization.objects.get_or_create(name="-") Polarization.objects.get_or_create(name="-")
# Создаем спутники-зеркала для тестов # Создаем спутники-зеркала для тестов
Satellite.objects.get_or_create(name="Mirror1 Satellite", norad=11111) Satellite.objects.get_or_create(name="Mirror1 Satellite", norad=11111)
Satellite.objects.get_or_create(name="Mirror2 Satellite", norad=22222) Satellite.objects.get_or_create(name="Mirror2 Satellite", norad=22222)
Satellite.objects.get_or_create(name="Mirror3 Satellite", norad=33333) Satellite.objects.get_or_create(name="Mirror3 Satellite", norad=33333)
def test_initial_csv_import(self): def test_initial_csv_import(self):
"""Тест первичного импорта из CSV файла""" """Тест первичного импорта из CSV файла"""
from .utils import get_points_from_csv from .utils import get_points_from_csv
from .models import Source, ObjItem from .models import Source, ObjItem
# Тестовые данные CSV - 3 точки в разных местах # Тестовые данные CSV - 3 точки в разных местах
csv_content = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;Mirror3 csv_content = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;Mirror3
2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2; 2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2;
3;Signal3 V;56.8389;60.6057;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2;""" 3;Signal3 V;56.8389;60.6057;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2;"""
# Выполняем импорт # Выполняем импорт
sources_created = get_points_from_csv(csv_content, self.custom_user) result = get_points_from_csv(csv_content, self.custom_user)
# Проверяем результаты # Проверяем результаты
# Первые две точки близко (Москва), третья далеко (Екатеринбург) # Первые две точки близко (Москва), третья далеко (Екатеринбург)
# Должно быть создано 2 источника # Должно быть создано 2 источника
self.assertEqual(sources_created, 2) self.assertEqual(result['new_sources'], 2)
self.assertEqual(Source.objects.count(), 2) self.assertEqual(result['added'], 3)
self.assertEqual(ObjItem.objects.count(), 3) self.assertEqual(result['skipped'], 0)
self.assertEqual(len(result['errors']), 0)
# Проверяем, что первые две точки привязаны к одному источнику self.assertEqual(Source.objects.count(), 2)
source1 = Source.objects.first() self.assertEqual(ObjItem.objects.count(), 3)
items_in_source1 = ObjItem.objects.filter(source=source1).count()
self.assertEqual(items_in_source1, 2) # Проверяем, что первые две точки привязаны к одному источнику
source1 = Source.objects.first()
def test_csv_import_with_existing_sources(self): items_in_source1 = ObjItem.objects.filter(source=source1).count()
"""Тест импорта CSV с существующими источниками""" self.assertEqual(items_in_source1, 2)
from .utils import get_points_from_csv
from .models import Source, ObjItem def test_csv_import_with_existing_sources(self):
"""Тест импорта CSV с существующими источниками"""
# Первый импорт - создаем начальные данные from .utils import get_points_from_csv
csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2; from .models import Source, ObjItem
2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2;"""
# Первый импорт - создаем начальные данные
sources_created_1 = get_points_from_csv(csv_content_1, self.custom_user) csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;
self.assertEqual(sources_created_1, 1) 2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2;"""
initial_sources_count = Source.objects.count()
initial_objitems_count = ObjItem.objects.count() result_1 = get_points_from_csv(csv_content_1, self.custom_user)
self.assertEqual(result_1['new_sources'], 1)
# Второй импорт - добавляем новые точки initial_sources_count = Source.objects.count()
# Точка 3 - близко к существующему источнику (Москва) initial_objitems_count = ObjItem.objects.count()
# Точка 4 - далеко (Екатеринбург) - создаст новый источник
csv_content_2 = """3;Signal3 V;55.7562;37.6177;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2; # Второй импорт - добавляем новые точки
4;Signal4 H;56.8389;60.6057;0;01.01.2024 12:15:00;Test Satellite;12345;11560.2;36.0;1;good;Mirror1;Mirror2;""" # Точка 3 - близко к существующему источнику (Москва)
# Точка 4 - далеко (Екатеринбург) - создаст новый источник
sources_created_2 = get_points_from_csv(csv_content_2, self.custom_user) csv_content_2 = """3;Signal3 V;55.7562;37.6177;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2;
4;Signal4 H;56.8389;60.6057;0;01.01.2024 12:15:00;Test Satellite;12345;11560.2;36.0;1;good;Mirror1;Mirror2;"""
# Проверяем результаты
# Должен быть создан 1 новый источник (для точки 4) result_2 = get_points_from_csv(csv_content_2, self.custom_user)
self.assertEqual(sources_created_2, 1)
self.assertEqual(Source.objects.count(), initial_sources_count + 1) # Проверяем результаты
self.assertEqual(ObjItem.objects.count(), initial_objitems_count + 2) # Должен быть создан 1 новый источник (для точки 4)
self.assertEqual(result_2['new_sources'], 1)
# Проверяем, что точка 3 добавлена к существующему источнику self.assertEqual(result_2['added'], 2)
first_source = Source.objects.first() self.assertEqual(Source.objects.count(), initial_sources_count + 1)
items_in_first_source = ObjItem.objects.filter(source=first_source).count() self.assertEqual(ObjItem.objects.count(), initial_objitems_count + 2)
self.assertEqual(items_in_first_source, 3) # 2 начальных + 1 новая
# Проверяем, что точка 3 добавлена к существующему источнику
def test_csv_import_skip_duplicates(self): first_source = Source.objects.first()
"""Тест пропуска дубликатов при импорте CSV""" items_in_first_source = ObjItem.objects.filter(source=first_source).count()
from .utils import get_points_from_csv self.assertEqual(items_in_first_source, 3) # 2 начальных + 1 новая
from .models import Source, ObjItem
def test_csv_import_skip_duplicates(self):
# Первый импорт """Тест пропуска дубликатов при импорте CSV"""
csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;""" from .utils import get_points_from_csv
from .models import Source, ObjItem
get_points_from_csv(csv_content_1, self.custom_user)
initial_sources_count = Source.objects.count() # Первый импорт
initial_objitems_count = ObjItem.objects.count() csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;"""
# Второй импорт - та же точка (дубликат) get_points_from_csv(csv_content_1, self.custom_user)
csv_content_2 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;""" initial_sources_count = Source.objects.count()
initial_objitems_count = ObjItem.objects.count()
sources_created = get_points_from_csv(csv_content_2, self.custom_user)
# Второй импорт - та же точка (дубликат)
# Проверяем, что дубликат пропущен csv_content_2 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;"""
self.assertEqual(sources_created, 0)
self.assertEqual(Source.objects.count(), initial_sources_count) result = get_points_from_csv(csv_content_2, self.custom_user)
self.assertEqual(ObjItem.objects.count(), initial_objitems_count)
# Проверяем, что дубликат пропущен
def test_csv_import_mixed_scenario(self): self.assertEqual(result['new_sources'], 0)
"""Тест смешанного сценария: дубликаты + новые точки + близкие точки""" self.assertEqual(result['added'], 0)
from .utils import get_points_from_csv self.assertEqual(result['skipped'], 1)
from .models import Source, ObjItem self.assertEqual(Source.objects.count(), initial_sources_count)
self.assertEqual(ObjItem.objects.count(), initial_objitems_count)
# Первый импорт - 2 точки в Москве
csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2; def test_csv_import_mixed_scenario(self):
2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2;""" """Тест смешанного сценария: дубликаты + новые точки + близкие точки"""
from .utils import get_points_from_csv
get_points_from_csv(csv_content_1, self.custom_user) from .models import Source, ObjItem
# Второй импорт: # Первый импорт - 2 точки в Москве
# - Точка 1 (дубликат) - должна быть пропущена csv_content_1 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;
# - Точка 3 (близко к Москве) - должна добавиться к существующему источнику 2;Signal2 H;55.7560;37.6175;0;01.01.2024 12:05:00;Test Satellite;12345;11520.3;36.0;1;good;Mirror1;Mirror2;"""
# - Точка 4 (Екатеринбург) - должна создать новый источник
# - Точка 5 (близко к Екатеринбургу) - должна добавиться к новому источнику get_points_from_csv(csv_content_1, self.custom_user)
csv_content_2 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;
3;Signal3 V;55.7562;37.6177;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2; # Второй импорт:
4;Signal4 H;56.8389;60.6057;0;01.01.2024 12:15:00;Test Satellite;12345;11560.2;36.0;1;good;Mirror1;Mirror2; # - Точка 1 (дубликат) - должна быть пропущена
5;Signal5 V;56.8391;60.6059;0;01.01.2024 12:20:00;Test Satellite;12345;11580.8;36.0;1;good;Mirror1;Mirror2;""" # - Точка 3 (близко к Москве) - должна добавиться к существующему источнику
# - Точка 4 (Екатеринбург) - должна создать новый источник
sources_created = get_points_from_csv(csv_content_2, self.custom_user) # - Точка 5 (близко к Екатеринбургу) - должна добавиться к новому источнику
csv_content_2 = """1;Signal1 V;55.7558;37.6173;0;01.01.2024 12:00:00;Test Satellite;12345;11500.5;36.0;1;good;Mirror1;Mirror2;
# Проверяем результаты 3;Signal3 V;55.7562;37.6177;0;01.01.2024 12:10:00;Test Satellite;12345;11540.7;36.0;1;good;Mirror1;Mirror2;
self.assertEqual(sources_created, 1) # Только для Екатеринбурга 4;Signal4 H;56.8389;60.6057;0;01.01.2024 12:15:00;Test Satellite;12345;11560.2;36.0;1;good;Mirror1;Mirror2;
self.assertEqual(Source.objects.count(), 2) # Москва + Екатеринбург 5;Signal5 V;56.8391;60.6059;0;01.01.2024 12:20:00;Test Satellite;12345;11580.8;36.0;1;good;Mirror1;Mirror2;"""
self.assertEqual(ObjItem.objects.count(), 5) # 2 начальных + 3 новых (дубликат пропущен)
result = get_points_from_csv(csv_content_2, self.custom_user)
# Проверяем распределение по источникам
moscow_source = Source.objects.first() # Проверяем результаты
ekb_source = Source.objects.last() self.assertEqual(result['new_sources'], 1) # Только для Екатеринбурга
self.assertEqual(result['added'], 3) # Точки 3, 4, 5
moscow_items = ObjItem.objects.filter(source=moscow_source).count() self.assertEqual(result['skipped'], 1) # Точка 1 (дубликат)
ekb_items = ObjItem.objects.filter(source=ekb_source).count() self.assertEqual(Source.objects.count(), 2) # Москва + Екатеринбург
self.assertEqual(ObjItem.objects.count(), 5) # 2 начальных + 3 новых (дубликат пропущен)
self.assertEqual(moscow_items, 3) # 2 начальных + 1 новая
self.assertEqual(ekb_items, 2) # 2 новых точки в Екатеринбурге # Проверяем распределение по источникам
moscow_source = Source.objects.first()
ekb_source = Source.objects.last()
class FindMirrorSatellitesTestCase(TestCase): moscow_items = ObjItem.objects.filter(source=moscow_source).count()
"""Тесты для функции find_mirror_satellites""" ekb_items = ObjItem.objects.filter(source=ekb_source).count()
def setUp(self): self.assertEqual(moscow_items, 3) # 2 начальных + 1 новая
"""Подготовка тестовых данных""" self.assertEqual(ekb_items, 2) # 2 новых точки в Екатеринбурге
from .models import Satellite
# Создаем спутники с разными именами
Satellite.objects.create(name="Eutelsat 16A", norad=40874) class FindMirrorSatellitesTestCase(TestCase):
Satellite.objects.create(name="Eutelsat 21B", norad=41591) """Тесты для функции find_mirror_satellites"""
Satellite.objects.create(name="Astra 4A", norad=41404)
Satellite.objects.create(name="Turksat 4A", norad=40361) def setUp(self):
Satellite.objects.create(name="Express AM6", norad=39508) """Подготовка тестовых данных"""
from .models import Satellite
def test_find_exact_match(self):
"""Тест поиска спутника по точному совпадению""" # Создаем спутники с разными именами
from .utils import find_mirror_satellites Satellite.objects.create(name="Eutelsat 16A", norad=40874)
Satellite.objects.create(name="Eutelsat 21B", norad=41591)
mirrors = find_mirror_satellites(["Eutelsat 16A"]) Satellite.objects.create(name="Astra 4A", norad=41404)
self.assertEqual(len(mirrors), 1) Satellite.objects.create(name="Turksat 4A", norad=40361)
self.assertEqual(mirrors[0].name, "Eutelsat 16A") Satellite.objects.create(name="Express AM6", norad=39508)
def test_find_partial_match(self): def test_find_exact_match(self):
"""Тест поиска спутника по частичному совпадению""" """Тест поиска спутника по точному совпадению"""
from .utils import find_mirror_satellites from .utils import find_mirror_satellites
# Ищем по части имени "Eutelsat" mirrors = find_mirror_satellites(["Eutelsat 16A"])
mirrors = find_mirror_satellites(["eutelsat"]) self.assertEqual(len(mirrors), 1)
self.assertEqual(len(mirrors), 2) self.assertEqual(mirrors[0].name, "Eutelsat 16A")
names = [m.name for m in mirrors]
self.assertIn("Eutelsat 16A", names) def test_find_partial_match(self):
self.assertIn("Eutelsat 21B", names) """Тест поиска спутника по частичному совпадению"""
from .utils import find_mirror_satellites
def test_find_case_insensitive(self):
"""Тест поиска без учета регистра""" # Ищем по части имени "Eutelsat"
from .utils import find_mirror_satellites mirrors = find_mirror_satellites(["eutelsat"])
self.assertEqual(len(mirrors), 2)
# Разные варианты регистра names = [m.name for m in mirrors]
mirrors1 = find_mirror_satellites(["ASTRA"]) self.assertIn("Eutelsat 16A", names)
mirrors2 = find_mirror_satellites(["astra"]) self.assertIn("Eutelsat 21B", names)
mirrors3 = find_mirror_satellites(["AsTrA"])
def test_find_case_insensitive(self):
self.assertEqual(len(mirrors1), 1) """Тест поиска без учета регистра"""
self.assertEqual(len(mirrors2), 1) from .utils import find_mirror_satellites
self.assertEqual(len(mirrors3), 1)
self.assertEqual(mirrors1[0].name, "Astra 4A") # Разные варианты регистра
self.assertEqual(mirrors2[0].name, "Astra 4A") mirrors1 = find_mirror_satellites(["ASTRA"])
self.assertEqual(mirrors3[0].name, "Astra 4A") mirrors2 = find_mirror_satellites(["astra"])
mirrors3 = find_mirror_satellites(["AsTrA"])
def test_find_multiple_mirrors(self):
"""Тест поиска нескольких зеркал""" self.assertEqual(len(mirrors1), 1)
from .utils import find_mirror_satellites self.assertEqual(len(mirrors2), 1)
self.assertEqual(len(mirrors3), 1)
mirrors = find_mirror_satellites(["Eutelsat", "Turksat"]) self.assertEqual(mirrors1[0].name, "Astra 4A")
self.assertEqual(len(mirrors), 3) # 2 Eutelsat + 1 Turksat self.assertEqual(mirrors2[0].name, "Astra 4A")
names = [m.name for m in mirrors] self.assertEqual(mirrors3[0].name, "Astra 4A")
self.assertIn("Eutelsat 16A", names)
self.assertIn("Eutelsat 21B", names) def test_find_multiple_mirrors(self):
self.assertIn("Turksat 4A", names) """Тест поиска нескольких зеркал"""
from .utils import find_mirror_satellites
def test_find_with_spaces(self):
"""Тест поиска с пробелами в начале и конце""" mirrors = find_mirror_satellites(["Eutelsat", "Turksat"])
from .utils import find_mirror_satellites self.assertEqual(len(mirrors), 3) # 2 Eutelsat + 1 Turksat
names = [m.name for m in mirrors]
mirrors = find_mirror_satellites([" Express "]) self.assertIn("Eutelsat 16A", names)
self.assertEqual(len(mirrors), 1) self.assertIn("Eutelsat 21B", names)
self.assertEqual(mirrors[0].name, "Express AM6") self.assertIn("Turksat 4A", names)
def test_find_empty_list(self): def test_find_with_spaces(self):
"""Тест с пустым списком""" """Тест поиска с пробелами в начале и конце"""
from .utils import find_mirror_satellites from .utils import find_mirror_satellites
mirrors = find_mirror_satellites([]) mirrors = find_mirror_satellites([" Express "])
self.assertEqual(len(mirrors), 0) self.assertEqual(len(mirrors), 1)
self.assertEqual(mirrors[0].name, "Express AM6")
def test_find_with_dash(self):
"""Тест с дефисом (должен быть пропущен)""" def test_find_empty_list(self):
from .utils import find_mirror_satellites """Тест с пустым списком"""
from .utils import find_mirror_satellites
mirrors = find_mirror_satellites(["-"])
self.assertEqual(len(mirrors), 0) mirrors = find_mirror_satellites([])
self.assertEqual(len(mirrors), 0)
def test_find_no_match(self):
"""Тест когда спутник не найден""" def test_find_with_dash(self):
from .utils import find_mirror_satellites """Тест с дефисом (должен быть пропущен)"""
from .utils import find_mirror_satellites
mirrors = find_mirror_satellites(["NonExistentSatellite"])
self.assertEqual(len(mirrors), 0) mirrors = find_mirror_satellites(["-"])
self.assertEqual(len(mirrors), 0)
def test_find_removes_duplicates(self):
"""Тест удаления дубликатов""" def test_find_no_match(self):
from .utils import find_mirror_satellites """Тест когда спутник не найден"""
from .utils import find_mirror_satellites
# Ищем один и тот же спутник дважды
mirrors = find_mirror_satellites(["Astra", "Astra 4A"]) mirrors = find_mirror_satellites(["NonExistentSatellite"])
self.assertEqual(len(mirrors), 1) self.assertEqual(len(mirrors), 0)
self.assertEqual(mirrors[0].name, "Astra 4A")
def test_find_removes_duplicates(self):
"""Тест удаления дубликатов"""
from .utils import find_mirror_satellites
# Ищем один и тот же спутник дважды
mirrors = find_mirror_satellites(["Astra", "Astra 4A"])
self.assertEqual(len(mirrors), 1)
self.assertEqual(mirrors[0].name, "Astra 4A")

View File

@@ -122,6 +122,72 @@ MINIMUM_BANDWIDTH_MHZ = 0.08
RANGE_DISTANCE = 56 RANGE_DISTANCE = 56
# ============================================================================
# Вспомогательные функции для работы со спутниками
# ============================================================================
class SatelliteNotFoundError(Exception):
"""Исключение, возникающее когда спутник не найден в базе данных."""
pass
def get_satellite_by_norad(norad_id: int) -> Satellite:
"""
Получает спутник по NORAD ID с обработкой ошибок.
Args:
norad_id: NORAD ID спутника
Returns:
Satellite: объект спутника
Raises:
SatelliteNotFoundError: если спутник не найден
ValueError: если norad_id некорректен
"""
if not norad_id or norad_id == -1:
raise ValueError(f"Некорректный NORAD ID: {norad_id}")
try:
return Satellite.objects.get(norad=norad_id)
except Satellite.DoesNotExist:
raise SatelliteNotFoundError(
f"Спутник с NORAD ID {norad_id} не найден в базе данных. "
f"Добавьте спутник в справочник перед импортом данных."
)
except Satellite.MultipleObjectsReturned:
# Если по какой-то причине есть дубликаты, берем первый
return Satellite.objects.filter(norad=norad_id).first()
def get_satellite_by_name(name: str) -> Satellite:
"""
Получает спутник по имени с обработкой ошибок.
Args:
name: имя спутника
Returns:
Satellite: объект спутника
Raises:
SatelliteNotFoundError: если спутник не найден
"""
if not name or name.strip() == "-":
raise ValueError(f"Некорректное имя спутника: {name}")
try:
return Satellite.objects.get(name=name.strip())
except Satellite.DoesNotExist:
raise SatelliteNotFoundError(
f"Спутник '{name}' не найден в базе данных. "
f"Добавьте спутник в справочник перед импортом данных."
)
except Satellite.MultipleObjectsReturned:
# Если есть дубликаты по имени, берем первый
return Satellite.objects.filter(name=name.strip()).first()
def get_all_constants(): def get_all_constants():
sats = [sat.name for sat in Satellite.objects.all()] sats = [sat.name for sat in Satellite.objects.all()]
standards = [sat.name for sat in Standard.objects.all()] standards = [sat.name for sat in Standard.objects.all()]
@@ -307,7 +373,12 @@ def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None, is_au
is_automatic: если True, точки не добавляются к Source (optional, default=False) is_automatic: если True, точки не добавляются к Source (optional, default=False)
Returns: Returns:
int: количество созданных Source (или 0 если is_automatic=True) dict: словарь с результатами импорта {
'new_sources': количество созданных Source,
'added': количество добавленных точек,
'skipped': количество пропущенных дубликатов,
'errors': список ошибок
}
""" """
try: try:
df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True) df.rename(columns={"Модуляция ": "Модуляция"}, inplace=True)
@@ -321,6 +392,7 @@ def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None, is_au
new_sources_count = 0 new_sources_count = 0
added_count = 0 added_count = 0
skipped_count = 0 skipped_count = 0
errors = []
# Словарь для кэширования Source в рамках текущего импорта # Словарь для кэширования Source в рамках текущего импорта
# Ключ: (имя источника, id Source), Значение: объект Source # Ключ: (имя источника, id Source), Значение: объект Source
@@ -391,13 +463,21 @@ def fill_data_from_df(df: pd.DataFrame, sat: Satellite, current_user=None, is_au
added_count += 1 added_count += 1
except Exception as e: except Exception as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(f"Ошибка при обработке строки {idx}: {e}") print(f"Ошибка при обработке строки {idx}: {e}")
errors.append(error_msg)
continue continue
print(f"Импорт завершен: создано {new_sources_count} новых источников, " print(f"Импорт завершен: создано {new_sources_count} новых источников, "
f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов") f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, "
f"ошибок: {len(errors)}")
return new_sources_count return {
'new_sources': new_sources_count,
'added': added_count,
'skipped': skipped_count,
'errors': errors
}
def _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic=False): def _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic=False):
@@ -574,6 +654,11 @@ def _create_objitem_from_row(row, sat, source, user_to_use, consts, is_automatic
def add_satellite_list(): def add_satellite_list():
"""
Добавляет список спутников в базу данных (если их еще нет).
Примечание: Эта функция устарела. Используйте админ-панель для добавления спутников.
"""
sats = [ sats = [
"AZERSPACE 2", "AZERSPACE 2",
"Amos 4", "Amos 4",
@@ -673,33 +758,44 @@ def get_points_from_csv(file_content, current_user=None, is_automatic=False):
is_automatic: если True, точки не добавляются к Source (optional, default=False) is_automatic: если True, точки не добавляются к Source (optional, default=False)
Returns: Returns:
int: количество созданных Source (или 0 если is_automatic=True) dict: словарь с результатами импорта {
'new_sources': количество созданных Source,
'added': количество добавленных точек,
'skipped': количество пропущенных дубликатов,
'errors': список ошибок
}
""" """
# Читаем CSV без предопределенных имен колонок
df = pd.read_csv( df = pd.read_csv(
io.StringIO(file_content), io.StringIO(file_content),
sep=";", sep=";",
names=[ header=None
"id",
"obj",
"lat",
"lon",
"h",
"time",
"sat",
"norad_id",
"freq",
"f_range",
"et",
"qaul",
"mir_1",
"mir_2",
"mir_3",
"mir_4",
"mir_5",
"mir_6",
"mir_7",
],
) )
# Присваиваем имена первым 12 колонкам
base_columns = [
"id",
"obj",
"lat",
"lon",
"h",
"time",
"sat",
"norad_id",
"freq",
"f_range",
"et",
"qual",
]
# Все колонки после "qual" (индекс 11) - это зеркала
num_columns = len(df.columns)
mirror_columns = [f"mir_{i+1}" for i in range(num_columns - len(base_columns))]
# Объединяем имена колонок
df.columns = base_columns + mirror_columns
# Преобразуем типы данных
df[["lat", "lon", "freq", "f_range"]] = ( df[["lat", "lon", "freq", "f_range"]] = (
df[["lat", "lon", "freq", "f_range"]] df[["lat", "lon", "freq", "f_range"]]
.replace(",", ".", regex=True) .replace(",", ".", regex=True)
@@ -711,6 +807,7 @@ def get_points_from_csv(file_content, current_user=None, is_automatic=False):
new_sources_count = 0 new_sources_count = 0
added_count = 0 added_count = 0
skipped_count = 0 skipped_count = 0
errors = []
# Словарь для кэширования Source в рамках текущего импорта # Словарь для кэширования Source в рамках текущего импорта
# Ключ: (имя источника, имя спутника, id Source), Значение: объект Source # Ключ: (имя источника, имя спутника, id Source), Значение: объект Source
@@ -733,14 +830,15 @@ def get_points_from_csv(file_content, current_user=None, is_automatic=False):
skipped_count += 1 skipped_count += 1
continue continue
# Получаем или создаем объект спутника # Получаем объект спутника по NORAD ID
# sat_obj, _ = Satellite.objects.get_or_create( try:
# name=sat_name, defaults={"norad": row["norad_id"]} sat_obj = get_satellite_by_norad(row["norad_id"])
# ) except (SatelliteNotFoundError, ValueError) as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(error_msg)
errors.append(error_msg)
continue
sat_obj, _ = Satellite.objects.get_or_create(
norad=row["norad_id"], defaults={"name": sat_name}
)
source = None source = None
# Если is_automatic=False, работаем с Source # Если is_automatic=False, работаем с Source
@@ -781,17 +879,25 @@ def get_points_from_csv(file_content, current_user=None, is_automatic=False):
sources_cache[(source_name, sat_name, source.id)] = source sources_cache[(source_name, sat_name, source.id)] = source
# Создаем ObjItem (с Source или без, в зависимости от is_automatic) # Создаем ObjItem (с Source или без, в зависимости от is_automatic)
_create_objitem_from_csv_row(row, source, user_to_use, is_automatic) _create_objitem_from_csv_row(row, source, user_to_use, is_automatic, mirror_columns)
added_count += 1 added_count += 1
except Exception as e: except Exception as e:
error_msg = f"Строка {idx + 2}: {str(e)}"
print(f"Ошибка при обработке строки {idx}: {e}") print(f"Ошибка при обработке строки {idx}: {e}")
errors.append(error_msg)
continue continue
print(f"Импорт завершен: создано {new_sources_count} новых источников, " print(f"Импорт завершен: создано {new_sources_count} новых источников, "
f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов") f"добавлено {added_count} точек, пропущено {skipped_count} дубликатов, "
f"ошибок: {len(errors)}")
return new_sources_count return {
'new_sources': new_sources_count,
'added': added_count,
'skipped': skipped_count,
'errors': errors
}
def _is_duplicate_by_coords_and_time(coord_tuple, timestamp, tolerance_km=0.001): def _is_duplicate_by_coords_and_time(coord_tuple, timestamp, tolerance_km=0.001):
@@ -896,7 +1002,7 @@ def _find_tech_analyze_data(name: str, satellite: Satellite):
return None return None
def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False): def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False, mirror_columns=None):
""" """
Вспомогательная функция для создания ObjItem из строки CSV DataFrame. Вспомогательная функция для создания ObjItem из строки CSV DataFrame.
@@ -908,6 +1014,7 @@ def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False):
source: объект Source для связи (может быть None если is_automatic=True) source: объект Source для связи (может быть None если is_automatic=True)
user_to_use: пользователь для created_by user_to_use: пользователь для created_by
is_automatic: если True, точка не связывается с Source is_automatic: если True, точка не связывается с Source
mirror_columns: список имен колонок с зеркалами (optional)
""" """
# Определяем поляризацию # Определяем поляризацию
match row["obj"].split(" ")[-1]: match row["obj"].split(" ")[-1]:
@@ -923,21 +1030,36 @@ def _create_objitem_from_csv_row(row, source, user_to_use, is_automatic=False):
pol = "-" pol = "-"
pol_obj, _ = Polarization.objects.get_or_create(name=pol) pol_obj, _ = Polarization.objects.get_or_create(name=pol)
sat_obj, _ = Satellite.objects.get_or_create(
name=row["sat"], defaults={"norad": row["norad_id"]} # Получаем объект спутника по NORAD ID
) try:
sat_obj = get_satellite_by_norad(row["norad_id"])
except (SatelliteNotFoundError, ValueError) as e:
raise Exception(f"Не удалось получить спутник: {str(e)}")
# Ищем данные в TechAnalyze # Ищем данные в TechAnalyze
tech_data = _find_tech_analyze_data(row["obj"], sat_obj) tech_data = _find_tech_analyze_data(row["obj"], sat_obj)
# Обработка зеркал - теперь это спутники # Обработка зеркал - теперь это спутники
mirror_names = [] mirror_names = []
if not pd.isna(row["mir_1"]) and row["mir_1"].strip() != "-":
mirror_names.append(row["mir_1"]) # Если переданы имена колонок зеркал, используем их
if not pd.isna(row["mir_2"]) and row["mir_2"].strip() != "-": if mirror_columns:
mirror_names.append(row["mir_2"]) for mir_col in mirror_columns:
if not pd.isna(row["mir_3"]) and row["mir_3"].strip() != "-": if mir_col in row.index:
mirror_names.append(row["mir_3"]) mir_value = row[mir_col]
if not pd.isna(mir_value) and str(mir_value).strip() != "-" and str(mir_value).strip() != "":
mirror_names.append(str(mir_value).strip())
else:
# Fallback на старый способ (для обратной совместимости)
for i in range(1, 100): # Проверяем до 100 колонок зеркал
mir_col = f"mir_{i}"
if mir_col in row.index:
mir_value = row[mir_col]
if not pd.isna(mir_value) and str(mir_value).strip() != "-" and str(mir_value).strip() != "":
mirror_names.append(str(mir_value).strip())
else:
break
# Находим спутники-зеркала # Находим спутники-зеркала
mirror_satellites = find_mirror_satellites(mirror_names) mirror_satellites = find_mirror_satellites(mirror_names)

View File

@@ -88,14 +88,25 @@ class LoadExcelDataView(LoginRequiredMixin, FormMessageMixin, FormView):
df = df.head(number) df = df.head(number)
result = fill_data_from_df(df, selected_sat, self.request.user.customuser, is_automatic) result = fill_data_from_df(df, selected_sat, self.request.user.customuser, is_automatic)
# Формируем сообщение об успехе
if is_automatic: if is_automatic:
messages.success( success_msg = f"Данные успешно загружены как автоматические! Добавлено точек: {result['added']}"
self.request, f"Данные успешно загружены как автоматические! Добавлено точек: {len(df)}"
)
else: else:
messages.success( success_msg = f"Данные успешно загружены! Создано источников: {result['new_sources']}, добавлено точек: {result['added']}"
self.request, f"Данные успешно загружены! Создано источников: {result}"
if result['skipped'] > 0:
success_msg += f", пропущено дубликатов: {result['skipped']}"
messages.success(self.request, success_msg)
# Показываем ошибки, если они есть
if result['errors']:
error_count = len(result['errors'])
messages.warning(
self.request,
f"Обнаружено ошибок: {error_count}. Первые ошибки: " + "; ".join(result['errors'][:5])
) )
except Exception as e: except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
@@ -124,10 +135,25 @@ class LoadCsvDataView(LoginRequiredMixin, FormMessageMixin, FormView):
result = get_points_from_csv(content, self.request.user.customuser, is_automatic) result = get_points_from_csv(content, self.request.user.customuser, is_automatic)
# Формируем сообщение об успехе
if is_automatic: if is_automatic:
messages.success(self.request, "Данные успешно загружены как автоматические!") success_msg = f"Данные успешно загружены как автоматические! Добавлено точек: {result['added']}"
else: else:
messages.success(self.request, f"Данные успешно загружены! Создано источников: {result}") success_msg = f"Данные успешно загружены! Создано источников: {result['new_sources']}, добавлено точек: {result['added']}"
if result['skipped'] > 0:
success_msg += f", пропущено дубликатов: {result['skipped']}"
messages.success(self.request, success_msg)
# Показываем ошибки, если они есть
if result['errors']:
error_count = len(result['errors'])
messages.warning(
self.request,
f"Обнаружено ошибок: {error_count}. Первые ошибки: " + "; ".join(result['errors'][:5])
)
except Exception as e: except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
return redirect("mainapp:load_csv_data") return redirect("mainapp:load_csv_data")

View File

@@ -113,9 +113,17 @@ class LoadExcelDataView(LoginRequiredMixin, FormMessageMixin, FormView):
df = df.head(number) df = df.head(number)
result = fill_data_from_df(df, selected_sat, self.request.user.customuser) result = fill_data_from_df(df, selected_sat, self.request.user.customuser)
messages.success( # Обработка нового формата результата
self.request, f"Данные успешно загружены! Обработано строк: {result}" success_msg = f"Данные успешно загружены! Создано источников: {result['new_sources']}, добавлено точек: {result['added']}"
) if result['skipped'] > 0:
success_msg += f", пропущено дубликатов: {result['skipped']}"
messages.success(self.request, success_msg)
if result['errors']:
messages.warning(
self.request,
f"Обнаружено ошибок: {len(result['errors'])}. Первые ошибки: " + "; ".join(result['errors'][:5])
)
except Exception as e: except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
@@ -180,7 +188,19 @@ class LoadCsvDataView(LoginRequiredMixin, FormMessageMixin, FormView):
if isinstance(content, bytes): if isinstance(content, bytes):
content = content.decode("utf-8") content = content.decode("utf-8")
get_points_from_csv(content, self.request.user.customuser) result = get_points_from_csv(content, self.request.user.customuser)
# Обработка нового формата результата
success_msg = f"Данные успешно загружены! Создано источников: {result['new_sources']}, добавлено точек: {result['added']}"
if result['skipped'] > 0:
success_msg += f", пропущено дубликатов: {result['skipped']}"
messages.success(self.request, success_msg)
if result['errors']:
messages.warning(
self.request,
f"Обнаружено ошибок: {len(result['errors'])}. Первые ошибки: " + "; ".join(result['errors'][:5])
)
except Exception as e: except Exception as e:
messages.error(self.request, f"Ошибка при обработке файла: {str(e)}") messages.error(self.request, f"Ошибка при обработке файла: {str(e)}")
return redirect("mainapp:load_csv_data") return redirect("mainapp:load_csv_data")

View File

@@ -223,8 +223,11 @@ def parse_transponders_from_json(filepath: str, user=None):
# Third-party imports (additional) # Third-party imports (additional)
import logging
from lxml import etree from lxml import etree
logger = logging.getLogger(__name__)
def parse_transponders_from_xml(data_in: BytesIO, user=None): def parse_transponders_from_xml(data_in: BytesIO, user=None):
""" """
Парсит транспондеры из XML файла. Парсит транспондеры из XML файла.
@@ -232,9 +235,23 @@ def parse_transponders_from_xml(data_in: BytesIO, user=None):
Если имя спутника содержит альтернативное имя в скобках, оно извлекается Если имя спутника содержит альтернативное имя в скобках, оно извлекается
и сохраняется в поле alternative_name. и сохраняется в поле alternative_name.
Процесс импорта:
1. Сначала создаются/обновляются все спутники
2. Затем для каждого спутника добавляются его транспондеры
Args: Args:
data_in: BytesIO объект с XML данными data_in: BytesIO объект с XML данными
user: пользователь для установки created_by и updated_by (optional) user: пользователь для установки created_by и updated_by (optional)
Returns:
dict: Статистика импорта с ключами:
- satellites_created: количество созданных спутников
- satellites_updated: количество обновлённых спутников
- satellites_skipped: количество пропущенных спутников (дубликаты)
- satellites_ignored: количество игнорированных спутников (X, DONT USE)
- transponders_created: количество созданных транспондеров
- transponders_existing: количество существующих транспондеров
- errors: список ошибок с деталями
""" """
tree = etree.parse(data_in) tree = etree.parse(data_in)
ns = { ns = {
@@ -243,48 +260,38 @@ def parse_transponders_from_xml(data_in: BytesIO, user=None):
'tr': 'http://schemas.datacontract.org/2004/07/Geolocation.Common.Extensions' 'tr': 'http://schemas.datacontract.org/2004/07/Geolocation.Common.Extensions'
} }
satellites = tree.xpath('//ns:SatelliteMemo', namespaces=ns) satellites = tree.xpath('//ns:SatelliteMemo', namespaces=ns)
for sat in satellites[:]:
# Статистика импорта
stats = {
'satellites_created': 0,
'satellites_updated': 0,
'satellites_skipped': 0,
'satellites_ignored': 0,
'transponders_created': 0,
'transponders_existing': 0,
'errors': []
}
# Этап 1: Создание/обновление спутников
satellite_map = {} # Словарь для связи XML элементов со спутниками в БД
for sat in satellites:
name_full = sat.xpath('./ns:name/text()', namespaces=ns)[0] name_full = sat.xpath('./ns:name/text()', namespaces=ns)[0]
# Игнорируем служебные записи
if name_full == 'X' or 'DONT USE' in name_full: if name_full == 'X' or 'DONT USE' in name_full:
stats['satellites_ignored'] += 1
logger.info(f"Игнорирован спутник: {name_full}")
continue continue
# Парсим имя спутника и альтернативное имя # Парсим имя спутника и альтернативное имя
main_name, alt_name = parse_satellite_name(name_full) main_name, alt_name = parse_satellite_name(name_full)
norad = sat.xpath('./ns:norad/text()', namespaces=ns) norad = sat.xpath('./ns:norad/text()', namespaces=ns)
beams = sat.xpath('.//ns:BeamMemo', namespaces=ns)
intl_code = sat.xpath('.//ns:internationalCode/text()', namespaces=ns) intl_code = sat.xpath('.//ns:internationalCode/text()', namespaces=ns)
sub_sat_point = sat.xpath('.//ns:subSatellitePoint/text()', namespaces=ns) sub_sat_point = sat.xpath('.//ns:subSatellitePoint/text()', namespaces=ns)
zones = {}
for zone in beams: try:
zone_name = zone.xpath('./ns:name/text()', namespaces=ns)[0] if zone.xpath('./ns:name/text()', namespaces=ns) else '-'
zones[zone.xpath('./ns:id/text()', namespaces=ns)[0]] = {
"name": zone_name,
"pol": zone.xpath('./ns:polarization/text()', namespaces=ns)[0],
}
transponders = sat.xpath('.//ns:TransponderMemo', namespaces=ns)
for transponder in transponders:
tr_id = transponder.xpath('./ns:downlinkBeamId/text()', namespaces=ns)[0]
downlink_start = float(transponder.xpath('./ns:downlinkFrequency/tr:start/text()', namespaces=ns)[0])
downlink_end = float(transponder.xpath('./ns:downlinkFrequency/tr:end/text()', namespaces=ns)[0])
uplink_start = float(transponder.xpath('./ns:uplinkFrequency/tr:start/text()', namespaces=ns)[0])
uplink_end = float(transponder.xpath('./ns:uplinkFrequency/tr:end/text()', namespaces=ns)[0])
tr_data = zones[tr_id]
match tr_data['pol']:
case 'Horizontal':
pol = 'Горизонтальная'
case 'Vertical':
pol = 'Вертикальная'
case 'CircularRight':
pol = 'Правая'
case 'CircularLeft':
pol = 'Левая'
case _:
pol = '-'
tr_name = transponder.xpath('./ns:name/text()', namespaces=ns)[0]
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
# Ищем спутник по имени или альтернативному имени # Ищем спутник по имени или альтернативному имени
sat_obj = find_satellite_by_name(main_name) sat_obj = find_satellite_by_name(main_name)
if not sat_obj: if not sat_obj:
@@ -296,8 +303,10 @@ def parse_transponders_from_xml(data_in: BytesIO, user=None):
international_code=intl_code[0] if intl_code else "", international_code=intl_code[0] if intl_code else "",
undersat_point=float(sub_sat_point[0]) if sub_sat_point else None undersat_point=float(sub_sat_point[0]) if sub_sat_point else None
) )
stats['satellites_created'] += 1
logger.info(f"Создан спутник: {main_name} (альт. имя: {alt_name})")
else: else:
# Если найден, обновляем альтернативное имя если не установлено # Если найден, обновляем поля если они не установлены
updated = False updated = False
if alt_name and not sat_obj.alternative_name: if alt_name and not sat_obj.alternative_name:
sat_obj.alternative_name = alt_name sat_obj.alternative_name = alt_name
@@ -313,20 +322,130 @@ def parse_transponders_from_xml(data_in: BytesIO, user=None):
updated = True updated = True
if updated: if updated:
sat_obj.save() sat_obj.save()
stats['satellites_updated'] += 1
logger.info(f"Обновлён спутник: {main_name}")
trans_obj, created = Transponders.objects.get_or_create( # Сохраняем связь XML элемента со спутником в БД
polarization=pol_obj, satellite_map[sat] = sat_obj
downlink=(downlink_start+downlink_end)/2/1000000,
uplink=(uplink_start+uplink_end)/2/1000000, except Satellite.MultipleObjectsReturned:
frequency_range=abs(downlink_end-downlink_start)/1000000, # Найдено несколько спутников - пропускаем
name=tr_name, stats['satellites_skipped'] += 1
defaults={ duplicates = Satellite.objects.filter(
"zone_name": tr_data['name'], Q(name__icontains=main_name.lower()) |
"sat_id": sat_obj, Q(alternative_name__icontains=main_name.lower())
}
) )
if user: duplicate_names = [f"{s.name} (ID: {s.id})" for s in duplicates]
if created: error_msg = f"Найдено несколько спутников для '{name_full}': {', '.join(duplicate_names)}"
trans_obj.created_by = user stats['errors'].append({
trans_obj.updated_by = user 'type': 'duplicate_satellite',
trans_obj.save() 'satellite': name_full,
'details': duplicate_names
})
logger.warning(error_msg)
continue
except Exception as e:
# Другие ошибки при обработке спутника
stats['satellites_skipped'] += 1
error_msg = f"Ошибка при обработке спутника '{name_full}': {str(e)}"
stats['errors'].append({
'type': 'satellite_error',
'satellite': name_full,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
# Этап 2: Добавление транспондеров для каждого спутника
for sat, sat_obj in satellite_map.items():
sat_name = sat.xpath('./ns:name/text()', namespaces=ns)[0]
try:
beams = sat.xpath('.//ns:BeamMemo', namespaces=ns)
zones = {}
for zone in beams:
zone_name = zone.xpath('./ns:name/text()', namespaces=ns)[0] if zone.xpath('./ns:name/text()', namespaces=ns) else '-'
zones[zone.xpath('./ns:id/text()', namespaces=ns)[0]] = {
"name": zone_name,
"pol": zone.xpath('./ns:polarization/text()', namespaces=ns)[0],
}
transponders = sat.xpath('.//ns:TransponderMemo', namespaces=ns)
for transponder in transponders:
try:
tr_id = transponder.xpath('./ns:downlinkBeamId/text()', namespaces=ns)[0]
downlink_start = float(transponder.xpath('./ns:downlinkFrequency/tr:start/text()', namespaces=ns)[0])
downlink_end = float(transponder.xpath('./ns:downlinkFrequency/tr:end/text()', namespaces=ns)[0])
uplink_start = float(transponder.xpath('./ns:uplinkFrequency/tr:start/text()', namespaces=ns)[0])
uplink_end = float(transponder.xpath('./ns:uplinkFrequency/tr:end/text()', namespaces=ns)[0])
tr_data = zones[tr_id]
match tr_data['pol']:
case 'Horizontal':
pol = 'Горизонтальная'
case 'Vertical':
pol = 'Вертикальная'
case 'CircularRight':
pol = 'Правая'
case 'CircularLeft':
pol = 'Левая'
case _:
pol = '-'
tr_name = transponder.xpath('./ns:name/text()', namespaces=ns)[0]
pol_obj, _ = Polarization.objects.get_or_create(name=pol)
trans_obj, created = Transponders.objects.get_or_create(
polarization=pol_obj,
downlink=(downlink_start+downlink_end)/2/1000000,
uplink=(uplink_start+uplink_end)/2/1000000,
frequency_range=abs(downlink_end-downlink_start)/1000000,
name=tr_name,
defaults={
"zone_name": tr_data['name'],
"sat_id": sat_obj,
}
)
if created:
stats['transponders_created'] += 1
else:
stats['transponders_existing'] += 1
if user:
if created:
trans_obj.created_by = user
trans_obj.updated_by = user
trans_obj.save()
except Exception as e:
error_msg = f"Ошибка при обработке транспондера спутника '{sat_name}': {str(e)}"
stats['errors'].append({
'type': 'transponder_error',
'satellite': sat_name,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
except Exception as e:
error_msg = f"Ошибка при обработке транспондеров спутника '{sat_name}': {str(e)}"
stats['errors'].append({
'type': 'transponders_processing_error',
'satellite': sat_name,
'error': str(e)
})
logger.error(error_msg, exc_info=True)
continue
# Итоговая статистика в лог
logger.info(
f"Импорт завершён. Спутники: создано {stats['satellites_created']}, "
f"обновлено {stats['satellites_updated']}, пропущено {stats['satellites_skipped']}, "
f"игнорировано {stats['satellites_ignored']}. "
f"Транспондеры: создано {stats['transponders_created']}, "
f"существующих {stats['transponders_existing']}. "
f"Ошибок: {len(stats['errors'])}"
)
return stats

View File

@@ -1,8 +1,9 @@
services: services:
web: web:
build: # build:
context: ./dbapp # context: ./dbapp
dockerfile: Dockerfile # dockerfile: Dockerfile
image: https://registry.geraltserv.ru/geolocation:latest
env_file: env_file:
- .env.prod - .env.prod
depends_on: depends_on:
@@ -14,9 +15,10 @@ services:
- 8000 - 8000
worker: worker:
build: # build:
context: ./dbapp # context: ./dbapp
dockerfile: Dockerfile # dockerfile: Dockerfile
image: https://registry.geraltserv.ru/geolocation:latest
env_file: env_file:
- .env.prod - .env.prod
#entrypoint: [] #entrypoint: []

View File

@@ -1,76 +1,76 @@
services: services:
db: db:
image: postgis/postgis:18-3.6 image: postgis/postgis:18-3.6
container_name: postgres-postgis container_name: postgres-postgis
restart: unless-stopped restart: unless-stopped
environment: environment:
POSTGRES_DB: geodb POSTGRES_DB: geodb
POSTGRES_USER: geralt POSTGRES_USER: geralt
POSTGRES_PASSWORD: 123456 POSTGRES_PASSWORD: 123456
ports: ports:
- "5432:5432" - "5432:5432"
volumes: volumes:
- postgres_data:/var/lib/postgresql - postgres_data:/var/lib/postgresql
networks: networks:
- app-network - app-network
redis: redis:
image: redis:8.2.3-alpine image: redis:8.2.3-alpine
container_name: redis container_name: redis
restart: unless-stopped restart: unless-stopped
ports: ports:
- "6379:6379" - "6379:6379"
volumes: volumes:
- redis_data:/data - redis_data:/data
networks: networks:
- app-network - app-network
flaresolverr: flaresolverr:
image: ghcr.io/flaresolverr/flaresolverr:latest image: ghcr.io/flaresolverr/flaresolverr:latest
container_name: flaresolverr-dev container_name: flaresolverr-dev
restart: unless-stopped restart: unless-stopped
ports: ports:
- "8191:8191" - "8191:8191"
environment: environment:
- LOG_LEVEL=info - LOG_LEVEL=info
- LOG_HTML=false - LOG_HTML=false
- CAPTCHA_SOLVER=none - CAPTCHA_SOLVER=none
networks: networks:
- app-network - app-network
# nginx: # nginx:
# image: nginx:alpine # image: nginx:alpine
# container_name: nginx # container_name: nginx
# restart: unless-stopped # restart: unless-stopped
# ports: # ports:
# - "80:80" # - "80:80"
# # - "443:443" # # - "443:443"
# volumes: # volumes:
# - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro # - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
# - ./nginx/conf.d:/etc/nginx/conf.d:ro # - ./nginx/conf.d:/etc/nginx/conf.d:ro
# - ./dbapp/staticfiles:/app/staticfiles:ro # - ./dbapp/staticfiles:/app/staticfiles:ro
# networks: # networks:
# - app-network # - app-network
# tileserver: # tileserver:
# image: maptiler/tileserver-gl:latest # image: maptiler/tileserver-gl:latest
# container_name: tileserver-gl-dev # container_name: tileserver-gl-dev
# restart: unless-stopped # restart: unless-stopped
# ports: # ports:
# - "8080:8080" # - "8080:8080"
# volumes: # volumes:
# - ./tiles:/data # - ./tiles:/data
# - tileserver_config_dev:/config # - tileserver_config_dev:/config
# environment: # environment:
# - VERBOSE=true # - VERBOSE=true
# networks: # networks:
# - app-network # - app-network
volumes: volumes:
postgres_data: postgres_data:
redis_data: redis_data:
# tileserver_config_dev: # tileserver_config_dev:
networks: networks:
app-network: app-network:
driver: bridge driver: bridge