Elarion

Consuming events

Declare event consumers as handlers or service methods, for both inline domain events and after-commit integration events, and keep them idempotent.

[ConsumeEvent] is the single, unified way to subscribe — you never pick a bus or a plane on the consumer. The same attribute consumes either a domain or an integration event; the event type's marker (IDomainEvent / IIntegrationEvent) decides which plane delivers it, and the consumer's return type decides the role. So a consumer reads identically whether the event runs inline or after commit. (For the two-plane model itself, see the events overview.)

There are two ways to write the consumer. Prefer the handler form — it makes the consumer a first-class unit of business logic with the full decorator pipeline; the method form is a lightweight alternative for a small side effect on a service you already have.

Handler form (preferred)

A consumer is a class implementing IHandler<TEvent, Result<T>> — its request type is the event — annotated with a class-level [ConsumeEvent]. The simplest case uses the IHandler<TEvent> sugar over Result<Unit>, which returns the non-generic Result:

using Elarion.Abstractions;
using Elarion.Abstractions.Messaging;

[ConsumeEvent]
public sealed class SendInvoiceEmail(IEmailSender email) : IHandler<InvoiceCreated> {
    public async ValueTask<Result> HandleAsync(InvoiceCreated e, CancellationToken ct) {
        await email.SendAsync(e.ClientEmail, ct);
        return Result.Success();
    }
}

Because it is a handler, it runs through the full handler decorator pipeline (tracing, resilience, validation, cache-invalidation) exactly like a command or query handler, and it is discovered, registered, and feature-gated by its module like every other handler. The event bus simply becomes one more trigger for the same unit of business logic. Use [ConsumeEvent(Order = n)] to order fan-out subscribers: consumers run in ascending Order, and consumers with equal Order run in a stable, generator-determined sequence.
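The ordering rule can be pictured with a small stand-alone sketch. It does not use Elarion types: Subscriber and FanOut are hypothetical stand-ins, and registration order stands in for the generator-determined tie-break, which this page does not specify beyond being stable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Three subscribers to the same event; two share Order = 0.
var registered = new List<Subscriber> {
    new("AuditLog", Order: 10),
    new("SendEmail", Order: 0),
    new("UpdateProjection", Order: 0),
};

foreach (var name in FanOut.RunOrder(registered))
    Console.WriteLine(name);
// prints SendEmail, UpdateProjection, AuditLog (one per line)

// Hypothetical stand-in for a registered fan-out consumer; not an Elarion type.
public sealed record Subscriber(string Name, int Order);

public static class FanOut {
    // Enumerable.OrderBy is a stable sort, so entries with equal Order
    // keep their original (here: registration) sequence.
    public static IReadOnlyList<string> RunOrder(IEnumerable<Subscriber> subscribers) =>
        subscribers.OrderBy(s => s.Order).Select(s => s.Name).ToList();
}
```

The point of the sketch is only that ties are deterministic: the same set of consumers always runs in the same sequence.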

The role-by-return-type rule

The response type selects the consumer's role:

  • Result<Unit> (or the IHandler<TEvent> sugar) → a fan-out subscriber. Every matching consumer runs. A failed Result has no return channel to the publisher, so it surfaces as an EventConsumerFailedException, which each backend handles per its plane (the in-memory domain bus aggregates and rethrows, failing the command; the in-memory integration pump logs and isolates; the outbox dispatcher lets it propagate to trigger a retry).
  • Result<T> with T ≠ Unit → the single RequestAsync responder for the event, returning its typed result through the same pipeline. Domain plane only.

public sealed record PriceQuote(string Sku) : IDomainEvent;

[ConsumeEvent]
public sealed class QuotePrice(IPriceBook prices) : IHandler<PriceQuote, Result<Money>> {
    public ValueTask<Result<Money>> HandleAsync(PriceQuote q, CancellationToken ct) =>
        new(prices.Quote(q.Sku));
}

Integration events are fan-out only, so an integration handler must return Result / Result<Unit>; a non-Unit response is rejected as ELEVT005.

A domain-event handler runs nested in the command's pipeline. The domain plane dispatches inline in the publisher's scope, so a handler consumer's decorator pipeline runs inside the command that published the event — same scope, same DbContext, same transaction. Give domain-event handlers a read-only / minimal pipeline: a nested transaction or resilience decorator is wrong, since opening a nested transaction or retrying writes against the already-dirty DbContext would corrupt the command's unit of work. Tracing, validation, and cache-invalidation nest safely. Integration-event handlers are the opposite — they run on a fresh scope after commit, so the full pipeline (including a transaction decorator) is exactly right.

The clean way to get this right is a transaction decorator with an AppliesTo predicate that matches commands and integration-event handlers but not domain-event handlers. The generator then never attaches it to a domain-event handler, and the integration handler keeps it because its request is an IIntegrationEvent. One decorator, attached precisely at compile time, gives commands, queries, domain events, and integration events the right transactional behavior with no per-handler pipeline tags. (If you prefer to be explicit, a named read-only pipeline attribute that omits the transaction and resilience decorators works too, since pipeline attributes are most-specific-wins; see decorator pipelines.)
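As a rough illustration of the predicate's logic only, here is a stand-alone sketch. The real AppliesTo shape is not shown on this page, and the generator evaluates it at compile time, whereas this sketch uses runtime reflection. The marker interfaces are redeclared locally so the snippet compiles alone; ICommand, IQuery, TransactionPolicy, and the request types are assumptions made for the example.

```csharp
using System;

Console.WriteLine(TransactionPolicy.AppliesTo(typeof(CreateInvoice)));    // True
Console.WriteLine(TransactionPolicy.AppliesTo(typeof(InvoiceCreated)));   // True
Console.WriteLine(TransactionPolicy.AppliesTo(typeof(InvoiceLineAdded))); // False
Console.WriteLine(TransactionPolicy.AppliesTo(typeof(GetInvoice)));       // False

// Local stand-ins so the sketch is self-contained; the real markers live in Elarion.
public interface IDomainEvent { }
public interface IIntegrationEvent { }
public interface ICommand { }   // hypothetical command marker
public interface IQuery { }     // hypothetical query marker

public sealed record CreateInvoice : ICommand;
public sealed record GetInvoice : IQuery;
public sealed record InvoiceCreated : IIntegrationEvent;
public sealed record InvoiceLineAdded : IDomainEvent;

public static class TransactionPolicy {
    // Attach the transaction decorator to commands and integration events,
    // and never to a request that is a domain event.
    public static bool AppliesTo(Type requestType) =>
        !typeof(IDomainEvent).IsAssignableFrom(requestType)
        && (typeof(ICommand).IsAssignableFrom(requestType)
            || typeof(IIntegrationEvent).IsAssignableFrom(requestType));
}
```

Keying the decision on the request type's marker keeps the rule in one place, which is the property the paragraph above relies on.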

// Domain-event handler: shares the publisher's DbContext and transaction, so it must NOT
// open a nested transaction. With an AppliesTo predicate it needs no pipeline annotation at all.
[ConsumeEvent]
public sealed class RecalculateTotals(IAppDbContext db) : IHandler<InvoiceLineAdded> {
    public async ValueTask<Result> HandleAsync(InvoiceLineAdded e, CancellationToken ct) {
        // Shares the publisher's DbContext; this write commits atomically with the command.
        await db.Invoices
            .Where(i => i.Id == e.InvoiceId)
            .ExecuteUpdateAsync(s => s.SetProperty(i => i.Total, i => i.Total + e.Amount), ct);
        return Result.Success();
    }
}

Method form (alternative)

When you just need a side effect on a service that already exists, annotate an instance method on a [Service] class instead — no dedicated handler class, but also no decorator pipeline. The consumed event type is the method's message parameter, and the plane comes from that type's marker:

[Service]
public sealed class InvoiceNotifications(ILogger<InvoiceNotifications> logger) {
    [ConsumeEvent]
    public async ValueTask OnInvoiceCreated(InvoiceCreated e, CancellationToken ct) {
        await NotifyAsync(e.ClientEmail, ct);
    }
}

Here too the return type selects the role:

  • void / Task / ValueTask → a fan-out subscriber (every matching consumer runs).
  • Result<TResponse> / Task<Result<TResponse>> / ValueTask<Result<TResponse>> → the single responder for IDomainEventBus.RequestAsync (exactly one allowed; domain plane only).

Beyond the message parameter, a method-form consumer may optionally declare — in any order — an IEventContext (or IEventContext<TEvent>) for the correlation id and message, and a CancellationToken. Both are supplied by the runtime; omit what you don't need, and use [ConsumeEvent(Order = n)] to order fan-out subscribers. (The handler form takes the event and an optional CancellationToken.)

A class-level [ConsumeEvent] on a non-handler class, or a method-form consumer not on a [Service], is reported (ELEVT001/ELEVT005); an invalid signature is ELEVT002; a second RequestAsync responder for the same event is ELEVT004.

What the CancellationToken means differs by plane. For domain consumers it is the originating command's token (cancelling the command cancels the consumer). For integration consumers it is the delivery host's shutdown token — delivery is decoupled from and after the command, so the token signals "wind down," not "the request was cancelled."

Handling duplicates

Integration delivery is at-least-once, so a consumer can see the same message more than once: a worker crashes after the consumer ran but before the message was finalized, a fan-out consumer throws and the whole message is retried, or a lease expires while multiple instances are running. There is no exactly-once delivery; you build an exactly-once effect by making the consumer idempotent. Every redelivery carries the same IEventContext.CorrelationId, so that id is your stable deduplication key.

Pick the cheapest mechanism the side effect allows — an inbox table is the last resort, not the default.

1. Make the operation idempotent (prefer this). A conditional state transition or an upsert is self-deduplicating — running it twice is the same as once, with no extra bookkeeping:

[ConsumeEvent]
public async ValueTask OnInvoicePaid(InvoicePaid e, CancellationToken ct) {
    // "Mark paid only if not already paid" — a second delivery updates zero rows.
    await db.Invoices
        .Where(i => i.Id == e.InvoiceId && i.Status != InvoiceStatus.Paid)
        .ExecuteUpdateAsync(s => s.SetProperty(i => i.Status, InvoiceStatus.Paid), ct);
}

2. Delegate to the downstream's idempotency key. External systems that charge cards or send mail take a dedup key; pass the correlation id and let them collapse duplicates. Declare an IEventContext to receive it:

[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    await payments.ChargeAsync(
        e.InvoiceId, idempotencyKey: context.CorrelationId.ToString("N"), ct);
}

3. Guard with a local dedup table — a lightweight inbox. Only when the effect is neither idempotent nor has a downstream key: record the handled id in the same transaction as the consumer's write. Because the consumer runs in-process on your DbContext, this is a unique-constrained insert, not a message broker's inbox machinery:

[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    db.ProcessedEvents.Add(new ProcessedEvent {
        Consumer = nameof(InvoiceNotifications),
        CorrelationId = context.CorrelationId,   // unique index on (Consumer, CorrelationId)
    });
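    DoTheSideEffect(e);   // must stage its writes on this same DbContext so they commit with the marker row
    try {
        await db.SaveChangesAsync(ct);
    }
    catch (DbUpdateException) {
        // The unique constraint rejected a duplicate: already processed, so let the message finalize.
        // In production, narrow this catch to the provider's unique-violation error so other failures still retry.
    }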
}

The case where a guard earns its keep: when a message has several fan-out consumers and one throws, the whole message is retried, so consumers that already succeeded run again. Idempotent operations (1) absorb this for free; a consumer that genuinely cannot be made idempotent should keep its own guard keyed by (consumer, CorrelationId) so it skips work it already did.
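The whole-message retry can be simulated in memory to see what the guard buys. This is a sketch under stated assumptions, not Elarion's dispatcher: RetryDemo, the two local consumers, and the HashSet standing in for the dedup table are all hypothetical, and unlike the real table the in-memory set is not committed atomically with the side effect.

```csharp
using System;
using System.Collections.Generic;

var result = RetryDemo.Run();
Console.WriteLine($"emails sent: {result.EmailsSent}, attempts: {result.Attempts}");
// prints "emails sent: 1, attempts: 2"

public static class RetryDemo {
    public static (int EmailsSent, int Attempts) Run() {
        // Stands in for the dedup table with a unique index on (Consumer, CorrelationId).
        var handled = new HashSet<(string Consumer, Guid CorrelationId)>();
        var correlationId = Guid.NewGuid();   // the same id on every redelivery
        int emailsSent = 0, attempts = 0;
        bool flakyHasFailed = false;

        // Non-idempotent consumer protected by its own guard.
        void SendEmail(Guid id) {
            // Add returns false when the key already exists: skip work already done.
            if (!handled.Add(("SendEmail", id))) return;
            emailsSent++;
        }

        // Second consumer of the same message; throws on the first delivery only.
        void FlakyConsumer(Guid id) {
            if (!flakyHasFailed) {
                flakyHasFailed = true;
                throw new InvalidOperationException("transient failure");
            }
        }

        while (true) {
            attempts++;
            try {
                SendEmail(correlationId);
                FlakyConsumer(correlationId);
                break;   // every consumer succeeded, so the message finalizes
            }
            catch (InvalidOperationException) {
                // One consumer threw, so the whole message is delivered again.
            }
        }
        return (emailsSent, attempts);
    }
}
```

Without the guard, the second delivery would send the email again even though only the other consumer had failed.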

Subscribing without the generator (advanced)

[ConsumeEvent] is the normal path, but the runtime ultimately subscribes to whatever EventSubscriptionDescriptors are registered in DI — the generator simply emits those registrations. The descriptor type is public (Elarion.Abstractions.Messaging), so you can register one by hand for tests or advanced host wiring. It is the same unified shape for both planes: Plane selects domain vs integration, and a set ResponseType/InvokeRequestAsync (responder) vs InvokeAsync (fan-out subscriber) selects the role:

using Elarion.Abstractions.Messaging;
using Microsoft.Extensions.DependencyInjection;

builder.Services.AddScoped<InvoiceNotifications>();
builder.Services.AddSingleton(new EventSubscriptionDescriptor {
    EventType   = typeof(InvoiceCreated),
    Plane       = EventPlane.Integration,          // or EventPlane.Domain — same descriptor, either plane
    ServiceType = typeof(InvoiceNotifications),
    InvokeAsync = static (sp, evt, ctx, ct) =>
        sp.GetRequiredService<InvoiceNotifications>()
          .OnInvoiceCreated((InvoiceCreated)evt, ct),
});

This is composition-time registration. The EventSubscriptionRegistry is built once from the registered descriptors and then frozen, so the set of consumers is fixed when the container is built. A hand-registered descriptor is also always active — module feature-gating happens at the generated Add{Module}EventConsumers call site, not in the descriptor itself.

No dynamic, post-startup subscription today. You cannot add or remove consumers at runtime after the host is built — the frozen registry is what keeps dispatch reflection-free and trimming/AOT-safe.
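The "build once, then freeze" idea can be sketched with the BCL's frozen collections (System.Collections.Frozen, .NET 8 or later). This is not how EventSubscriptionRegistry is implemented, which this page does not show; Descriptor and Registry here are hypothetical stand-ins.

```csharp
using System;
using System.Collections.Frozen;
using System.Collections.Generic;
using System.Linq;

var registry = Registry.Build(new[] {
    new Descriptor(typeof(string), "LogText"),
    new Descriptor(typeof(int), "CountNumbers"),
    new Descriptor(typeof(string), "IndexText"),
});

Console.WriteLine(string.Join(", ", registry[typeof(string)]));
// prints "LogText, IndexText"

// Hypothetical stand-in for a subscription descriptor; not the Elarion type.
public sealed record Descriptor(Type EventType, string ConsumerName);

public static class Registry {
    // Group descriptors by event type once, then freeze the lookup table.
    // GroupBy keeps elements in source order within each group.
    public static FrozenDictionary<Type, string[]> Build(IEnumerable<Descriptor> descriptors) =>
        descriptors
            .GroupBy(d => d.EventType)
            .ToFrozenDictionary(g => g.Key, g => g.Select(d => d.ConsumerName).ToArray());
}
```

A FrozenDictionary has no public mutators, so every lookup after startup reads from a fixed table, which mirrors why consumers cannot be added once the host is built.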
