#!/usr/bin/env python """ Скрипт для проверки подключения к Redis. Запуск: python check_redis.py """ import os import sys try: import redis except ImportError: print("❌ Redis библиотека не установлена") print("Установите: pip install redis") sys.exit(1) def check_redis(): """Проверка подключения к Redis""" print("=" * 60) print("ПРОВЕРКА REDIS") print("=" * 60) # Получаем URL из переменных окружения broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") cache_url = os.getenv("REDIS_URL", "redis://localhost:6379/1") print(f"\n1. Broker URL: {broker_url}") print(f"2. Cache URL: {cache_url}") # Проверка broker (database 0) print("\n3. Проверка Celery Broker (db 0)...") try: r_broker = redis.from_url(broker_url) r_broker.ping() print(" ✓ Подключение успешно") # Проверка ключей keys = r_broker.keys("*") print(f" ✓ Ключей в базе: {len(keys)}") # Проверка очереди celery queue_length = r_broker.llen("celery") print(f" ✓ Задач в очереди 'celery': {queue_length}") except redis.ConnectionError as e: print(f" ✗ Ошибка подключения: {e}") return False except Exception as e: print(f" ✗ Ошибка: {e}") return False # Проверка cache (database 1) print("\n4. Проверка Django Cache (db 1)...") try: r_cache = redis.from_url(cache_url) r_cache.ping() print(" ✓ Подключение успешно") # Проверка ключей keys = r_cache.keys("*") print(f" ✓ Ключей в базе: {len(keys)}") except redis.ConnectionError as e: print(f" ✗ Ошибка подключения: {e}") return False except Exception as e: print(f" ✗ Ошибка: {e}") return False # Тест записи/чтения print("\n5. Тест записи/чтения...") try: test_key = "test:celery:connection" test_value = "OK" r_broker.set(test_key, test_value, ex=10) # TTL 10 секунд result = r_broker.get(test_key) if result and result.decode() == test_value: print(f" ✓ Запись/чтение работает") r_broker.delete(test_key) else: print(f" ✗ Ошибка: ожидалось '{test_value}', получено '{result}'") return False except Exception as e: print(f" ✗ Ошибка: {e}") return False print("\n" + "=" * 60) print("✓ ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ") print("=" * 60) return True if __name__ == "__main__": success = check_redis() sys.exit(0 if success else 1)