您现在的位置是:首页 > 编程 > 

dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期

2025-07-27 02:10:47
dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期 在 RabbitMQ.Client 7.0.0 版本中, IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel, IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicPro

dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期

在 RabbitMQ.Client 7.0.0 版本中, IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel, IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicProperties 类来创建消息属性。

前言

关于RabbitMq的更多知识点在:

下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期

1. 安装RabbitMQ客户端库

首先,你需要安装RabbitMQ的.ET客户端库。这可以通过uGet包管理器来完成:

代码语言:javascript代码运行次数:0运行复制
Install-Package RabbitMQ.Client

2. 配置RabbitMQ连接字符串

在你的appsettings.json文件中,添加RabbitMQ的连接配置:

代码语言:javascript代码运行次数:0运行复制
{
  "RabbitMQ": {
    "Hostame": "localhost",
    "Port": 5672,
    "Userame": "guest",
    "Password": "guest"
  }
}

. 创建RabbitMQ服务配置类

创建一个配置类来封装RabbitMQ的连接信息:

代码语言:javascript代码运行次数:0运行复制
public class RabbitMQOpti
{
    public string Hostame { get; set; }
    public int Port { get; set; }
    public string Userame { get; set; }
    public string Password { get; set; }
}

4. 注册RabbitMQ服务

或程序启动时的配置方法中,注册RabbitMQ服务:

代码语言:javascript代码运行次数:0运行复制
// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOpti>(builder.Configuration.GetSection("RabbitMQ"));

// 注册RabbitMQ连接工厂
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(sp =>
{
    var opti = sp.GetRequiredService<IOpti<RabbitMQOpti>>().Value;
    var factory = new ConnectionFactory() { Hostame = opti.Hostame, Port = opti.Port, Userame = opti.Userame, Password = opti.Password };
    return new RabbitMQConnection(factory);
}); 

// 添加RabbitMQService的服务注册
builder.Services.AddSingleton<RabbitMQService>();

5. 创建RabbitMQ连接和通道工厂

创建一个工厂类来管理RabbitMQ的连接和通道:

代码语言:javascript代码运行次数:0运行复制
    public interfaceIRabbitMQConnection : IDisposable
    {
        Task<IChannel> CreateChannel();
    }

    publicclassRabbitMQConnection : IRabbitMQConnection
    {
        privatereadonly ConnectionFactory _factory;
        privatereadonly IConnection _connection;
        privatebool _isDisposed;

        public RabbitMQConnection(ConnectionFactory factory)
        {
            _factory = factory ?? thrownew ArgumentullException(nameof(factory));
            _connection = factory.CreateConnectionAsync().Result;
        }

        public async Task<IChannel> CreateChannel()
        {
            EnsureotDisposed();
            returnawait _connection.CreateChannelAsync();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (_isDisposed) return;

            if (disposing)
            {
                // Free any other managed objects here.
            }

            // Free any unmanaged objects here.
            _connection.Dispose();

            _isDisposed = true;
        }

        ~RabbitMQConnection()
        {
            Dispose(false);
        }

        private void EnsureotDisposed()
        {
            if (_isDisposed)
            {
                thrownew ObjectDisposedException(nameof(RabbitMQConnection));
            }
        }
    }

6. 使用RabbitMQ服务

在你的服务或消费者中,注入IRabbitMQConnection并使用它来创建模型(channel):

代码语言:javascript代码运行次数:0运行复制
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;

publicclassRabbitMQService
{
    privatereadonly IRabbitMQConnection _connection;

    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? thrownew ArgumentullException(nameof(connection));
    }


    public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
    {
         
            try
            {
                usinar channel = _connection.CreateChannel();
                var mesjson = JsonSerializer.Serialize(message);
                Cole.WriteLine("发送消息:" + mesjson);
                var body = Encoding.UTF8.GetBytes(mesjson);
                var properties = new RabbitMQ.Client.BasicProperties
                {
                    Persistent = true// 设置消息持久化
                };
                channel.BasicPublishAsync(exchange, routingKey, false, properties, body, cancellationToken);

            }
            catch (OperationCanceledException ex)
            {
                Cole.WriteLine($"Operation was canceled: {ex.Message}");
                //throw; // Re-throw if you want to propagate the cancellation
            }
            catch (Exception ex)
            {
                Cole.WriteLine($"An error occurred: {ex.Message}");
                //throw; // Re-throw if you want to propagate the error
            }  
    }

    public async Task ReceiveAsync(string queueame, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
    {
        var channel = _connection.CreateChannel();
        await channel.QueueDeclareAsync(queue: queueame, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var cumer = new AsyncEventingBasicCumer(channel);
        cumer.ReceivedAsync += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            try
            {
                // 直接传递 model 和 body 给 callback,不需要转换
                await callback(channel, body);
            }

            finally
            {
                //await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
            }
        };
        await channel.BasicCumeAsync(queue: queueame, autoAck: false, cumer: cumer, cancellationToken: cancellationToken);
        // Prevent the method from returning immediately
        await Task.Delay(-1, cancellationToken);
    }
}

7.生产端和消费端的使用

消费
代码语言:javascript代码运行次数:0运行复制
var app = builder.Build();


var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
    // 处理接收到的消息
    //string message = Encoding.UTF8.GetString(body);
    //Cole.WriteLine($"收到消息 message: {message}");
    //// 确认消息
    //await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);

}, cancellationToken);
生产端
本文参与 腾讯云自媒体同步曝光计划,分享自。原始发表:2025-01-08,如有侵权请联系 cloudcommunity@tencent 删除管理配置生命周期依赖注入rabbitmq

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1190960.html

相关标签:无
上传时间: 2025-07-22 21:40:35
留言与评论(共有 10 条评论)
本站网友 孕妇孕期知识
11分钟前 发表
你需要安装RabbitMQ的.ET客户端库
本站网友 别和自己过不去
2分钟前 发表
false
本站网友 佛山大众
12分钟前 发表
{ex.Message}"); //throw; // Re-throw if you want to propagate the cancellation } catch (Exception ex) { Cole.WriteLine($"An error occurred
本站网友 后入体位
19分钟前 发表
你需要安装RabbitMQ的.ET客户端库
本站网友 神书
14分钟前 发表
{message}"); //// 确认消息 //await channel.BasicAckAsync(deliveryTag
本站网友 血蛮
27分钟前 发表
false
本站网友 吸奶器的作用
18分钟前 发表
false
本站网友 搜索服务
25分钟前 发表
IDisposable { Task<IChannel> CreateChannel(); } publicclassRabbitMQConnection
本站网友 鹿茸草
21分钟前 发表
false