docker启动rabbitmq以及使用方式详解

2023-12-01 0 230
目录
  • 搜索rabbitmq镜像
  • 下载镜像
  • 启动容器
  • 打印容器
  • 访问RabbitMQ Management
  • 编写生产者类
  • 消费者
  • 工作队列
    • RabbitMqUtils工具类
    • 启动2个工作线程
    • 启动发送线程
  • 消息应答机制
    • 生产者
    • 消费者
    • 模拟
    • 不公平分发
  • 总结

    搜索rabbitmq镜像

    docker search rabbitmq:management

    docker启动rabbitmq以及使用方式详解

    下载镜像

    docker pull rabbitmq:management

    docker启动rabbitmq以及使用方式详解

    启动容器

    docker run -d –hostname localhost –name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    打印容器

    docker logs rabbitmq

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    访问RabbitMQ Management

    http://localhost:15672

    账户密码默认:guest

    docker启动rabbitmq以及使用方式详解

    编写生产者类

    package com.xun.rabbitmqdemo.example;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Producer {
    private final static String QUEUE_NAME = \”hello\”;
    public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(\”guest\”);
    factory.setPassword(\”guest\”);
    factory.setHost(\”localhost\”);
    factory.setPort(5672);
    factory.setVirtualHost(\”/\”);

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();
    /**
    * 生成一个queue队列
    * 1、队列名称 QUEUE_NAME
    * 2、队列里面的消息是否持久化(默认消息存储在内存中)
    * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
    * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
    * 5、其他参数
    */
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    String message = \”Hello world!\”;
    /**
    * 发送一个消息
    * 1、发送到哪个exchange交换机
    * 2、路由的key
    * 3、其他的参数信息
    * 4、消息体
    */
    channel.basicPublish(\”\”,QUEUE_NAME,null,message.getBytes());
    System.out.println(\” [x] Sent \’\”+message+\”\’\”);

    channel.close();
    connection.close();
    }
    }

    运行该方法,可以看到控制台的打印

    docker启动rabbitmq以及使用方式详解

    name=hello的队列收到Message

    docker启动rabbitmq以及使用方式详解

    消费者

    package com.xun.rabbitmqdemo.example;

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Receiver {
    private final static String QUEUE_NAME = \”hello\”;
    public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(\”guest\”);
    factory.setPassword(\”guest\”);
    factory.setHost(\”localhost\”);
    factory.setPort(5672);
    factory.setVirtualHost(\”/\”);
    factory.setConnectionTimeout(600000);//milliseconds
    factory.setRequestedHeartbeat(60);//seconds
    factory.setHandshakeTimeout(6000);//milliseconds
    factory.setRequestedChannelMax(5);
    factory.setNetworkRecoveryInterval(500);

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    System.out.println(\”Waiting for messages. \”);

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
    String message = new String(body, \”UTF-8\”);
    System.out.println(\” [x] Received \’\” + message + \”\’\”);
    }
    };
    channel.basicConsume(QUEUE_NAME,true,consumer);
    }
    }

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    工作队列

    RabbitMqUtils工具类

    package com.xun.rabbitmqdemo.utils;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(\”localhost\”);
    factory.setUsername(\”guest\”);
    factory.setPassword(\”guest\”);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    return channel;
    }
    }

    启动2个工作线程

    package com.xun.rabbitmqdemo.workQueue;

    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

    public class Work01 {
    private static final String QUEUE_NAME = \”hello\”;
    public static void main(String[] args) throws Exception{
    Channel channel = RabbitMqUtils.getChannel();
    DeliverCallback deliverCallback = (consumerTag,delivery)->{
    String receivedMessage = new String(delivery.getBody());
    System.out.println(\”接收消息:\”+receivedMessage);
    };
    CancelCallback cancelCallback = (consumerTag)->{
    System.out.println(consumerTag+\”消费者取消消费接口回调逻辑\”);
    };
    System.out.println(\”C1 消费者启动等待消费….\”);
    /**
    * 消费者消费消息
    * 1、消费哪个队列
    * 2、消费成功后是否自动应答
    * 3、消费的接口回调
    * 4、消费未成功的接口回调
    */
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
    }

    package com.xun.rabbitmqdemo.workQueue;

    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

    public class Work02 {
    private static final String QUEUE_NAME = \”hello\”;
    public static void main(String[] args) throws Exception{
    Channel channel = RabbitMqUtils.getChannel();
    DeliverCallback deliverCallback = (consumerTag,delivery)->{
    String receivedMessage = new String(delivery.getBody());
    System.out.println(\”接收消息:\”+receivedMessage);
    };
    CancelCallback cancelCallback = (consumerTag)->{
    System.out.println(consumerTag+\”消费者取消消费接口回调逻辑\”);
    };
    System.out.println(\”C2 消费者启动等待消费….\”);
    channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
    }

    启动工作线程

    docker启动rabbitmq以及使用方式详解

    启动发送线程

    package com.xun.rabbitmqdemo.workQueue;

    import com.rabbitmq.client.Channel;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;

    public class Task01 {
    private static final String QUEUE_NAME = \”hello\”;
    public static void main(String[] args) throws Exception{
    try(Channel channel= RabbitMqUtils.getChannel();){
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    //从控制台接收消息
    Scanner scanner = new Scanner(System.in);
    while(scanner.hasNext()){
    String message = scanner.next();
    channel.basicPublish(\”\”,QUEUE_NAME,null,message.getBytes());
    System.out.println(\”发送消息完成:\”+message);
    }
    }
    }
    }

    启动发送线程,此时发送线程等待键盘输入

    docker启动rabbitmq以及使用方式详解

    发送4个消息

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    可以看到2个工作线程按照顺序分别接收message。

    消息应答机制

    rabbitmq将message发送给消费者后,就会将该消息标记为删除。

    但消费者在处理message过程中宕机,会导致消息的丢失。

    因此需要设置手动应答。

    生产者

    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import java.util.Scanner;

    public class Task02 {
    private static final String TASK_QUEUE_NAME = \”ack_queue\”;
    public static void main(String[] args) throws Exception{
    try(Channel channel = RabbitMqUtils.getChannel()){
    channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
    Scanner scanner = new Scanner(System.in);
    System.out.println(\”请输入信息\”);
    while(scanner.hasNext()){
    String message = scanner.nextLine();
    channel.basicPublish(\”\”,TASK_QUEUE_NAME,null,message.getBytes());
    System.out.println(\”生产者task02发出消息\”+ message);
    }
    }
    }
    }

    消费者

    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;

    public class Work03 {
    private static final String ACK_QUEUE_NAME = \”ack_queue\”;
    public static void main(String[] args) throws Exception{
    Channel channel = RabbitMqUtils.getChannel();
    System.out.println(\”Work03 等待接收消息处理时间较短\”);
    DeliverCallback deliverCallback = (consumerTag,delivery)->{
    String message = new String(delivery.getBody());
    SleepUtils.sleep(1);
    System.out.println(\”接收到消息:\”+message);
    /**
    * 1、消息的标记tag
    * 2、是否批量应答
    */
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    };
    CancelCallback cancelCallback = (consumerTag)->{
    System.out.println(consumerTag+\”消费者取消消费接口回调逻辑\”);
    };
    //采用手动应答
    boolean autoAck = false;
    channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
    }

    package com.xun.rabbitmqdemo.workQueue;
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
    import com.xun.rabbitmqdemo.utils.SleepUtils;

    public class Work04 {
    private static final String ACK_QUEUE_NAME = \”ack_queue\”;
    public static void main(String[] args) throws Exception{
    Channel channel = RabbitMqUtils.getChannel();
    System.out.println(\”Work04 等待接收消息处理时间较长\”);
    DeliverCallback deliverCallback = (consumerTag,delivery)->{
    String message = new String(delivery.getBody());
    SleepUtils.sleep(30);
    System.out.println(\”接收到消息:\”+message);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    };
    CancelCallback cancelCallback = (consumerTag)->{
    System.out.println(consumerTag+\”消费者取消消费接口回调逻辑\”);
    };
    //采用手动应答
    boolean autoAck = false;
    channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
    }

    工具类SleepUtils

    package com.xun.rabbitmqdemo.utils;
    public class SleepUtils {
    public static void sleep(int second){
    try{
    Thread.sleep(1000*second);
    }catch (InterruptedException _ignored){
    Thread.currentThread().interrupt();
    }
    }
    }

    模拟

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    work04等待30s后发出ack

    docker启动rabbitmq以及使用方式详解

    在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    docker启动rabbitmq以及使用方式详解

    不公平分发

    上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。不公平分发:

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);

    通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。

    docker启动rabbitmq以及使用方式详解

    总结

    到此这篇关于docker启动rabbitmq以及使用的文章就介绍到这了,更多相关docker启动rabbitmq及使用内容请搜索悠久资源网以前的文章或继续浏览下面的相关文章希望大家以后多多支持悠久资源网!

    收藏 (0) 打赏

    感谢您的支持,我会继续努力的!

    打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
    点赞 (0)

    悠久资源 Linux服务器 docker启动rabbitmq以及使用方式详解 https://www.u-9.cn/server/linux/1316.html

    常见问题

    相关文章

    发表评论
    暂无评论
    官方客服团队

    为您解决烦忧 - 24小时在线 专业服务