Files
dbstorage/dbapp/mainapp/management/commands/generate_test_marks.py

243 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Management command для генерации тестовых отметок сигналов.
Использование:
python manage.py generate_test_marks --satellite_id=1 --user_id=1 --date_range=10.10.2025-15.10.2025
python manage.py generate_test_marks --satellite_id=1 --user_id=1 --date_range=10.10.2025-15.10.2025 --tech_analyze_ids=1,2,3
python manage.py generate_test_marks --satellite_id=1 --user_id=1 --date_range=10.10.2025-15.10.2025 --time_range=7:00-9:30
Параметры:
--satellite_id: ID спутника (обязательный)
--user_id: ID пользователя CustomUser (обязательный)
--date_range: Диапазон дат в формате ДД.ММ.ГГГГ-ДД.ММ.ГГГГ (обязательный)
--tech_analyze_ids: ID теханализов через запятую (опциональный, если не указан - все теханализы спутника)
--time_range: Временной диапазон в формате ЧЧ:ММ-ЧЧ:ММ (опциональный, по умолчанию 8:00-11:00)
--clear: Удалить существующие отметки перед генерацией
Особенности:
- Генерирует отметки только в будние дни (пн-пт)
- Время отметок: по умолчанию утро с 8:00 до 11:00 (настраивается через --time_range)
- Одна отметка в день для всех сигналов спутника
- Все отметки в один день имеют одинаковый timestamp (пакетное сохранение)
- Все отметки имеют значение True (сигнал присутствует)
"""
import random
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from mainapp.models import TechAnalyze, ObjectMark, Satellite, CustomUser
class Command(BaseCommand):
help = 'Генерирует тестовые отметки сигналов для теханализов выбранного спутника'
def add_arguments(self, parser):
parser.add_argument(
'--satellite_id',
type=int,
required=True,
help='ID спутника для генерации отметок'
)
parser.add_argument(
'--user_id',
type=int,
required=True,
help='ID пользователя CustomUser - автор всех отметок'
)
parser.add_argument(
'--date_range',
type=str,
required=True,
help='Диапазон дат в формате ДД.ММ.ГГГГ-ДД.ММ.ГГГГ (например: 10.10.2025-15.10.2025)'
)
parser.add_argument(
'--tech_analyze_ids',
type=str,
required=False,
help='ID теханализов через запятую (например: 1,2,3). Если не указан - все теханализы спутника'
)
parser.add_argument(
'--time_range',
type=str,
required=False,
default='8:00-11:00',
help='Временной диапазон в формате ЧЧ:ММ-ЧЧ:ММ (например: 7:00-9:30). По умолчанию: 8:00-11:00'
)
parser.add_argument(
'--clear',
action='store_true',
help='Удалить существующие отметки перед генерацией'
)
def parse_time(self, time_str):
"""Парсит время в формате ЧЧ:ММ или Ч:ММ"""
parts = time_str.strip().split(':')
if len(parts) != 2:
raise ValueError(f'Неверный формат времени: {time_str}')
return int(parts[0]), int(parts[1])
def handle(self, *args, **options):
satellite_id = options['satellite_id']
user_id = options['user_id']
date_range = options['date_range']
tech_analyze_ids_str = options['tech_analyze_ids']
time_range = options['time_range']
clear = options['clear']
# Проверяем существование пользователя
try:
custom_user = CustomUser.objects.select_related('user').get(id=user_id)
except CustomUser.DoesNotExist:
raise CommandError(f'Пользователь CustomUser с ID {user_id} не найден')
# Парсим диапазон дат
try:
start_str, end_str = date_range.split('-')
start_date = datetime.strptime(start_str.strip(), '%d.%m.%Y')
end_date = datetime.strptime(end_str.strip(), '%d.%m.%Y')
# Делаем timezone-aware
start_date = timezone.make_aware(start_date)
end_date = timezone.make_aware(end_date)
if start_date > end_date:
raise CommandError('Начальная дата должна быть раньше конечной')
except ValueError as e:
raise CommandError(
f'Неверный формат даты. Используйте ДД.ММ.ГГГГ-ДД.ММ.ГГГГ (например: 10.10.2025-15.10.2025). Ошибка: {e}'
)
# Парсим временной диапазон
try:
time_start_str, time_end_str = time_range.split('-')
start_hour, start_minute = self.parse_time(time_start_str)
end_hour, end_minute = self.parse_time(time_end_str)
# Валидация времени
if not (0 <= start_hour <= 23 and 0 <= start_minute <= 59):
raise ValueError(f'Некорректное начальное время: {time_start_str}')
if not (0 <= end_hour <= 23 and 0 <= end_minute <= 59):
raise ValueError(f'Некорректное конечное время: {time_end_str}')
# Конвертируем в минуты для удобства сравнения и генерации
start_minutes = start_hour * 60 + start_minute
end_minutes = end_hour * 60 + end_minute
if start_minutes >= end_minutes:
raise CommandError('Начальное время должно быть раньше конечного')
except ValueError as e:
raise CommandError(
f'Неверный формат времени. Используйте ЧЧ:ММ-ЧЧ:ММ (например: 7:00-9:30). Ошибка: {e}'
)
# Парсим ID теханализов если указаны
tech_analyze_ids = None
if tech_analyze_ids_str:
try:
tech_analyze_ids = [int(tid.strip()) for tid in tech_analyze_ids_str.split(',')]
except ValueError:
raise CommandError(
f'Неверный формат tech_analyze_ids. Используйте числа через запятую (например: 1,2,3)'
)
# Проверяем существование спутника
try:
satellite = Satellite.objects.get(id=satellite_id)
except Satellite.DoesNotExist:
raise CommandError(f'Спутник с ID {satellite_id} не найден')
# Получаем теханализы для спутника
tech_analyzes_qs = TechAnalyze.objects.filter(satellite=satellite)
# Фильтруем по ID теханализов если указаны
if tech_analyze_ids:
tech_analyzes_qs = tech_analyzes_qs.filter(id__in=tech_analyze_ids)
tech_analyzes = list(tech_analyzes_qs)
ta_count = len(tech_analyzes)
if ta_count == 0:
if tech_analyze_ids:
raise CommandError(f'Нет теханализов для спутника "{satellite.name}" с указанными ID {tech_analyze_ids}')
else:
raise CommandError(f'Нет теханализов для спутника "{satellite.name}"')
self.stdout.write(f'Спутник: {satellite.name}')
self.stdout.write(f'Теханализов: {ta_count}')
if tech_analyze_ids:
self.stdout.write(f'ID теханализов: {tech_analyze_ids}')
self.stdout.write(f'Пользователь: {custom_user}')
self.stdout.write(f'Период: {start_str} - {end_str} (только будние дни)')
self.stdout.write(f'Время: {time_start_str.strip()} - {time_end_str.strip()}')
# Удаляем существующие отметки если указан флаг
if clear:
delete_qs = ObjectMark.objects.filter(tech_analyze__satellite=satellite)
if tech_analyze_ids:
delete_qs = delete_qs.filter(tech_analyze_id__in=tech_analyze_ids)
deleted_count = delete_qs.delete()[0]
self.stdout.write(
self.style.WARNING(f'Удалено существующих отметок: {deleted_count}')
)
# Генерируем отметки
total_marks = 0
marks_to_create = []
workdays_count = 0
current_date = start_date
# Включаем конечную дату в диапазон
end_date_inclusive = end_date + timedelta(days=1)
while current_date < end_date_inclusive:
# Проверяем, что это будний день (0=пн, 4=пт)
if current_date.weekday() < 5:
workdays_count += 1
# Генерируем случайное время в указанном диапазоне
random_total_minutes = random.randint(start_minutes, end_minutes)
random_hour = random_total_minutes // 60
random_minute = random_total_minutes % 60
random_second = random.randint(0, 59)
mark_time = current_date.replace(
hour=random_hour,
minute=random_minute,
second=random_second,
microsecond=0
)
# Создаём отметки для всех теханализов с одинаковым timestamp
for ta in tech_analyzes:
marks_to_create.append(ObjectMark(
tech_analyze=ta,
mark=True, # Всегда True
timestamp=mark_time,
created_by=custom_user,
))
total_marks += 1
current_date += timedelta(days=1)
# Bulk create для производительности
self.stdout.write(f'Рабочих дней: {workdays_count}')
self.stdout.write(f'Создание {total_marks} отметок...')
# Создаём партиями по 1000
batch_size = 1000
for i in range(0, len(marks_to_create), batch_size):
batch = marks_to_create[i:i + batch_size]
ObjectMark.objects.bulk_create(batch)
self.stdout.write(f' Создано: {min(i + batch_size, len(marks_to_create))}/{total_marks}')
self.stdout.write(
self.style.SUCCESS(
f'Успешно создано {total_marks} отметок для {ta_count} теханализов за {workdays_count} рабочих дней'
)
)