Troubleshoot stream (fanout) queues¶
Beginning with Yaook operator version 0.20250227.0, all queues which are created by OpenStack and contain the suffix _fanout are of type stream, unless the overwrite
rabbit_stream_fanout=false is set inside the OpenStack manifests. You can confirm the type by running:
rabbitmqctl list_queues name type | grep <queue_name>
If you want to know more about what RabbitmMQ streams are, head to the RabbitMQ documentation.
Commands provided in this guide need to be run inside the rabbitmq Kubernetes container.
Repair the RabbitMQ stream coordinator¶
If you encounter repeated OpenStack error messages like the following, the stream coordinator [1] is possibly not working properly:
"message": "Failed to consume message from queue: (0, 0): (541) INTERNAL_ERROR",
To confirm that the stream coordinator is unhealthy, run:
rabbitmq-diagnostics coordinator_status
With a healthy coordinator cluster, the command will print all cluster nodes with exactly one of them
being in “Raft State” leader. If it returns an error or does not terminate, the coordinator cluster is broken and needs to be rebuilt.
Single noproc members will not cause stream operations to fail as long as there is still a leader, even though a quorum loss may be more likely in that case. If the coordinator is functional, but
you still see the error message above, read the next section.
The probably easiest and most reliable way to achieve a restart of the coordinator is by resetting the coordinator process on all nodes and triggering the coordinator start by declaring a nonexistent stream queue.
nodes=$(rabbitmqadmin list nodes -f bash -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}")
reset_command=""
for node in $nodes; do
reset_command+="ra:force_delete_server(coordination, {rabbit_stream_coordinator, '${node}'}),"
done
reset_command="${reset_command%?}."
rabbitmqctl eval "${reset_command}"
queue_name="dont-mind-me-just-triggering-a-coordinator-start"
rabbitmqadmin declare queue name="${queue_name}" durable=true auto_delete=false arguments='{"x-queue-type": "stream"}' -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
rabbitmqctl delete_queue "${queue_name}"
Again, but this time confirm that the coordinator is running:
rabbitmq-diagnostics coordinator_status
If the coordinator is still unhealthy, we recommend to rebuild the entire RabbitMQ cluster [2].
Otherwise, you should confirm that you see no more Failed to consume message from queue messages inside the logs. If this is still the case keep reading the next section.
Resolve “stream_not_found” errors¶
This problem displays the same Failed to consume message from queue OpenStack logs as they appear in case of a dysfunctional stream coordinator, but the following messages can also be encountered in the RabbitMQ logs:
errorContext: child_terminated
reason: {{stream_not_found,
{resource,<<"/">>,queue,<<"barbican.workers_fanout">>}},
If you already tried to delete the queue using rabbitmqctl delete_queue, you might have noticed that this does not work.
In that case, using an internal delete call should still be functional and delete the queue data on all nodes. Afterwards, we will have to recreate the queue
along with its exchange and binding manually:
stream="insert-your-queue-name-here"
if [[ ${stream} != *fanout ]]; then
echo "Error: The recreation is only supported for stream queues created by OpenStack with the suffix '_fanout.' Not doing anything."
echo "If this queue was created by OpenStack, the documentation you consulted may be out of date."
else
rabbitmqctl eval 'rabbit_db_queue:delete({resource,<<"/">>,queue,<<"'${stream}'">>},normal).'
rabbitmqadmin declare queue name="${stream}" durable=true auto_delete=false arguments='{"x-queue-type": "stream"}' -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
rabbitmqadmin declare exchange name="${stream}" type=fanout durable=true auto_delete=true -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
rabbitmqadmin declare binding source="${stream}" destination="${stream}" routing_key="${stream%_fanout}" -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
fi
If you want to recreate all stream queues, you can run:
streams=$(rabbitmqctl eval "Q=rabbit_db_queue:list()."| cut -d "\"" -f 4 | grep "fanout$")
for stream in $streams; do
if [[ ${stream} != *fanout ]]; then
echo "Error: The recreation is only supported for stream queues created by OpenStack with the suffix '_fanout.' Skipping queue '${stream}'."
echo "If this queue was created by OpenStack, the documentation you consulted may be out of date."
else
rabbitmqctl eval 'rabbit_db_queue:delete({resource,<<"/">>,queue,<<"'${stream}'">>},normal).'
rabbitmqadmin declare queue name="${stream}" durable=true auto_delete=false arguments='{"x-queue-type": "stream"}' -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
rabbitmqadmin declare exchange name="${stream}" type=fanout durable=true auto_delete=true -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
rabbitmqadmin declare binding source="${stream}" destination="${stream}" routing_key="${stream%_fanout}" -u "${RABBITMQ_DEFAULT_USER}" -p "${RABBITMQ_DEFAULT_PASS}"
fi
done
This problem is caused by inconsistent information inside the RabbitMQ database, e.g. the queue might still exist inside MNESIA_DURABLE_TABLE but be missing inside the MNESIA_TABLE,
there can, however, also be other causes. Because stream queues are not critical for OpenStack API operations, they can be deleted and recreated without backing up messages.