Files
dbstorage/dbapp/check_redis.py

97 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
Скрипт для проверки подключения к Redis.
Запуск: python check_redis.py
"""
import os
import sys
try:
import redis
except ImportError:
print("❌ Redis библиотека не установлена")
print("Установите: pip install redis")
sys.exit(1)
def check_redis():
"""Проверка подключения к Redis"""
print("=" * 60)
print("ПРОВЕРКА REDIS")
print("=" * 60)
# Получаем URL из переменных окружения
broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
cache_url = os.getenv("REDIS_URL", "redis://localhost:6379/1")
print(f"\n1. Broker URL: {broker_url}")
print(f"2. Cache URL: {cache_url}")
# Проверка broker (database 0)
print("\n3. Проверка Celery Broker (db 0)...")
try:
r_broker = redis.from_url(broker_url)
r_broker.ping()
print(" ✓ Подключение успешно")
# Проверка ключей
keys = r_broker.keys("*")
print(f" ✓ Ключей в базе: {len(keys)}")
# Проверка очереди celery
queue_length = r_broker.llen("celery")
print(f" ✓ Задач в очереди 'celery': {queue_length}")
except redis.ConnectionError as e:
print(f" ✗ Ошибка подключения: {e}")
return False
except Exception as e:
print(f" ✗ Ошибка: {e}")
return False
# Проверка cache (database 1)
print("\n4. Проверка Django Cache (db 1)...")
try:
r_cache = redis.from_url(cache_url)
r_cache.ping()
print(" ✓ Подключение успешно")
# Проверка ключей
keys = r_cache.keys("*")
print(f" ✓ Ключей в базе: {len(keys)}")
except redis.ConnectionError as e:
print(f" ✗ Ошибка подключения: {e}")
return False
except Exception as e:
print(f" ✗ Ошибка: {e}")
return False
# Тест записи/чтения
print("\n5. Тест записи/чтения...")
try:
test_key = "test:celery:connection"
test_value = "OK"
r_broker.set(test_key, test_value, ex=10) # TTL 10 секунд
result = r_broker.get(test_key)
if result and result.decode() == test_value:
print(f" ✓ Запись/чтение работает")
r_broker.delete(test_key)
else:
print(f" ✗ Ошибка: ожидалось '{test_value}', получено '{result}'")
return False
except Exception as e:
print(f" ✗ Ошибка: {e}")
return False
print("\n" + "=" * 60)
print("ВСЕ ПРОВЕРКИ ПРОЙДЕНЫ")
print("=" * 60)
return True
if __name__ == "__main__":
success = check_redis()
sys.exit(0 if success else 1)