您现在的位置是:首页 > 编程 > 

et使用EasyetQ简化与RabbitMQ的交互

2025-07-25 10:14:48
et使用EasyetQ简化与RabbitMQ的交互 EasyetQ是一个为.ET环境设计的RabbitMQ客户端API,旨在简化与RabbitMQ的交互。关于RabbitMq的更多知识点在:安装EasyetQ你可以通过uGet包管理器来安装EasyetQ。在Package Manager Cole中运行以下命令:代码语言:javascript代码运行次数:0运行复制PM>

et使用EasyetQ简化与RabbitMQ的交互

EasyetQ是一个为.ET环境设计的RabbitMQ客户端API,旨在简化与RabbitMQ的交互。

关于RabbitMq的更多知识点在:

安装EasyetQ

你可以通过uGet包管理器来安装EasyetQ。在Package Manager Cole中运行以下命令:

代码语言:javascript代码运行次数:0运行复制
PM> Install-Package EasyetQ

这将同时安装EasyetQ和其依赖的RabbitMQ.Client库。 建议使用DI安装,EasyetQ.DI.Microsof包含EasyetQ,同时依赖ewtoft.Json

代码语言:javascript代码运行次数:0运行复制
<PackageReference Include="EasyetQ.DI.Microsoft" Version="7.8.0" /> 
<PackageReference Include="ewtoft.Json" Version="1.0." />

注册连接RabbitMQ

代码语言:javascript代码运行次数:0运行复制
var connectionString = "host=111.111.11.111;virtualHost=/;username=admin;password=12456;timeout=60";
 //链接注册
builder.Services.RegisterEasyetQ("host=8.15.70.182;virtualHost=/;username=zhaoke;password=1212;publisherConfirms=true");
//发布注册
builder.Services.AddTransient<MQPublish>();
//订阅注册
builder.Services.AddTransient<MQSubscribe>(); 
//添加消息处理
builder.Services.AddHostedService<SubscribeWorker>();

发布消息

EasyetQ支持发布/订阅模式,你可以通过创建一个.ET类来定义消息,然后使用Publish方法发布消息。例如:

代码语言:javascript代码运行次数:0运行复制
public class TextMessage
{
    public string Text { get; set; }
}

bus.Publish(new TextMessage { Text = "Hello World" });

EasyetQ会根据消息类型自动创建交换机和队列,并使用ewtoft.Json序列化消息为JSO格式。

MQPublish的封装
代码语言:javascript代码运行次数:0运行复制
using EasyetQ.Topology;
using EasyetQ;

/// <summary>
/// 发布消息
/// </summary>
public class MQPublish
{
    private readonly IBus bus;

    public MQPublish(IBus bus)
    {
        this.bus = bus;
    }
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <param name="routingKey"></param>
    /// <param name="data"></param>
    public async Task PublishMessageAsync(string routingKey, object data)
    {
        Cole.WriteLine($"MQ消息推送,routingKey :{routingKey} , 推送数据 :{System.Text.Json.JsonSerializer.Serialize(data)}");

        var message = new Message<object>(data);
        var advancedBus = bus.Advanced;
        advancedBus.QueueDeclare(routingKey);
        await advancedBus.PublishAsync(Exchange.Default, routingKey, false, message);
    }
}

订阅消息

订阅消息时,你需要指定一个订阅ID和一个处理消息的委托。

代码语言:javascript代码运行次数:0运行复制
bus.Subscribe<TextMessage>("subscriptionId", message =>
{
    Cole.WriteLine("Received message: " + message.Text);
});

当有消息发布到对应的交换机和队列时,你的订阅就会收到消息。

封装MQSubscribe
代码语言:javascript代码运行次数:0运行复制
public class MQSubscribe
{
    // MQ消息总线
    private readonly IBus bus;
    public MQSubscribe(IBus bus)
    {
        this.bus = bus;
    }

    /// <summary>
    /// 处理消息的总入口
    /// </summary>
    /// <returns></returns>
    public Task Init()
    { 
        SubscribeTSysLogVis();
        //程序不结束,等待输入
        Cole.WriteLine($"已启动(处理消息) {DateTime.Utcow}");

        return Task.CompletedTask;
    }

    private Task SubscribeTSysLogVis()
    {
        var advancedBus = bus.Advanced;
        //订阅TSysLogVis日志 - 请不要在两次发布之间重复使用它
        var queue = advancedBus.QueueDeclare("TSysLogVis");

        advancedBus.Cume(queue, async (body, properties, info) =>
        {
            try
            {
                var message = Encoding.UTF8.GetString(body.ToArray());
                //var data = JsonConvert.DeserializeObject<TSysLogVis>(message);
                Cole.WriteLine($"消息处理 {} : {message}");
                //db.Insertable(data).SplitTable().ExecuteReturnSnowflakeId();
            }
            catch (Exception ex)
            {
                // 处理异常,例如记录日志或重新抛出
                Cole.Error.WriteLine($"处理消息时发生异常: {ex}");
            }
        }); 
        return Task.CompletedTask;
    }
}

SubscribeWorker

启用订阅服务即可

代码语言:javascript代码运行次数:0运行复制
    public classSubscribeWorker : BackgroundService
    {
        privatereadonly MQSubscribe _Service;

        public SubscribeWorker(MQSubscribe  Service)
        {
            _Service = Service;
        }

        // 执行逻辑
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _Service.Init(); 
        }
    }
本文参与 腾讯云自媒体同步曝光计划,分享自。原始发表:2025-01-09,如有侵权请联系 cloudcommunity@tencent 删除交换机日志异常rabbitmq队列

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1187156.html

相关标签:无
上传时间: 2025-07-22 13:39:41
留言与评论(共有 6 条评论)
本站网友 西班牙人赛程
29分钟前 发表
原始发表:2025-01-09
本站网友 铜价走势图
18分钟前 发表
推送数据 :{System.Text.Json.JsonSerializer.Serialize(data)}"); var message = new Message<object>(data); var advancedBus = bus.Advanced; advancedBus.QueueDeclare(routingKey); await advancedBus.PublishAsync(Exchange.Default
本站网友 和玺
0秒前 发表
等待输入 Cole.WriteLine($"已启动(处理消息) {DateTime.Utcow}"); return Task.CompletedTask; } private Task SubscribeTSysLogVis() { var advancedBus = bus.Advanced; //订阅TSysLogVis日志 - 请不要在两次发布之间重复使用它 var queue = advancedBus.QueueDeclare("TSysLogVis"); advancedBus.Cume(queue
本站网友 orfila
22分钟前 发表
message); } }订阅消息订阅消息时
本站网友 小书房
17分钟前 发表
旨在简化与RabbitMQ的交互