I have a RabbitMQ web service and a Celery background worker.
RabbitMQ is running as expected but the Celery worker cannot connect and I receive this error message:
consumer: Cannot connect to amqp://<USER>:**@<HOST>:<PORT>//: timed out.
Trying again in 2.00 seconds... (1/100)
Both services are in the same region and account (they are defined in the same blueprint).
I am creating the Celery broker URL like this:
broker = f"amqp://{username}:{password}@{hostport}//"
username and password are set using an env group and appear to be correct.
hostport is set in the blueprint like this:
- key: BROKER_HOSTPORT
  fromService:
    type: web
    name: rabbitmq
    property: hostport
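For reference, the URL assembly described above can be sketched on its own. This is a minimal sketch, not the exact code in client.py: the function name build_broker_url is hypothetical, and percent-encoding the credentials with urllib.parse.quote is an addition that only matters if the username or password contains characters such as @, : or /.

```python
from urllib.parse import quote


def build_broker_url(username: str, password: str, hostport: str) -> str:
    """Assemble an AMQP broker URL from credentials and a 'host:port' string.

    Hypothetical helper for illustration. The trailing '//' selects the
    default '/' vhost, matching the f-string used in the question.
    """
    # Percent-encode credentials so reserved characters cannot break the URL.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"amqp://{user}:{secret}@{hostport}//"


# Example with made-up values (not the real credentials or host):
print(build_broker_url("user", "p@ss", "rabbitmq:5672"))
# prints amqp://user:p%40ss@rabbitmq:5672//
```

Since the error is a timeout and not an authentication failure, the credentials are probably not the cause here, but printing the assembled URL (with the password masked) confirms which host and port the worker is actually trying to reach.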
Full render.yaml:
services:
  - type: web
    name: rabbitmq
    region: ohio
    env: docker
    rootDir: rabbitmq
    envVars:
      - key: RABBITMQ_ERLANG_COOKIE
        generateValue: true
      - fromGroup: broker-settings
    disk:
      name: rabbitmq
      mountPath: /var/lib/rabbitmq
      sizeGB: 10
  - type: worker
    name: ai-worker
    region: ohio
    env: python
    buildCommand: "pip install -r worker/requirements.txt"
    startCommand: "make worker"
    envVars:
      - key: BROKER_HOSTPORT
        fromService:
          type: web
          name: rabbitmq
          property: hostport
      - key: CELERY_RESULT_BACKEND
        fromService:
          type: redis
          name: celery-redis
          property: connectionString
      - key: PYTHON_VERSION
        value: 3.9.13
      - key: OPENAI_API_KEY
        sync: false
      - key: PROMPTLAYER_API_KEY
        sync: false
      - key: MEDIUM_INTEGRATION_TOKEN
        sync: false
      - fromGroup: broker-settings
  - type: web
    name: flower
    region: ohio
    plan: free
    env: python
    buildCommand: "pip install -r worker/requirements.txt"
    startCommand: "make flower"
    envVars:
      - key: BROKER_HOSTPORT
        fromService:
          type: web
          name: rabbitmq
          property: hostport
      - key: PYTHON_VERSION
        value: 3.9.13
      - fromGroup: broker-settings
  - type: redis
    name: celery-redis
    region: ohio
    plan: starter # we choose a plan with persistence to ensure tasks are not lost upon restart
    ipAllowList: [] # only allow internal connections
Celery worker client.py:
import os

from celery import Celery, group
from dotenv import load_dotenv

load_dotenv()

# Prefer a full broker URL if one is provided; otherwise build it from parts.
broker = os.environ.get("CELERY_BROKER_URL")
if not broker:
    hostport = os.environ.get("BROKER_HOSTPORT", "localhost:5672")
    username = os.environ.get("RABBITMQ_DEFAULT_USER", "guest")
    password = os.environ.get("RABBITMQ_DEFAULT_PASS", "**")
    broker = f"amqp://{username}:{password}@{hostport}//"

backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://")

client = Celery('client',
                broker=broker,
                backend=backend,
                include=['worker.tasks.generate', 'worker.tasks.publish'],
                setup_defaults={"time_limit": 120, "soft_time_limit": 120})

# Optional configuration, see the application user guide.
client.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    client.start()
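Because the error is a timeout, a plain TCP check against BROKER_HOSTPORT from inside the worker might help separate a networking problem from a Celery or credentials problem. The following is only a minimal sketch using the standard library: the helper name can_reach and the 5-second timeout are hypothetical choices, and it assumes the variable holds a "host:port" string.

```python
import os
import socket


def can_reach(hostport: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to 'host:port' succeeds within timeout.

    Hypothetical diagnostic helper. It only tests TCP reachability and does
    not speak AMQP or check credentials.
    """
    # Split on the last ':' so the port is always the final component.
    host, _, port = hostport.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and DNS resolution failures.
        return False


if __name__ == "__main__":
    hostport = os.environ.get("BROKER_HOSTPORT", "localhost:5672")
    print(f"{hostport} reachable: {can_reach(hostport)}")
```

If this prints False from the worker, the port is not reachable at that address at all, which would suggest the problem lies in how the rabbitmq service is exposed and not in the Celery configuration.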
Any suggestions?