from django.core.management.base import BaseCommand, CommandError

from mainapp.tasks import test_celery_connection, add_numbers


class Command(BaseCommand):
    """Management command that round-trips two tasks through the task queue.

    It dispatches ``test_celery_connection`` and ``add_numbers`` with
    ``.delay()``, blocks on each result with ``.get()``, and reports the
    outcome on stdout. The arithmetic result is verified, so a wrong answer
    fails the command instead of being reported as a success.
    """

    help = 'Test Celery functionality'

    # Seconds to wait for each task result when no --timeout is supplied.
    DEFAULT_TIMEOUT = 10

    def add_arguments(self, parser):
        """Register the optional ``--timeout`` command-line option."""
        parser.add_argument(
            '--timeout',
            type=float,
            default=self.DEFAULT_TIMEOUT,
            help='Seconds to wait for each task result (default: 10).',
        )

    def handle(self, *args, **options):
        """Run both checks and raise ``CommandError`` on a wrong sum.

        Args:
            *args: Unused positional arguments.
            **options: Parsed command options; ``timeout`` (seconds) is read
                when present, otherwise ``DEFAULT_TIMEOUT`` is used.

        Raises:
            CommandError: If ``add_numbers(10, 20)`` does not return 30.

        Any exception raised while waiting on a result (for example when the
        wait exceeds the timeout) propagates unchanged.
        """
        # Use .get() so a direct handle() call without parsed options still
        # works with the original 10 second wait.
        timeout = options.get('timeout', self.DEFAULT_TIMEOUT)

        self.stdout.write('Testing Celery connection...')

        # Test simple task
        result = test_celery_connection.delay("Hello from test command!")
        self.stdout.write(f'Task ID: {result.id}')

        # Wait for result
        task_result = result.get(timeout=timeout)
        self.stdout.write(self.style.SUCCESS(f'Task result: {task_result}'))

        # Test math task
        math_result = add_numbers.delay(10, 20)
        sum_result = math_result.get(timeout=timeout)

        # Verify the answer before claiming success; previously any returned
        # value was printed with SUCCESS styling.
        if sum_result != 30:
            raise CommandError(
                f'add_numbers(10, 20) returned {sum_result!r}, expected 30'
            )
        self.stdout.write(self.style.SUCCESS(f'10 + 20 = {sum_result}'))

        self.stdout.write(self.style.SUCCESS('All tests passed!'))