#!/usr/bin/env python """ Скрипт для тестирования Celery подключения и задач. Запуск: python test_celery.py """ import os import django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dbapp.settings.production') django.setup() from celery import current_app from dbapp.celery import debug_task def test_celery_connection(): """Проверка подключения к Celery""" print("=" * 60) print("ТЕСТ CELERY ПОДКЛЮЧЕНИЯ") print("=" * 60) # Проверка конфигурации print(f"\n1. Broker URL: {current_app.conf.broker_url}") print(f"2. Result Backend: {current_app.conf.result_backend}") # Проверка подключения к брокеру try: inspect = current_app.control.inspect() stats = inspect.stats() if stats: print(f"\n3. ✓ Активные workers: {list(stats.keys())}") for worker, info in stats.items(): print(f" - {worker}: {info}") else: print("\n3. ✗ Нет активных workers!") print(" Убедитесь, что Celery worker запущен:") print(" docker-compose -f docker-compose.prod.yaml logs worker") except Exception as e: print(f"\n3. ✗ Ошибка подключения к брокеру: {e}") return False # Проверка зарегистрированных задач registered_tasks = list(current_app.tasks.keys()) print(f"\n4. Зарегистрированные задачи ({len(registered_tasks)}):") for task in sorted(registered_tasks): if not task.startswith('celery.'): print(f" - {task}") # Тест простой задачи print("\n5. Тестирование задачи...") try: result = debug_task.delay() print(f" Task ID: {result.id}") print(f" Waiting for result...") output = result.get(timeout=10) print(f" ✓ Результат: {output}") except Exception as e: print(f" ✗ Ошибка выполнения задачи: {e}") return False print("\n" + "=" * 60) print("✓ ВСЕ ТЕСТЫ ПРОЙДЕНЫ") print("=" * 60) return True if __name__ == "__main__": test_celery_connection()