using Application.Dto; using Application.Dto.Event...
Создано: 9 февраля 2026 г.
Отвечено с использованием GPT-5.2 Thinking от Chat01
Создано: 9 февраля 2026 г.
Отвечено с использованием GPT-5.2 Thinking от Chat01
using Application.Dto;
using Application.Dto.Events;
using Application.Interfaces.External;
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;
namespace Infrastructure.RabbitMQ
{
public sealed class RabbitMqUserCreatedEventPublisher : IMessageQueueService, IAsyncDisposable
{
private readonly RabbitMqOptions _options;
private readonly ConnectionFactory _factory;
textprivate IConnection? _connection; private readonly SemaphoreSlim _connectionLock = new(1, 1); public RabbitMqUserCreatedEventPublisher(IOptions<RabbitMqOptions> options) { _options = options.Value; _factory = new ConnectionFactory { HostName = _options.HostName, Port = _options.Port, UserName = _options.UserName, Password = _options.Password, AutomaticRecoveryEnabled = true }; } private async Task<IConnection> GetConnectionAsync(CancellationToken token) { if (_connection != null && _connection.IsOpen) return _connection; await _connectionLock.WaitAsync(token); try { if (_connection != null && _connection.IsOpen) return _connection; _connection = await _factory.CreateConnectionAsync(token); return _connection; } finally { _connectionLock.Release(); } } public async Task<EventPublishResult> PublishUserCreatedAsync(Guid userId, CancellationToken cancellationToken = default) { var messageId = Guid.NewGuid().ToString("N"); try { cancellationToken.ThrowIfCancellationRequested(); var connection = await GetConnectionAsync(cancellationToken); var channelOpts = new CreateChannelOptions( publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true ); await using var channel = await connection.CreateChannelAsync(channelOpts, cancellationToken); await DeclareTopologyAsync(channel, cancellationToken); var evt = new UserCreatedEvent { UserId = userId, OccurredAt = DateTimeOffset.UtcNow }; var json = JsonSerializer.Serialize(new { MessageId = messageId, Event = evt }); var body = Encoding.UTF8.GetBytes(json); var props = new BasicProperties { Persistent = true, ContentType = "application/json", MessageId = messageId, CorrelationId = userId.ToString() }; await channel.BasicPublishAsync( exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, mandatory: true, basicProperties: props, body: body, cancellationToken: cancellationToken ); return EventPublishResult.Success(messageId); } catch (PublishException ex) when (ex.IsReturn) { return EventPublishResult.Failure( $"Unroutable (returned). SeqNo={ex.PublishSequenceNumber}.", messageId ); } catch (PublishException ex) { return EventPublishResult.Failure( $"Not confirmed by broker. SeqNo={ex.PublishSequenceNumber}.", messageId ); } catch (OperationCanceledException) { return EventPublishResult.Failure("Publish cancelled."); } catch (Exception ex) { return EventPublishResult.Failure($"RabbitMQ Error: {ex.Message}"); } } private async Task DeclareTopologyAsync(IChannel channel, CancellationToken token) { await channel.ExchangeDeclareAsync( exchange: _options.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: token ); await channel.QueueDeclareAsync( queue: _options.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: token ); await channel.QueueBindAsync( queue: _options.UserCreatedQueue, exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, cancellationToken: token ); } public async ValueTask DisposeAsync() { if (_connection != null) { try { await _connection.CloseAsync(); await _connection.DisposeAsync(); } catch { } } _connectionLock.Dispose(); } }
} это код для отправки guid в rabbitmq
вот код для получения id из rabbitmq
using Application.Dto.Events;
using Application.Interfaces.Services;
using Infrastructure.DataBase.PostgreSQL;
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Text.Json;
using WorkerCodeStarts.Messaging.RabbitMq;
namespace WorkerCodeStarts.Features.CodeExecution.Consumers;
public sealed class CodeExecutionRequestedConsumer : BackgroundService
{
private readonly RabbitMqOptions _opts;
private readonly RabbitMqConnection _rmq;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<CodeExecutionRequestedConsumer> _logger;
textprivate IChannel? _channel; private static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true }; public CodeExecutionRequestedConsumer(IOptions<RabbitMqOptions> options, RabbitMqConnection rmq, IServiceScopeFactory scopeFactory, ILogger<CodeExecutionRequestedConsumer> logger) { _opts = options.Value; _rmq = rmq; _scopeFactory = scopeFactory; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // 1) connect + channel var conn = await _rmq.GetAsync(stoppingToken); _channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken); // 2) topology (идемпотентно, можно безопасно вызывать в консьюмере) await DeclareTopologyAsync(_channel, stoppingToken); // 3) ограничим “сколько сообщений в работе одновременно” await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += OnMessageAsync; // В v7 это extension-метод BasicConsumeAsync await _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); _logger.LogInformation("Rabbit consumer started. Queue={Queue}", _opts.UserCreatedQueue); // держим сервис живым await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea) { if (_channel == null) return; var ct = _stoppingToken; var tag = ea.DeliveryTag; try { var text = Encoding.UTF8.GetString(ea.Body.ToArray()).Trim(); if (!Guid.TryParse(text, out var userId) || userId == Guid.Empty) { _logger.LogWarning("Bad message (not a Guid). Body={Body}", text); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { _logger.LogError(ex, "Error while processing message. DeliveryTag={Tag}", tag); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } } private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) { await ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct); await ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct); } public override async Task StopAsync(CancellationToken cancellationToken) { try { if (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); } } catch { /* ignore */ } await base.StopAsync(cancellationToken); }
} мне нужно, чтобы код, который получает id соответствовал отправляющему
У тебя отправитель не шлёт “просто Guid строкой” — он шлёт JSON вида:
json{ "MessageId": "...", "Event": { "UserId": "...", "OccurredAt": "..." } }
А консьюмер сейчас делает Guid.TryParse(text) и ожидает, что text — это чистый GUID. Поэтому надо либо:
Event.UserId из JSON (то, что ты хочешь),Ниже — вариант (1) консьюмер соответствует твоему отправителю (плюс я поправил _stoppingToken, который у тебя в коде не объявлен).
csharpusing Application.Dto.Events; using Application.Interfaces.Services; using Infrastructure.RabbitMQ.Options; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Text.Json; using WorkerCodeStarts.Messaging.RabbitMq; namespace WorkerCodeStarts.Features.CodeExecution.Consumers; public sealed class CodeExecutionRequestedConsumer : BackgroundService { private readonly RabbitMqOptions _opts; private readonly RabbitMqConnection _rmq; private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger<CodeExecutionRequestedConsumer> _logger; private IChannel? _channel; private CancellationToken _stoppingToken; private static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true }; // Envelope соответствует: JsonSerializer.Serialize(new { MessageId = ..., Event = evt }) private sealed class Envelope<T> { public string? MessageId { get; set; } public T? Event { get; set; } } public CodeExecutionRequestedConsumer( IOptions<RabbitMqOptions> options, RabbitMqConnection rmq, IServiceScopeFactory scopeFactory, ILogger<CodeExecutionRequestedConsumer> logger) { _opts = options.Value; _rmq = rmq; _scopeFactory = scopeFactory; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _stoppingToken = stoppingToken; var conn = await _rmq.GetAsync(stoppingToken); _channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken); await DeclareTopologyAsync(_channel, stoppingToken); await _channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += OnMessageAsync; await _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); _logger.LogInformation("Rabbit consumer started. Queue={Queue}", _opts.UserCreatedQueue); await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea) { if (_channel == null) return; var ct = _stoppingToken; var tag = ea.DeliveryTag; try { // 1) пробуем распарсить JSON envelope как у паблишера var json = Encoding.UTF8.GetString(ea.Body.ToArray()); Envelope<UserCreatedEvent>? envelope = null; try { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException) { // необязательно: поддержка старого формата "чистый guid" } Guid userId = Guid.Empty; if (envelope?.Event?.UserId is Guid idFromJson && idFromJson != Guid.Empty) { userId = idFromJson; } else { // fallback (если вдруг пришёл старый формат): "просто guid строкой" var text = json.Trim(); if (!Guid.TryParse(text, out userId) || userId == Guid.Empty) { _logger.LogWarning("Bad message. Body={Body}", text); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { _logger.LogError(ex, "Error while processing message. DeliveryTag={Tag}", tag); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } } private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) { await ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct); await ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct); } public override async Task StopAsync(CancellationToken cancellationToken) { try { if (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); } } catch { /* ignore */ } await base.StopAsync(cancellationToken); } }
UserId внутрь Event, а не в корень сообщения и не как строку.envelope.Event.UserId именно из такого JSON.Если хочешь — могу ещё сделать общий контракт UserCreatedEnvelope в Application.Dto (чтобы и паблишер, и консьюмер использовали один и тот же тип, а не анонимный new { ... }), но даже текущий вариант уже будет работать 1-в-1 с твоей отправкой.
такого не будет "просто guid строкой", исправь код
Ок — убираем любые попытки парсить “просто GUID строкой” и принимаем только тот JSON, который реально шлёт паблишер: { MessageId, Event: { UserId, OccurredAt } }.
Вот исправленный консьюмер (строгий JSON, без fallback):
csharpusing Application.Dto.Events; using Application.Interfaces.Services; using Infrastructure.RabbitMQ.Options; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Text.Json; using WorkerCodeStarts.Messaging.RabbitMq; namespace WorkerCodeStarts.Features.CodeExecution.Consumers; public sealed class CodeExecutionRequestedConsumer : BackgroundService { private readonly RabbitMqOptions _opts; private readonly RabbitMqConnection _rmq; private readonly IServiceScopeFactory _scopeFactory; private readonly ILogger<CodeExecutionRequestedConsumer> _logger; private IChannel? _channel; private CancellationToken _stoppingToken; private static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true }; // Соответствует: JsonSerializer.Serialize(new { MessageId = messageId, Event = evt }) private sealed class Envelope<T> { public string? MessageId { get; set; } public T? Event { get; set; } } public CodeExecutionRequestedConsumer( IOptions<RabbitMqOptions> options, RabbitMqConnection rmq, IServiceScopeFactory scopeFactory, ILogger<CodeExecutionRequestedConsumer> logger) { _opts = options.Value; _rmq = rmq; _scopeFactory = scopeFactory; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _stoppingToken = stoppingToken; var conn = await _rmq.GetAsync(stoppingToken); _channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken); await DeclareTopologyAsync(_channel, stoppingToken); await _channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += OnMessageAsync; await _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); _logger.LogInformation("Rabbit consumer started. Queue={Queue}", _opts.UserCreatedQueue); await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea) { if (_channel == null) return; var ct = _stoppingToken; var tag = ea.DeliveryTag; try { var json = Encoding.UTF8.GetString(ea.Body.ToArray()); Envelope<UserCreatedEvent>? envelope; try { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException jex) { _logger.LogWarning(jex, "Bad message (invalid JSON). Body={Body}", json); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } var userId = envelope?.Event?.UserId ?? Guid.Empty; if (userId == Guid.Empty) { _logger.LogWarning( "Bad message (missing/empty Event.UserId). MessageId={MessageId} Body={Body}", envelope?.MessageId, json); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { _logger.LogError(ex, "Error while processing message. DeliveryTag={Tag}", tag); await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } } private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) { await ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct); await ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct); } public override async Task StopAsync(CancellationToken cancellationToken) { try { if (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); } } catch { /* ignore */ } await base.StopAsync(cancellationToken); } }
Если хочешь “идеально 1-в-1”, то лучший следующий шаг — вынести envelope в общий DTO (например Application.Dto.Events.EventEnvelope<T>) и в паблишере сериализовать уже его, а не анонимный new { ... }. Тогда и отправка, и приём будут компилируемо согласованы.
_stoppingToken = stoppingToken;
var conn = await _rmq.GetAsync(stoppingToken);
_channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken);
await DeclareTopologyAsync(_channel, stoppingToken);
await _channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: 1,
global: false,
cancellationToken: stoppingToken);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += OnMessageAsync;
await _channel.BasicConsumeAsync(
queue: _opts.UserCreatedQueue,
autoAck: false,
consumer: consumer,
cancellationToken: stoppingToken);
await Task.Delay(Timeout.Infinite, stoppingToken);
мне нужно подробное и простое объяснение каждой строчки
Ниже — простое объяснение каждой строки (и что она делает в RabbitMQ/BackgroundService).
csharp_stoppingToken = stoppingToken;
stoppingToken — это токен отмены, который ASP.NET/Host передаст твоему BackgroundService, когда приложение останавливается._stoppingToken, чтобы потом использовать внутри OnMessageAsync (там этого параметра уже нет, но отмену всё равно нужно уважать).csharpvar conn = await _rmq.GetAsync(stoppingToken);
IConnection).RabbitMqConnection — это твой обёрточный класс, который:
await — потому что создание соединения может быть асинхронным/долгим.stoppingToken, чтобы если сервис останавливается, подключение не пыталось создаваться дальше.csharp_channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken);
IChannel) поверх соединения._channel, потому что он нужен:
OnMessageAsync) для Ack/Nack.csharpawait DeclareTopologyAsync(_channel, stoppingToken);
csharpawait _channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken);
Это настройка QoS / prefetch (сколько сообщений RabbitMQ выдаёт “в работу” консьюмеру заранее).
prefetchSize: 0
prefetchCount: 1
BasicAck, следующее сообщение тебе не прилетит.global: false
cancellationToken: stoppingToken
csharpvar consumer = new AsyncEventingBasicConsumer(_channel);
Task)._channel.csharpconsumer.ReceivedAsync += OnMessageAsync;
OnMessageAsync(object sender, BasicDeliverEventArgs ea).csharpawait _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken);
Запускаешь потребление сообщений из очереди.
queue: _opts.UserCreatedQueue
autoAck: false
BasicAck — если обработал успешно,BasicNack/BasicReject — если ошибка.consumer: consumer
ReceivedAsync.cancellationToken: stoppingToken
csharpawait Task.Delay(Timeout.Infinite, stoppingToken);
Delay держит ExecuteAsync “живым” навсегда.ExecuteAsync дошёл бы до конца и сервис бы завершился.Timeout.Infinite = “ждать бесконечно”.stoppingToken позволяет прервать ожидание при остановке приложения:
Task.Delay бросит OperationCanceledException,BackgroundService нормально завершится.Если хочешь, я могу дальше так же построчно объяснить OnMessageAsync: где берётся тело, как правильно парсить JSON envelope, и что именно делают BasicAck/BasicNack (и чем отличается requeue: false/true).
private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea)
{
if (_channel == null) return;
textvar ct = _stoppingToken; var tag = ea.DeliveryTag; try { // 1) пробуем распарсить JSON envelope как у паблишера var json = Encoding.UTF8.GetString(ea.Body.ToArray()); Envelope<UserCreatedEvent>? envelope; try { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } Guid userId = envelope?.Event?.UserId ?? Guid.Empty; if (userId == Guid.Empty) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); }
}
теперь это
Ниже — простое и подробное объяснение каждой строки/блока твоего OnMessageAsync, включая что такое DeliveryTag, Ack/Nack, requeue и зачем scope.
csharpprivate async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea)
sender — кто вызвал событие (обычно consumer).ea (BasicDeliverEventArgs) — “пакет” с данными сообщения: тело (Body), метаданные, DeliveryTag, routing, exchange и т.д.csharp{ if (_channel == null) return;
_channel не должен быть null, но это “страховка”.csharpvar ct = _stoppingToken;
ct в async-методы (Ack/Nack, CreateFromUserAsync), чтобы они могли корректно прерываться при остановке приложения.csharpvar tag = ea.DeliveryTag;
DeliveryTag — это идентификатор доставки сообщения внутри этого канала.Ack) или отклоняешь (Nack).csharptry { // 1) пробуем распарсить JSON envelope как у паблишера var json = Encoding.UTF8.GetString(ea.Body.ToArray());
try — всё внутри может упасть (JSON, сервис, база и т.д.).ea.Body — тело сообщения в виде байт.ToArray() — превращает в byte[].Encoding.UTF8.GetString(...) — превращает байты в строку JSON.csharpEnvelope<UserCreatedEvent>? envelope;
Envelope<UserCreatedEvent> — это твой “контейнер” вида:
MessageIdEvent (а внутри UserId, OccurredAt)csharptry { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; }
json в объект Envelope<UserCreatedEvent>.JsonException.Что делаем в catch:
BasicNackAsync(...) говорит RabbitMQ: “сообщение НЕ обработано”.multiple: false — отклоняем только это сообщение (а не пачку до него).requeue: false — не возвращать обратно в очередь.
return; — прекращаем обработку, чтобы не делать лишнего.csharpGuid userId = envelope?.Event?.UserId ?? Guid.Empty;
UserId из envelope.Event.UserId.?. защищает от NullReferenceException:
envelope == null или Event == null, вернётся null?? Guid.Empty — если не удалось достать, будет Guid.Empty.csharpif (userId == Guid.Empty) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; }
UserId отсутствует или пустой — сообщение считаешь плохим.Nack без requeue, чтобы не гонять мусор по кругу.csharpawait using var scope = _scopeFactory.CreateAsyncScope();
await using — правильное освобождение async ресурсов.csharpvar service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>();
GetRequiredService — если сервиса нет в DI, будет исключение (и попадём в общий catch).csharpawait service.CreateFromUserAsync(userId, ct);
userId.ct, чтобы при остановке процесса метод мог отмениться.csharpawait _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct);
ACK).multiple: false — подтверждаем только это сообщение.csharp} catch (Exception ex) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } }
Nack и не возвращаешь сообщение обратно.ex у тебя сейчас не используется — обычно логируют, чтобы понимать причину.DeliveryTag нужен, чтобы ack/nack применился к конкретному сообщению.scope нужен для корректной работы scoped зависимостей (DbContext и т.п.).Если хочешь, я предложу улучшенную версию этого же метода:
MessageIdOperationCanceledException отдельноrequeue (когда true, когда false) + DLQ.private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct)
{
await ch.ExchangeDeclareAsync(
exchange: _opts.UserCreatedExchange,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
cancellationToken: ct);
textawait ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
if (_channel != null)
{
await _channel.CloseAsync(cancellationToken);
await _channel.DisposeAsync();
}
}
catch { /* ignore */ }
await base.StopAsync(cancellationToken);
}
теперь это
Ниже — построчно и простыми словами, что делает DeclareTopologyAsync и StopAsync.
csharpprivate async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) {
ch — канал RabbitMQ, через него делаются команды.ct — токен отмены (если сервис останавливается — можно прервать операции).csharpawait ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct);
Создаёт (или подтверждает) exchange.
exchange: _opts.UserCreatedExchange — имя exchange (из настроек).type: ExchangeType.Direct — тип Direct:
durable: true — exchange переживёт перезапуск RabbitMQ (сохранится на диске).autoDelete: false — exchange не удаляется автоматически, когда от него никто не зависит.cancellationToken: ct — если останавливаемся, операция может быть отменена.csharpawait ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct);
Создаёт (или подтверждает) очередь.
queue: _opts.UserCreatedQueue — имя очереди.durable: true — очередь сохранится после рестарта RabbitMQ.exclusive: false — очередь не эксклюзивная:
autoDelete: false — очередь не удалится сама, когда последний консьюмер отключится.cancellationToken: ct — отмена при остановке.csharpawait ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct);
Делает привязку (binding): “какие сообщения из exchange попадут в очередь”.
queue — какая очередь будет получать сообщения.exchange — из какого exchange брать сообщения.routingKey: _opts.UserCreatedRoutingKey — ключ маршрутизации:
routingKey совпадает.cancellationToken: ct — отмена при остановке.csharp}
Конец метода.
Важно: эти операции обычно идемпотентны — безопасно вызывать при старте консьюмера. Но параметры должны совпадать с уже существующими, иначе RabbitMQ может вернуть ошибку.
csharppublic override async Task StopAsync(CancellationToken cancellationToken) {
csharptry {
try, чтобы даже если закрытие упадёт — остановка приложения не “сломалась”.csharpif (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); }
Если канал существует:
CloseAsync(...) — закрывает канал:
DisposeAsync() — освобождает ресурсы клиента (память/сокеты/внутренние объекты).cancellationToken, чтобы не зависнуть на закрытии во время остановки.csharp} catch { /* ignore */ }
csharpawait base.StopAsync(cancellationToken); }
BackgroundService.StopAsync.ExecuteAsync).Если хочешь, могу объяснить что будет, если durable у exchange/queue сделать false, и почему важно, чтобы DeclareTopologyAsync в паблишере и консьюмере совпадали (иначе возможны ошибки “precondition failed”).
JsonOpts тебе нужен для десериализации JSON:
csharpprivate static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true };
PropertyNameCaseInsensitive = true делает так, что при чтении JSON регистр имён полей не важен.
То есть эти варианты будут считаться одинаковыми:
"MessageId" / "messageId" / "MESSAGEID""Event" / "event""UserId" / "userId"Без этого System.Text.Json по умолчанию чувствителен к регистру, и если формат “поплыл” (другой сервис/язык/настройки) — envelope.Event может стать null.
Да, и это часто удобно, если ты читаешь/пишешь JSON в нескольких местах (publisher, consumer, сервисы).
Например, в проекте Application (или в отдельном Shared-проекте, который подключён и туда, и туда):
Application/Dto/Serialization/JsonDefaults.cs
csharpusing System.Text.Json; namespace Application.Dto.Serialization; public static class JsonDefaults { public static readonly JsonSerializerOptions Options = new() { PropertyNameCaseInsensitive = true }; }
И в консьюмере:
csharpusing Application.Dto.Serialization; var envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonDefaults.Options);
PropertyNameCaseInsensitiveЕсли ты жёстко фиксируешь имена полей атрибутами, то регистр уже не важен, потому что ты явно сказал как называется поле в JSON:
csharpusing System.Text.Json.Serialization; public sealed class Envelope<T> { [JsonPropertyName("MessageId")] public string? MessageId { get; set; } [JsonPropertyName("Event")] public T? Event { get; set; } }
Тогда PropertyNameCaseInsensitive можно убрать, если уверен, что JSON всегда строго такой.
Да. Это даже хорошо:
JsonSerializerOptions на каждое сообщениеЕсли скажешь, где у тебя ещё используется JsonSerializer (только в воркере или и в API тоже), я предложу самый аккуратный вариант размещения, чтобы не смешивать слои (Application/Infrastructure/Worker).
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace WorkerCodeStarts.Messaging.RabbitMq;
public sealed class RabbitMqConnection : IAsyncDisposable
{
private readonly ConnectionFactory _factory;
textprivate IConnection? _connection; private readonly SemaphoreSlim _lock = new(1, 1); public RabbitMqConnection(IOptions<RabbitMqOptions> options) { var o = options.Value; _factory = new ConnectionFactory { HostName = o.HostName, Port = o.Port, UserName = o.UserName, Password = o.Password, AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; } public async Task<IConnection> GetAsync(CancellationToken ct) { if (_connection is { IsOpen: true }) return _connection; await _lock.WaitAsync(ct); try { if (_connection is { IsOpen: true }) return _connection; _connection = await _factory.CreateConnectionAsync(ct); return _connection; } finally { _lock.Release(); } } public async ValueTask DisposeAsync() { try { if (_connection != null) { await _connection.CloseAsync(); await _connection.DisposeAsync(); } } catch { /* ignore */ } _lock.Dispose(); }
}
using Application.Dto.Events;
using Application.Interfaces.Services;
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;
using WorkerCodeStarts.Messaging.RabbitMq;
namespace WorkerCodeStarts.Features.CodeExecution.Consumers;
public sealed class CodeExecutionRequestedConsumer : BackgroundService
{
private readonly RabbitMqOptions _opts;
private readonly RabbitMqConnection _rmq;
private readonly IServiceScopeFactory _scopeFactory;
textprivate IChannel? _channel; private CancellationToken _stoppingToken; private static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true }; public CodeExecutionRequestedConsumer(IOptions<RabbitMqOptions> options, RabbitMqConnection rmq, IServiceScopeFactory scopeFactory) { _opts = options.Value; _rmq = rmq; _scopeFactory = scopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _stoppingToken = stoppingToken; var conn = await _rmq.GetAsync(stoppingToken); _channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken); await DeclareTopologyAsync(_channel, stoppingToken); await _channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += OnMessageAsync; await _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea) { if (_channel == null) return; var ct = _stoppingToken; var tag = ea.DeliveryTag; try { // 1) пробуем распарсить JSON envelope как у паблишера var json = Encoding.UTF8.GetString(ea.Body.ToArray()); Envelope<UserCreatedEvent>? envelope; try { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } Guid userId = envelope?.Event?.UserId ?? Guid.Empty; if (userId == Guid.Empty) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } } private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) { await ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct); await ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct); } public override async Task StopAsync(CancellationToken cancellationToken) { try { if (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); } } catch { /* ignore */ } await base.StopAsync(cancellationToken); }
}
у меня есть два эимх скрипта. Это нормально? есть ли смылс?
Если ты имеешь в виду две реализации работы с RabbitMQ (в паблишере ты создаёшь ConnectionFactory + держишь _connection, и отдельно в воркере у тебя есть RabbitMqConnection), то:
Если у тебя 2 разных процесса/сервиса:
Тогда у каждого процесса будет своё соединение с RabbitMQ (их невозможно “разделить” между процессами). Это ок и правильно.
Если в одном и том же приложении/процессе у тебя есть и паблишер, и консьюмер (или несколько паблишеров), то держать две разные реализации соединения/фабрики — обычно бессмысленно и вредно:
TopologyRecoveryEnabled = true, а в другом месте может быть иначе)Сделай один общий класс соединения (типа твоего RabbitMqConnection) и используй его везде:
RabbitMqConnection регистрируешь как Singleton_rmq.GetAsync() и создаёт channel на отправку (как ты и делаешь await using var channel ...)Это прям “каноничный” подход в RabbitMQ:
Смысл есть только если:
Если это одна кодовая база и можно переиспользовать — лучше вынести RabbitMqConnection в общий слой (например Infrastructure.RabbitMQ) и инжектить его и в паблишер, и в воркер.
Если скажешь, паблишер (RabbitMqUserCreatedEventPublisher) находится в другом проекте/экзешнике или в этом же — я сразу дам точный вариант DI-регистрации и как переписать паблишер так, чтобы он использовал RabbitMqConnection (без дублирования _factory/_connection/_lock).
это всё находится в одном проекте, на одном скриншоте структура WorkerCodeStarts, на другом всего проекта, суть в том, что web использует другие слои для публикации id в rabbitmq, а WorkerCodeStarts читает от туда id, то что я скидывал тебе выше находится в данный момент в одном проекте WorkerCodeStarts
Понял. У тебя одно решение, но два запускаемых приложения:
На уровне рантайма (когда всё запущено) у них всё равно будут разные Connection’ы — потому что это разные процессы. Это нормально и правильно.
На уровне кода держать две разные реализации подключения (ConnectionFactory + _connection + lock) в Web и в Worker — обычно смысла нет. Это дублирование и риск “расхождения” настроек.
✅ Лучше: иметь один общий класс RabbitMqConnection в общем слое (например, Infrastructure.RabbitMQ) и использовать его и в Web, и в Worker.
RabbitMqConnection из WorkerCodeStarts в Infrastructure (общий проект)Например: Infrastructure/RabbitMQ/RabbitMqConnection.cs
И оставь одну реализацию на всю solution (код у тебя норм).
csharpservices.AddSingleton<RabbitMqConnection>();
_connection/_factory/_lock, а брал connection из RabbitMqConnectionТо есть вместо твоего RabbitMqUserCreatedEventPublisher с собственным ConnectionFactory — так:
csharppublic sealed class RabbitMqUserCreatedEventPublisher : IMessageQueueService { private readonly RabbitMqOptions _options; private readonly RabbitMqConnection _rmq; public RabbitMqUserCreatedEventPublisher(IOptions<RabbitMqOptions> options, RabbitMqConnection rmq) { _options = options.Value; _rmq = rmq; } public async Task<EventPublishResult> PublishUserCreatedAsync(Guid userId, CancellationToken ct = default) { var messageId = Guid.NewGuid().ToString("N"); try { var conn = await _rmq.GetAsync(ct); var channelOpts = new CreateChannelOptions( publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true ); await using var channel = await conn.CreateChannelAsync(channelOpts, ct); await DeclareTopologyAsync(channel, ct); var evt = new UserCreatedEvent { UserId = userId, OccurredAt = DateTimeOffset.UtcNow }; var json = JsonSerializer.Serialize(new { MessageId = messageId, Event = evt }); var body = Encoding.UTF8.GetBytes(json); var props = new BasicProperties { Persistent = true, ContentType = "application/json", MessageId = messageId, CorrelationId = userId.ToString() }; await channel.BasicPublishAsync( exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, mandatory: true, basicProperties: props, body: body, cancellationToken: ct ); return EventPublishResult.Success(messageId); } catch (Exception ex) { return EventPublishResult.Failure($"RabbitMQ Error: {ex.Message}", messageId); } } private async Task DeclareTopologyAsync(IChannel channel, CancellationToken token) { await channel.ExchangeDeclareAsync(_options.UserCreatedExchange, ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: token); await channel.QueueDeclareAsync(_options.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: token); await channel.QueueBindAsync(_options.UserCreatedQueue, _options.UserCreatedExchange, _options.UserCreatedRoutingKey, cancellationToken: token); } }
Что изменилось по смыслу:
RabbitMqConnection отвечает за одно соединение на процессЕсли ты про два разных класса/подхода к connection — то нет, не нужны.
Если хочешь, я скажу прямо по твоей структуре: в какой проект лучше положить RabbitMqConnection (Infrastructure или отдельный Shared), и как правильно оформить DependencyInjection.cs, чтобы и Web, и Worker подключали RabbitMQ одинаково.
using Application.Dto;
using Application.Dto.Events;
using Application.Interfaces.External;
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System.Text;
using System.Text.Json;
namespace Infrastructure.RabbitMQ
{
public sealed class RabbitMqUserCreatedEventPublisher : IMessageQueueService, IAsyncDisposable
{
private readonly RabbitMqOptions _options;
private readonly ConnectionFactory _factory;
textprivate IConnection? _connection; private readonly SemaphoreSlim _connectionLock = new(1, 1); public RabbitMqUserCreatedEventPublisher(IOptions<RabbitMqOptions> options) { _options = options.Value; _factory = new ConnectionFactory { HostName = _options.HostName, Port = _options.Port, UserName = _options.UserName, Password = _options.Password, AutomaticRecoveryEnabled = true }; } private async Task<IConnection> GetConnectionAsync(CancellationToken token) { if (_connection != null && _connection.IsOpen) return _connection; await _connectionLock.WaitAsync(token); try { if (_connection != null && _connection.IsOpen) return _connection; _connection = await _factory.CreateConnectionAsync(token); return _connection; } finally { _connectionLock.Release(); } } public async Task<EventPublishResult> PublishUserCreatedAsync(Guid userId, CancellationToken cancellationToken = default) { var messageId = Guid.NewGuid().ToString("N"); try { cancellationToken.ThrowIfCancellationRequested(); var connection = await GetConnectionAsync(cancellationToken); var channelOpts = new CreateChannelOptions( publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true ); await using var channel = await connection.CreateChannelAsync(channelOpts, cancellationToken); await DeclareTopologyAsync(channel, cancellationToken); UserCreatedEvent evt = new UserCreatedEvent { UserId = userId, OccurredAt = DateTimeOffset.UtcNow }; var json = JsonSerializer.Serialize(new { MessageId = messageId, Event = evt }); var body = Encoding.UTF8.GetBytes(json); var props = new BasicProperties { Persistent = true, ContentType = "application/json", MessageId = messageId, CorrelationId = userId.ToString() }; await channel.BasicPublishAsync( exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, mandatory: true, basicProperties: props, body: body, cancellationToken: cancellationToken ); return EventPublishResult.Success(messageId); } catch (PublishException ex) when (ex.IsReturn) { return EventPublishResult.Failure( $"Unroutable (returned). SeqNo={ex.PublishSequenceNumber}.", messageId ); } catch (PublishException ex) { return EventPublishResult.Failure( $"Not confirmed by broker. SeqNo={ex.PublishSequenceNumber}.", messageId ); } catch (OperationCanceledException) { return EventPublishResult.Failure("Publish cancelled."); } catch (Exception ex) { return EventPublishResult.Failure($"RabbitMQ Error: {ex.Message}"); } } private async Task DeclareTopologyAsync(IChannel channel, CancellationToken token) { await channel.ExchangeDeclareAsync( exchange: _options.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: token ); await channel.QueueDeclareAsync( queue: _options.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: token ); await channel.QueueBindAsync( queue: _options.UserCreatedQueue, exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, cancellationToken: token ); } public async ValueTask DisposeAsync() { if (_connection != null) { try { await _connection.CloseAsync(); await _connection.DisposeAsync(); } catch { } } _connectionLock.Dispose(); } }
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Infrastructure.RabbitMQ.Options
{
public sealed class RabbitMqOptions
{
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
textpublic string UserCreatedExchange { get; set; } = "users.exchange"; public string UserCreatedQueue { get; set; } = "users.created"; public string UserCreatedRoutingKey { get; set; } = "user.created"; public int ConfirmTimeoutSeconds { get; set; } = 5; }
}
выше это из Infrastructure, ниже это из WorkerCodeStarts
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace WorkerCodeStarts.Messaging.RabbitMq;
public sealed class RabbitMqConnection : IAsyncDisposable
{
private readonly ConnectionFactory _factory;
textprivate IConnection? _connection; private readonly SemaphoreSlim _lock = new(1, 1); public RabbitMqConnection(IOptions<RabbitMqOptions> options) { var o = options.Value; _factory = new ConnectionFactory { HostName = o.HostName, Port = o.Port, UserName = o.UserName, Password = o.Password, AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; } public async Task<IConnection> GetAsync(CancellationToken ct) { if (_connection is { IsOpen: true }) return _connection; await _lock.WaitAsync(ct); try { if (_connection is { IsOpen: true }) return _connection; _connection = await _factory.CreateConnectionAsync(ct); return _connection; } finally { _lock.Release(); } } public async ValueTask DisposeAsync() { try { if (_connection != null) { await _connection.CloseAsync(); await _connection.DisposeAsync(); } } catch { /* ignore */ } _lock.Dispose(); }
}
using Application.Dto.Events;
using Application.Interfaces.Services;
using Infrastructure.RabbitMQ.Options;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;
using WorkerCodeStarts.Messaging.RabbitMq;
namespace WorkerCodeStarts.Features.CodeExecution.Consumers;
public sealed class CodeExecutionRequestedConsumer : BackgroundService
{
private readonly RabbitMqOptions _opts;
private readonly RabbitMqConnection _rmq;
private readonly IServiceScopeFactory _scopeFactory;
textprivate IChannel? _channel; private CancellationToken _stoppingToken; private static readonly JsonSerializerOptions JsonOpts = new() { PropertyNameCaseInsensitive = true }; public CodeExecutionRequestedConsumer(IOptions<RabbitMqOptions> options, RabbitMqConnection rmq, IServiceScopeFactory scopeFactory) { _opts = options.Value; _rmq = rmq; _scopeFactory = scopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _stoppingToken = stoppingToken; var conn = await _rmq.GetAsync(stoppingToken); _channel = await conn.CreateChannelAsync(cancellationToken: stoppingToken); await DeclareTopologyAsync(_channel, stoppingToken); await _channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.ReceivedAsync += OnMessageAsync; await _channel.BasicConsumeAsync( queue: _opts.UserCreatedQueue, autoAck: false, consumer: consumer, cancellationToken: stoppingToken); await Task.Delay(Timeout.Infinite, stoppingToken); } private async Task OnMessageAsync(object sender, BasicDeliverEventArgs ea) { if (_channel == null) return; var ct = _stoppingToken; var tag = ea.DeliveryTag; try { // 1) пробуем распарсить JSON envelope как у паблишера var json = Encoding.UTF8.GetString(ea.Body.ToArray()); Envelope<UserCreatedEvent>? envelope; try { envelope = JsonSerializer.Deserialize<Envelope<UserCreatedEvent>>(json, JsonOpts); } catch (JsonException) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } Guid userId = envelope?.Event?.UserId ?? Guid.Empty; if (userId == Guid.Empty) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); return; } await using var scope = _scopeFactory.CreateAsyncScope(); var service = scope.ServiceProvider.GetRequiredService<IWorkUserCreationService>(); await service.CreateFromUserAsync(userId, ct); await _channel.BasicAckAsync(tag, multiple: false, cancellationToken: ct); } catch (Exception ex) { await _channel.BasicNackAsync(tag, multiple: false, requeue: false, cancellationToken: ct); } } private async Task DeclareTopologyAsync(IChannel ch, CancellationToken ct) { await ch.ExchangeDeclareAsync( exchange: _opts.UserCreatedExchange, type: ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: ct); await ch.QueueDeclareAsync( queue: _opts.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: ct); await ch.QueueBindAsync( queue: _opts.UserCreatedQueue, exchange: _opts.UserCreatedExchange, routingKey: _opts.UserCreatedRoutingKey, cancellationToken: ct); } public override async Task StopAsync(CancellationToken cancellationToken) { try { if (_channel != null) { await _channel.CloseAsync(cancellationToken); await _channel.DisposeAsync(); } } catch { /* ignore */ } await base.StopAsync(cancellationToken); }
}
Да, сейчас у тебя две разные “реализации подключения”:
RabbitMqUserCreatedEventPublisher сам держит ConnectionFactory, _connection, SemaphoreSlim, и сам же закрывает connection в DisposeAsync.RabbitMqConnection делает ровно то же самое (factory+lazy connection+lock+dispose).Как два процесса (Web и Worker) — нормально, у них всё равно будут разные соединения к RabbitMQ (это разные приложения).
Как код — смысла почти нет, потому что:
TopologyRecoveryEnabled=true, в паблишере — нет)Сделай один общий класс RabbitMqConnection в общем слое (например Infrastructure.RabbitMQ) и используй его и в Web, и в Worker.
Если паблишер начнёт использовать общий RabbitMqConnection, то паблишер НЕ должен закрывать connection (иначе он может “убить” соединение, которым ещё пользуются другие части приложения).
То есть RabbitMqUserCreatedEventPublisher лучше сделать без IAsyncDisposable.
RabbitMqConnection в Infrastructure.RabbitMQНапример: Infrastructure/RabbitMQ/RabbitMqConnection.cs (код почти как у тебя в Worker).
csharpservices.AddSingleton<RabbitMqConnection>();
RabbitMqConnectionИ удали из паблишера: _factory, _connection, _connectionLock, GetConnectionAsync, DisposeAsync.
Пример (по твоему коду):
csharpusing Application.Dto; using Application.Dto.Events; using Application.Interfaces.External; using Infrastructure.RabbitMQ.Options; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using System.Text; using System.Text.Json; namespace Infrastructure.RabbitMQ; public sealed class RabbitMqUserCreatedEventPublisher : IMessageQueueService { private readonly RabbitMqOptions _options; private readonly RabbitMqConnection _rmq; public RabbitMqUserCreatedEventPublisher(IOptions<RabbitMqOptions> options, RabbitMqConnection rmq) { _options = options.Value; _rmq = rmq; } public async Task<EventPublishResult> PublishUserCreatedAsync(Guid userId, CancellationToken cancellationToken = default) { var messageId = Guid.NewGuid().ToString("N"); try { cancellationToken.ThrowIfCancellationRequested(); var connection = await _rmq.GetAsync(cancellationToken); var channelOpts = new CreateChannelOptions( publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true ); await using var channel = await connection.CreateChannelAsync(channelOpts, cancellationToken); await DeclareTopologyAsync(channel, cancellationToken); var evt = new UserCreatedEvent { UserId = userId, OccurredAt = DateTimeOffset.UtcNow }; var json = JsonSerializer.Serialize(new { MessageId = messageId, Event = evt }); var body = Encoding.UTF8.GetBytes(json); var props = new BasicProperties { Persistent = true, ContentType = "application/json", MessageId = messageId, CorrelationId = userId.ToString() }; await channel.BasicPublishAsync( exchange: _options.UserCreatedExchange, routingKey: _options.UserCreatedRoutingKey, mandatory: true, basicProperties: props, body: body, cancellationToken: cancellationToken ); return EventPublishResult.Success(messageId); } catch (PublishException ex) when (ex.IsReturn) { return EventPublishResult.Failure($"Unroutable (returned). SeqNo={ex.PublishSequenceNumber}.", messageId); } catch (PublishException ex) { return EventPublishResult.Failure($"Not confirmed by broker. SeqNo={ex.PublishSequenceNumber}.", messageId); } catch (OperationCanceledException) { return EventPublishResult.Failure("Publish cancelled."); } catch (Exception ex) { return EventPublishResult.Failure($"RabbitMQ Error: {ex.Message}"); } } private async Task DeclareTopologyAsync(IChannel channel, CancellationToken token) { await channel.ExchangeDeclareAsync(_options.UserCreatedExchange, ExchangeType.Direct, durable: true, autoDelete: false, cancellationToken: token); await channel.QueueDeclareAsync(_options.UserCreatedQueue, durable: true, exclusive: false, autoDelete: false, cancellationToken: token); await channel.QueueBindAsync(_options.UserCreatedQueue, _options.UserCreatedExchange, _options.UserCreatedRoutingKey, cancellationToken: token); } }
CodeExecutionRequestedConsumer продолжает использовать RabbitMqConnection. Это ок.
ConfirmTimeoutSeconds у тебя в RabbitMqOptions сейчас нигде не используетсяЛибо удали, либо реально используй (например, если будешь явно ждать подтверждения с таймаутом — но в твоём варианте подтверждения уже “встроены” через publisherConfirmationsEnabled и исключения PublishException).
Если хочешь — я могу прямо подсказать куда именно в твоей структуре переместить файл RabbitMqConnection.cs и как будут выглядеть DependencyInjection.cs в Web и WorkerCodeStarts, чтобы всё собиралось без циклических зависимостей.
меня есть сервис, который записывает инфу в бд и отправляет id этой инфы в rabbitqm, ещё в будущем будет отправлять статус в redis - asp.net web api, сейчас мне нужно создать worker, который будет брать id из rabbitmq, брать по этому id инфу из бд и записывать новую инфу (другая таблица) с этим же id в бд, в будущем он должен будет запускать код в изолированном docker контейнере с лимитами и записывать эту инфу в бд, ставить статусы в redis. всё запускается в docker, воркеров может быть больше одного, учти эту информацию
Понял контекст. У тебя классическая схема producer (Web API) → очередь → worker(ы), дальше будет статусы в Redis и запуск кода в изолированном контейнере. Ниже — как это лучше спроектировать с учётом Docker, нескольких воркеров и того, что RabbitMQ даёт доставку “как минимум один раз”.
{MessageId, EntityId, ...}EntityIdEntityId)AckСообщение может прийти повторно (если воркер упал до Ack, если был reconnect и т.п.). Поэтому воркер обязан быть идемпотентным.
Самый простой и надёжный способ:
Например:
UserId (или EntityId) сделать UNIQUEТогда даже если одно и то же сообщение придёт 2 раза:
Пара альтернатив:
MessageId обработанных сообщений (Inbox таблица) и не обрабатывать повторноUNIQUE(entityId) на целевой таблицеПроблема: если ты сохранил в БД, но не смог отправить в RabbitMQ, воркер никогда не узнает.
Решение уровня “правильно по-взрослому”: Outbox pattern
outboxЭто даёт гарантию “не потеряешь” даже при падении/рестарте.
Если пока не хочешь outbox — хотя бы:
autoAck: false — правильно (ты сам решаешь Ack/Nack)prefetchCount: 1 — безопасно, но медленно (обработка строго по одному)
Сейчас у тебя при любой ошибке: BasicNack(requeue:false) — это означает:
Для продакшена обычно делают так:
(A) “плохое сообщение” (JSON битый, нет ID, ID пустой)
→ Nack(requeue:false) сразу (это мусор)
(B) “временная ошибка” (БД недоступна, Redis недоступен, контейнер не стартанул)
→ лучше делать повторы:
Nack(requeue:true) (простое, но может крутиться бесконечно)Сделай простой state-machine по ключу, например:
job:{entityId} или codeexec:{entityId}status, updatedAt, attempt, errorСтатусы:
queued (Web после публикации)processing (worker взял в работу)succeeded / failedВажно: статус в Redis — это не источник истины, источник истины — БД. Redis только для “быстро показать пользователю”.
Тут главное — безопасность и лимиты.
--cpus)--memory)Если ты в docker-compose и воркеру нужен запуск контейнеров:
/var/run/docker.sock (быстро, но рискованно)Контракт сообщения: { MessageId, Event: { EntityId, OccurredAt } } (как у тебя) — ок
MessageId пригодится для дедупликации/трассировкиИдемпотентность воркера через UNIQUE(entityId) в целевой таблице B
DLQ + retry стратегия (хотя бы базовую)
Outbox в Web (когда дойдёшь до устойчивости)
Если хочешь, я могу накидать конкретно под твой кейс:
outbox + inbox (если понадобится),Важно в Docker окружении
Если ты в docker-compose и воркеру нужен запуск контейнеров:
что ты имеешь ввиду насчёт 2 ? воркер у меня создан как отдельный проект, ты хочешь, чтобы создал отдельный проект как web(asp.net web api) и создал там такую же чистую архитектуру?
насчёт 3 я вообще не шарю за Kubernetes и как с ним работать.
Про пункт (2) “executor как отдельный сервис” я имел в виду не “переписать всё на чистую архитектуру”, а разделить ответственность:
id из RabbitMQ → читает БД → создаёт запись “запуск” → просит “исполнителя” выполнить код → сохраняет результат → ставит статус в Redis.Зачем так делать? Потому что запуск контейнеров из контейнера — это особая зона риска.
Если ты примонтируешь Docker socket внутрь воркера, воркер фактически получает почти root-доступ к хосту:
Даже если код “пользовательский” будет запускаться в отдельном контейнере, сам воркер становится очень опасной точкой.
Это отдельный контейнер (может быть отдельный проект, да), который:
Worker общается с ним по HTTP/gRPC или через отдельную очередь.
Worker → Executor:
executionIdExecutor → Worker:
Отдельный проект/контейнер — да, если ты реально будешь выполнять пользовательский/чужой код и хочешь спать спокойно.
Но чистую архитектуру “как в Web” — не обязательно. Executor может быть очень маленьким:
POST /executeWorker же остаётся в твоей текущей архитектуре.
web-api (пишет в БД, отправляет id в RabbitMQ)worker (читает id, оркестрация)executor (имеет docker.sock, запускает sandbox контейнеры)rabbitmq, postgres, redisТолько executor получает:
-v /var/run/docker.sock:/var/run/docker.sockWorker не получает docker.sock → меньше риск.
Kubernetes по сути даёт готовый механизм “запусти изолированную штуку с лимитами” (Pod/Job) без прямого доступа к docker.sock. Но раз ты не знаком — вообще не обязательно туда идти сейчас. С docker-compose + executor сервисом можно жить долго.
Если хочешь, набросаю конкретный скелет Executor API (минимальный ASP.NET endpoint + запуск контейнера с лимитами и таймаутом) и как воркер будет его вызывать, без лишней архитектуры.
using AlgoPlatform.Application.Abstractions;
using System.Diagnostics;
using System.Text;
namespace AlgoPlatform.Infrastructure.Execution.Docker
{
public sealed class DockerCliCodeRunner : ICodeRunner
{
private const string Image = "mcr.microsoft.com/dotnet/sdk:9.0-alpine";
<Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> <OutputType>Exe</OutputType> <TargetFramework>net8.0</TargetFramework> <ImplicitUsings>enable</ImplicitUsings> <Nullable>enable</Nullable> </PropertyGroup> <ItemGroup> <Reference Include="AlgoTracing"> <HintPath>/opt/algotracing/AlgoTracing.dll</HintPath> <Private>true</Private> </Reference> </ItemGroup> </Project> """; var csprojB64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(csprojXml));textpublic async Task<(int ExitCode, string Stdout, string Stderr)> RunAsync( string code, string? input, CancellationToken ct) { // --- подготовка нагрузки (код, csproj, AlgoTracing.dll) --- // C#-код пользователя var codeB64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(code ?? string.Empty)); // Тот же csproj, что и в Program.cs var csprojXml = """
text// Библиотека AlgoTracing.dll лежит рядом с API var algoDllPath = Path.Combine(AppContext.BaseDirectory, "AlgoTracing.dll"); if (!File.Exists(algoDllPath)) { throw new FileNotFoundException( "AlgoTracing.dll не найден рядом с приложением API. " + "Убедись, что csproj API имеет ProjectReference на AlgoTracing.", algoDllPath); } var algoDllBytes = await File.ReadAllBytesAsync(algoDllPath, ct); var algoDllB64 = Convert.ToBase64String(algoDllBytes); // Входные данные (опционально) var inputB64 = input is null ? string.Empty : Convert.ToBase64String(Encoding.UTF8.GetBytes(input)); var dockerPath = ResolveDockerPath(); var dockerConfig = EnsureMinimalDockerConfig(); // --- shell-скрипт внутри контейнера --- // 1. создаём /tmp/app и /opt/algotracing // 2. разворачиваем App.csproj, Program.cs и AlgoTracing.dll // 3. если есть INPUT_B64 — пишем input.txt // 4. запускаем dotnet run (с редиректом stdin из input.txt, если он есть) var sh = "set -euo pipefail; " + "mkdir -p /tmp/app /opt/algotracing && " + "printf \"%s\" \"$CSPROJ_B64\" | base64 -d > /tmp/app/App.csproj && " + "printf \"%s\" \"$CODE_B64\" | base64 -d > /tmp/app/Program.cs && " + "printf \"%s\" \"$ALGO_DLL_B64\" | base64 -d > /opt/algotracing/AlgoTracing.dll && " + "if [ -n \"${INPUT_B64:-}\" ]; then printf \"%s\" \"$INPUT_B64\" | base64 -d > /tmp/app/input.txt; fi && " + "cd /tmp/app && " + "if [ -f input.txt ]; then dotnet run -c Release < input.txt; else dotnet run -c Release; fi"; var args = $"run --rm --pull=missing --network none " + $"--cpus 0.5 --memory 256m --pids-limit 64 " + $"-e CODE_B64 -e CSPROJ_B64 -e ALGO_DLL_B64 -e INPUT_B64 " + $"{Image} sh -lc \"{sh}\""; var psi = new ProcessStartInfo(dockerPath, args) { RedirectStandardOutput = true, RedirectStandardError = true, UseShellExecute = false, CreateNoWindow = true }; // Чистая конфигурация Docker, не зависящая от пользователя psi.Environment["DOCKER_CONFIG"] = dockerConfig; psi.Environment["CODE_B64"] = codeB64; psi.Environment["CSPROJ_B64"] = csprojB64; psi.Environment["ALGO_DLL_B64"] = algoDllB64; psi.Environment["INPUT_B64"] = inputB64; using var proc = new Process { StartInfo = psi, EnableRaisingEvents = false }; var stdoutBuilder = new StringBuilder(); var stderrBuilder = new StringBuilder(); proc.OutputDataReceived += (_, e) => { if (e.Data is null) return; stdoutBuilder.AppendLine(e.Data); }; proc.ErrorDataReceived += (_, e) => { if (e.Data is null) return; stderrBuilder.AppendLine(e.Data); }; proc.Start(); proc.BeginOutputReadLine(); proc.BeginErrorReadLine(); // --- таймаут + отмена через CancellationToken --- var waitTask = proc.WaitForExitAsync(); var timeoutTask = Task.Delay(TimeSpan.FromSeconds(120)); var cancelTask = Task.Delay(Timeout.InfiniteTimeSpan, ct); var finished = await Task.WhenAny(waitTask, timeoutTask, cancelTask); if (finished == timeoutTask) { try { proc.Kill(entireProcessTree: true); } catch { /* ignore */ } stderrBuilder.AppendLine("Execution timed out after 120 seconds."); } else if (finished == cancelTask) { try { proc.Kill(entireProcessTree: true); } catch { /* ignore */ } // Пробрасываем отмену наверх throw new OperationCanceledException(ct); } // Дожидаемся фактического завершения процесса (после Kill тоже) await waitTask; var stdout = stdoutBuilder.ToString(); var stderr = stderrBuilder.ToString(); return (proc.ExitCode, stdout, stderr); } // --- утилиты (1:1 с Program.cs) --- private static string EnsureMinimalDockerConfig() { var root = Path.Combine(Path.GetTempPath(), "docker-config-empty"); Directory.CreateDirectory(root); var cfg = Path.Combine(root, "config.json"); if (!File.Exists(cfg)) File.WriteAllText(cfg, "{}"); return root; } private static string ResolveDockerPath() { var candidates = OperatingSystem.IsWindows() ? new[] { "docker", @"C:\Program Files\Docker\Docker\resources\bin\docker.exe", @"C:\Program Files\Rancher Desktop\resources\win32\bin\docker.exe" } : new[] { "docker", "/usr/bin/docker", "/usr/local/bin/docker" }; foreach (var c in candidates) { try { var psi = new ProcessStartInfo(c, "--version") { RedirectStandardOutput = true, RedirectStandardError = true, UseShellExecute = false, CreateNoWindow = true }; using var p = Process.Start(psi); if (p == null) continue; p.WaitForExit(1500); if (p.ExitCode == 0) return c; } catch { // ignore and try next } } throw new InvalidOperationException( "Docker CLI не найден. Запустите Docker Desktop и/или добавьте docker.exe в PATH."); } }
}
вот что есть сейчас. это небезопасно?
Если код пользовательский/чужой (потенциально вредоносный) — то в строгом смысле да, это небезопасно. Это “нормально для прототипа”, но не “песочница уровня продакшена”. Почему и что поправить:
--network none — контейнеру нельзя в сеть (хорошо).--cpus 0.5 --memory 256m --pids-limit 64 — базовые лимиты (хорошо).Контейнер — это изоляция на уровне Linux namespaces/cgroups, но при уязвимостях ядра/рантайма возможен container escape. Для запуска “чужого кода” обычно добавляют дополнительную изоляцию (gVisor/Kata/Firecracker).
Чтобы docker run работал внутри контейнера воркера/сервиса, обычно монтируют:
/var/run/docker.sock:/var/run/docker.sock
Это почти root-доступ к хосту для процесса, у которого есть этот сокет. Даже если пользовательский код внутри отдельного контейнера, компрометация твоего сервиса-раннера = компрометация хоста.
👉 Именно поэтому я и предлагал “вариант 2” (executor-сервис): доступ к docker.sock даёшь только маленькому executor’у, а не воркеру, не web-api и не всему приложению.
Ты делаешь proc.Kill() для процесса docker CLI. Это убьёт клиент, но контейнер не всегда гарантированно остановится (зависит от ситуации). В итоге могут остаться контейнеры, которые жрут CPU/RAM.
✅ Надёжнее: запускать с --name или --cidfile, и при таймауте делать docker rm -f <id>.
Ты копишь весь stdout/stderr в StringBuilder. Если код напечатает гигабайты — воркер может OOMнуться.
✅ Ограничивай объём вывода (например, максимум N KB/MB, дальше “truncated”).
sh -lc ...Инъекция команд у тебя практически закрыта тем, что входные данные идут base64 и ты пишешь через printf "%s" "$VAR", но в целом CLI+shell менее надёжен, чем Docker API/SDK (сложнее корректно обрабатывать таймауты, ошибки, CID и т.д.).
docker.sockdocker run добавить жёсткие флаги:
--read-only--tmpfs /tmp:rw,nosuid,nodev,noexec,size=64m (или без noexec, если нужно)--cap-drop ALL--security-opt no-new-privileges--user 65534:65534 (не root)--pull=missing → --pull=neverImage лучше ...@sha256:<digest>--cidfile и на таймауте делать docker rm -f $(cat cidfile)Если скажешь, где именно будет жить DockerCliCodeRunner (в воркере внутри контейнера?) и как ты планируешь давать ему доступ к Docker (docker.sock или docker-in-docker), я предложу конкретную схему docker-compose и самый маленький executor API без “второй чистой архитектуры”.