dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期
dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期
在 RabbitMQ.Client 7.0.0 版本中, IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel, IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicPro
dotet中通过依赖注入DI来管理RabbitMq.Client7.0的生命周期
在 RabbitMQ.Client 7.0.0 版本中, IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel, IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicProperties 类来创建消息属性。
前言
关于RabbitMq的更多知识点在:
下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期
1. 安装RabbitMQ客户端库
首先,你需要安装RabbitMQ的.ET客户端库。这可以通过uGet包管理器来完成:
代码语言:javascript代码运行次数:0运行复制Install-Package RabbitMQ.Client
2. 配置RabbitMQ连接字符串
在你的appsettings.json
文件中,添加RabbitMQ的连接配置:
{
"RabbitMQ": {
"Hostame": "localhost",
"Port": 5672,
"Userame": "guest",
"Password": "guest"
}
}
. 创建RabbitMQ服务配置类
创建一个配置类来封装RabbitMQ的连接信息:
代码语言:javascript代码运行次数:0运行复制public class RabbitMQOpti
{
public string Hostame { get; set; }
public int Port { get; set; }
public string Userame { get; set; }
public string Password { get; set; }
}
4. 注册RabbitMQ服务
在或程序启动时的配置方法中,注册RabbitMQ服务:
// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOpti>(builder.Configuration.GetSection("RabbitMQ"));
// 注册RabbitMQ连接工厂
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(sp =>
{
var opti = sp.GetRequiredService<IOpti<RabbitMQOpti>>().Value;
var factory = new ConnectionFactory() { Hostame = opti.Hostame, Port = opti.Port, Userame = opti.Userame, Password = opti.Password };
return new RabbitMQConnection(factory);
});
// 添加RabbitMQService的服务注册
builder.Services.AddSingleton<RabbitMQService>();
5. 创建RabbitMQ连接和通道工厂
创建一个工厂类来管理RabbitMQ的连接和通道:
代码语言:javascript代码运行次数:0运行复制 public interfaceIRabbitMQConnection : IDisposable
{
Task<IChannel> CreateChannel();
}
publicclassRabbitMQConnection : IRabbitMQConnection
{
privatereadonly ConnectionFactory _factory;
privatereadonly IConnection _connection;
privatebool _isDisposed;
public RabbitMQConnection(ConnectionFactory factory)
{
_factory = factory ?? thrownew ArgumentullException(nameof(factory));
_connection = factory.CreateConnectionAsync().Result;
}
public async Task<IChannel> CreateChannel()
{
EnsureotDisposed();
returnawait _connection.CreateChannelAsync();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_isDisposed) return;
if (disposing)
{
// Free any other managed objects here.
}
// Free any unmanaged objects here.
_connection.Dispose();
_isDisposed = true;
}
~RabbitMQConnection()
{
Dispose(false);
}
private void EnsureotDisposed()
{
if (_isDisposed)
{
thrownew ObjectDisposedException(nameof(RabbitMQConnection));
}
}
}
6. 使用RabbitMQ服务
在你的服务或消费者中,注入IRabbitMQConnection
并使用它来创建模型(channel):
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;
publicclassRabbitMQService
{
privatereadonly IRabbitMQConnection _connection;
public RabbitMQService(IRabbitMQConnection connection)
{
_connection = connection ?? thrownew ArgumentullException(nameof(connection));
}
public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
{
try
{
usinar channel = _connection.CreateChannel();
var mesjson = JsonSerializer.Serialize(message);
Cole.WriteLine("发送消息:" + mesjson);
var body = Encoding.UTF8.GetBytes(mesjson);
var properties = new RabbitMQ.Client.BasicProperties
{
Persistent = true// 设置消息持久化
};
channel.BasicPublishAsync(exchange, routingKey, false, properties, body, cancellationToken);
}
catch (OperationCanceledException ex)
{
Cole.WriteLine($"Operation was canceled: {ex.Message}");
//throw; // Re-throw if you want to propagate the cancellation
}
catch (Exception ex)
{
Cole.WriteLine($"An error occurred: {ex.Message}");
//throw; // Re-throw if you want to propagate the error
}
}
public async Task ReceiveAsync(string queueame, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
{
var channel = _connection.CreateChannel();
await channel.QueueDeclareAsync(queue: queueame, durable: true, exclusive: false, autoDelete: false, arguments: null);
var cumer = new AsyncEventingBasicCumer(channel);
cumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
try
{
// 直接传递 model 和 body 给 callback,不需要转换
await callback(channel, body);
}
finally
{
//await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
}
};
await channel.BasicCumeAsync(queue: queueame, autoAck: false, cumer: cumer, cancellationToken: cancellationToken);
// Prevent the method from returning immediately
await Task.Delay(-1, cancellationToken);
}
}
7.生产端和消费端的使用
消费
代码语言:javascript代码运行次数:0运行复制var app = builder.Build();
var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{
// 处理接收到的消息
//string message = Encoding.UTF8.GetString(body);
//Cole.WriteLine($"收到消息 message: {message}");
//// 确认消息
//await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);
}, cancellationToken);
生产端
#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格
上传时间: 2025-07-22 21:40:35
推荐阅读
留言与评论(共有 10 条评论) |
本站网友 孕妇孕期知识 | 11分钟前 发表 |
你需要安装RabbitMQ的.ET客户端库 | |
本站网友 别和自己过不去 | 2分钟前 发表 |
false | |
本站网友 佛山大众 | 12分钟前 发表 |
{ex.Message}"); //throw; // Re-throw if you want to propagate the cancellation } catch (Exception ex) { Cole.WriteLine($"An error occurred | |
本站网友 后入体位 | 19分钟前 发表 |
你需要安装RabbitMQ的.ET客户端库 | |
本站网友 神书 | 14分钟前 发表 |
{message}"); //// 确认消息 //await channel.BasicAckAsync(deliveryTag | |
本站网友 血蛮 | 27分钟前 发表 |
false | |
本站网友 吸奶器的作用 | 18分钟前 发表 |
false | |
本站网友 搜索服务 | 25分钟前 发表 |
IDisposable { Task<IChannel> CreateChannel(); } publicclassRabbitMQConnection | |
本站网友 鹿茸草 | 21分钟前 发表 |
false |