24 lines
879 B
Python
24 lines
879 B
Python
from django.core.management.base import BaseCommand
|
|
from mainapp.tasks import test_celery_connection, add_numbers
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = 'Test Celery functionality'
|
|
|
|
def handle(self, *args, **options):
|
|
self.stdout.write('Testing Celery connection...')
|
|
|
|
# Test simple task
|
|
result = test_celery_connection.delay("Hello from test command!")
|
|
self.stdout.write(f'Task ID: {result.id}')
|
|
|
|
# Wait for result
|
|
task_result = result.get(timeout=10)
|
|
self.stdout.write(self.style.SUCCESS(f'Task result: {task_result}'))
|
|
|
|
# Test math task
|
|
math_result = add_numbers.delay(10, 20)
|
|
sum_result = math_result.get(timeout=10)
|
|
self.stdout.write(self.style.SUCCESS(f'10 + 20 = {sum_result}'))
|
|
|
|
self.stdout.write(self.style.SUCCESS('All tests passed!')) |