Consuming events
Declare event consumers as handlers or service methods, for both inline domain events and after-commit integration events, and keep them idempotent.
[ConsumeEvent] is the single, unified way to subscribe — you never pick a bus or a plane on the
consumer. The same attribute consumes either a domain or an integration event; the event type's marker
(IDomainEvent / IIntegrationEvent) decides which plane delivers it, and the consumer's return type
decides the role. So a consumer reads identically whether the event runs inline or after commit. (For the
two-plane model itself, see the events overview.)
There are two ways to write the consumer. Prefer the handler form — it makes the consumer a first-class unit of business logic with the full decorator pipeline; the method form is a lightweight alternative for a small side effect on a service you already have.
Handler form (preferred)
A consumer is a class implementing IHandler<TEvent, Result<T>> — its request type is the event —
annotated with a class-level [ConsumeEvent]. The simplest case uses the IHandler<TEvent> sugar over
Result<Unit>, which returns the non-generic Result:
using Elarion.Abstractions;
using Elarion.Abstractions.Messaging;

[ConsumeEvent]
public sealed class SendInvoiceEmail(IEmailSender email) : IHandler<InvoiceCreated> {
    public async ValueTask<Result> HandleAsync(InvoiceCreated e, CancellationToken ct) {
        await email.SendAsync(e.ClientEmail, ct);
        return Result.Success();
    }
}

Because it is a handler, it runs through the full handler decorator pipeline — tracing, resilience,
validation, cache-invalidation — exactly like a command or query handler, and it is discovered,
registered, and feature-gated by its module like every other handler. The event bus simply becomes one
more trigger for the same unit of business logic. Use [ConsumeEvent(Order = n)] to order fan-out
subscribers; Order ascends, and equal-order consumers run in a stable, generator-determined sequence.
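The ordering rule can be pictured with a small self-contained sketch. It is not Elarion's dispatcher: the Subscriber record and DispatchSequence helper are hypothetical stand-ins, and registration order stands in for the generator-determined sequence. It only shows what "ascending Order, stable on ties" means for the run order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a registered fan-out subscriber; not an Elarion type.
public sealed record Subscriber(string Name, int Order);

public static class FanOutOrdering {
    // Sorts ascending by Order. Enumerable.OrderBy is a stable sort, so
    // equal-order subscribers keep the sequence they were registered in.
    public static IReadOnlyList<string> DispatchSequence(IEnumerable<Subscriber> registered) =>
        registered.OrderBy(s => s.Order).Select(s => s.Name).ToList();

    public static void Main() {
        var sequence = DispatchSequence(new[] {
            new Subscriber("SendInvoiceEmail", 10),
            new Subscriber("AuditInvoice", 0),
            new Subscriber("UpdateDashboard", 10),
        });

        // Prints: AuditInvoice, SendInvoiceEmail, UpdateDashboard
        Console.WriteLine(string.Join(", ", sequence));
    }
}
```

The two Order = 10 subscribers keep their relative position, which is why a consumer that depends on running after another should be given a strictly higher Order rather than rely on the tie-break.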
The role-by-return-type rule
The response type selects the consumer's role:
- Result<Unit> (or the IHandler<TEvent> sugar) → a fan-out subscriber. Every matching consumer runs. A failed Result has no return channel to the publisher, so it surfaces as an EventConsumerFailedException, which each backend handles per its plane: the in-memory domain bus aggregates and rethrows, failing the command; the in-memory integration pump logs and isolates; the outbox dispatcher lets it propagate to trigger a retry.
- Result<T> with T ≠ Unit → the single RequestAsync responder for the event, returning its typed result through the same pipeline. Domain plane only.
public sealed record PriceQuote(string Sku) : IDomainEvent;

[ConsumeEvent]
public sealed class QuotePrice(IPriceBook prices) : IHandler<PriceQuote, Result<Money>> {
    public ValueTask<Result<Money>> HandleAsync(PriceQuote q, CancellationToken ct) =>
        new(prices.Quote(q.Sku));
}

Integration events are fan-out only, so an integration handler must return Result / Result<Unit>;
a non-Unit response is rejected as ELEVT005.
A domain-event handler runs nested in the command's pipeline. The domain plane dispatches inline in
the publisher's scope, so a handler consumer's decorator pipeline runs inside the command that
published the event — same scope, same DbContext, same transaction. Give domain-event handlers a
read-only / minimal pipeline: a nested transaction or resilience decorator is wrong, since
opening a nested transaction or retrying writes against the already-dirty DbContext would corrupt the
command's unit of work. Tracing, validation, and cache-invalidation nest safely. Integration-event
handlers are the opposite — they run on a fresh scope after commit, so the full pipeline (including
a transaction decorator) is exactly right.
The clean way to get this right is a transaction decorator with an AppliesTo predicate
that matches commands and integration-event handlers but not domain-event handlers — the generator
then never attaches it to a domain-event handler, and the integration handler keeps it because its request
is an IIntegrationEvent. One decorator, attached precisely at compile time, covers commands, queries,
domain events, and integration events with no per-handler pipeline tags. (If you prefer to be explicit, a
named read-only pipeline attribute that omits the transaction and resilience decorators works too, since
pipeline attributes are most-specific-wins — see decorator pipelines.)
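As a rough illustration of the predicate logic only, the sketch below decides from the request type whether a transaction decorator should attach. Everything in it is an assumption made for the example: the marker interfaces are local stand-ins that mirror the names on this page, ICommand is a hypothetical command marker, the record shapes are invented, and the real AppliesTo hook may have a different signature. In Elarion the decision is made by the generator at compile time, not by reflection at runtime.

```csharp
using System;

// Local stand-ins so the sketch compiles on its own. IDomainEvent and
// IIntegrationEvent mirror the markers named on this page; ICommand is hypothetical.
public interface IDomainEvent { }
public interface IIntegrationEvent { }
public interface ICommand { }

// Illustrative request types; the property shapes are assumptions.
public sealed record CreateInvoice(string ClientEmail) : ICommand;
public sealed record InvoiceLineAdded(Guid InvoiceId, decimal Amount) : IDomainEvent;
public sealed record InvoiceCreated(Guid InvoiceId, string ClientEmail) : IIntegrationEvent;
public sealed record GetInvoice(Guid InvoiceId); // a query: carries no marker in this sketch

public static class TransactionDecoratorRule {
    // Attach to commands and integration-event handlers. Domain-event handlers
    // (and queries) are left out, so they never open a nested transaction.
    public static bool AppliesTo(Type requestType) =>
        typeof(ICommand).IsAssignableFrom(requestType)
        || typeof(IIntegrationEvent).IsAssignableFrom(requestType);

    public static void Main() {
        foreach (var t in new[] {
            typeof(CreateInvoice), typeof(InvoiceCreated),
            typeof(InvoiceLineAdded), typeof(GetInvoice),
        }) {
            Console.WriteLine($"{t.Name}: {AppliesTo(t)}");
        }
        // Prints:
        // CreateInvoice: True
        // InvoiceCreated: True
        // InvoiceLineAdded: False
        // GetInvoice: False
    }
}
```

Keying the rule on the request type's marker keeps the decision in one place: a new domain-event handler is excluded automatically, with no annotation to forget.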
// Domain-event handler: shares the publisher's DbContext and transaction, so it must NOT
// open a nested transaction. With an AppliesTo predicate it needs no pipeline annotation at all.
[ConsumeEvent]
public sealed class RecalculateTotals(IAppDbContext db) : IHandler<InvoiceLineAdded> {
    public async ValueTask<Result> HandleAsync(InvoiceLineAdded e, CancellationToken ct) {
        // Shares the publisher's DbContext; this write commits atomically with the command.
        await db.Invoices
            .Where(i => i.Id == e.InvoiceId)
            .ExecuteUpdateAsync(s => s.SetProperty(i => i.Total, i => i.Total + e.Amount), ct);
        return Result.Success();
    }
}

Method form (alternative)
When you just need a side effect on a service that already exists, annotate an instance method on a
[Service] class instead — no dedicated handler class, but also no decorator pipeline. The consumed
event type is the method's message parameter, and the plane comes from that type's marker:
[Service]
public sealed class InvoiceNotifications(ILogger<InvoiceNotifications> logger) {
    [ConsumeEvent]
    public async ValueTask OnInvoiceCreated(InvoiceCreated e, CancellationToken ct) {
        // NotifyAsync is a member of this service; its body is omitted here.
        await NotifyAsync(e.ClientEmail, ct);
    }
}

Here too the return type selects the role:
- void / Task / ValueTask → a fan-out subscriber (every matching consumer runs).
- Result<TResponse> / Task<Result<TResponse>> / ValueTask<Result<TResponse>> → the single responder for IDomainEventBus.RequestAsync (exactly one allowed; domain plane only).
Beyond the message parameter, a method-form consumer may optionally declare — in any order — an
IEventContext (or IEventContext<TEvent>) for the correlation id and message, and a CancellationToken.
Both are supplied by the runtime; omit what you don't need, and use [ConsumeEvent(Order = n)] to order
fan-out subscribers. (The handler form takes the event and an optional CancellationToken.)
A class-level [ConsumeEvent] on a non-handler class, or a method-form consumer not on a [Service], is
reported (ELEVT001/ELEVT005); an invalid signature is ELEVT002; a second RequestAsync responder for
the same event is ELEVT004.
What the CancellationToken means differs by plane. For domain consumers it is the originating
command's token (cancelling the command cancels the consumer). For integration consumers it is the
delivery host's shutdown token: delivery is decoupled from the command and happens after it, so the token signals
"wind down," not "the request was cancelled."
Handling duplicates
Integration delivery is at-least-once: a consumer can see the same message more than once. Typical
causes are a worker crash after the consumer ran but before the message was finalized, a fan-out
consumer that threw and forced a retry of the whole message, or a lease that expired while multiple
instances were running. There is no exactly-once delivery; you build an exactly-once effect by making
the consumer idempotent. Every redelivery carries the same IEventContext.CorrelationId, so that id is
your stable deduplication key.
Pick the cheapest mechanism the side effect allows — an inbox table is the last resort, not the default.
1. Make the operation idempotent (prefer this). A conditional state transition or an upsert is self-deduplicating — running it twice is the same as once, with no extra bookkeeping:
[ConsumeEvent]
public async ValueTask OnInvoicePaid(InvoicePaid e, CancellationToken ct) {
    // "Mark paid only if not already paid": a second delivery updates zero rows.
    await db.Invoices
        .Where(i => i.Id == e.InvoiceId && i.Status != InvoiceStatus.Paid)
        .ExecuteUpdateAsync(s => s.SetProperty(i => i.Status, InvoiceStatus.Paid), ct);
}

2. Delegate to the downstream's idempotency key. External systems that charge cards or send mail take
a dedup key; pass the correlation id and let them collapse duplicates. Declare an IEventContext to
receive it:
[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    await payments.ChargeAsync(
        e.InvoiceId, idempotencyKey: context.CorrelationId.ToString("N"), ct);
}

3. Guard with a local dedup table (a lightweight inbox). Only when the effect is neither idempotent
nor has a downstream key: record the handled id in the same transaction as the consumer's write.
Because the consumer runs in-process on your DbContext, this is a unique-constrained insert, not a
message broker's inbox machinery:
[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    db.ProcessedEvents.Add(new ProcessedEvent {
        Consumer = nameof(InvoiceNotifications),
        CorrelationId = context.CorrelationId, // unique index on (Consumer, CorrelationId)
    });
    DoTheSideEffect(e);
    try {
        await db.SaveChangesAsync(ct);
    }
    catch (DbUpdateException) {
        // The unique constraint rejected a duplicate: already processed, so let the message finalize.
        // DbUpdateException also covers other save failures; production code should confirm the
        // cause is the unique-index violation and rethrow anything else.
    }
}

The case where a guard earns its keep: when a message has several fan-out consumers and one throws,
the whole message is retried, so consumers that already succeeded run again. Idempotent operations (1)
absorb this for free; a consumer that genuinely cannot be made idempotent should keep its own guard keyed
by (consumer, CorrelationId) so it skips work it already did.
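The guard's behaviour under redelivery can be simulated without a database. The sketch below is a simplified stand-in, not the EF Core version above: ProcessedEventStore is a hypothetical in-memory substitute for the unique-indexed table, WelcomeMailer is an invented consumer, and CorrelationId is assumed to be a Guid. In real code the record and the consumer's write belong in one transaction; here they are just two steps in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a table with a unique index on
// (Consumer, CorrelationId).
public sealed class ProcessedEventStore {
    private readonly HashSet<(string Consumer, Guid CorrelationId)> _seen = new();

    // Returns true the first time a pair is recorded and false for a duplicate,
    // which mirrors a unique-constrained insert succeeding or being rejected.
    public bool TryRecord(string consumer, Guid correlationId) =>
        _seen.Add((consumer, correlationId));
}

// Invented consumer with a side effect that is not idempotent on its own.
public sealed class WelcomeMailer(ProcessedEventStore store) {
    public int MailsSent { get; private set; }

    public void OnInvoiceCreated(Guid correlationId) {
        if (!store.TryRecord(nameof(WelcomeMailer), correlationId)) {
            return; // redelivery of a message this consumer already handled
        }
        MailsSent++; // the side effect runs once per correlation id
    }
}

public static class DedupDemo {
    public static void Main() {
        var mailer = new WelcomeMailer(new ProcessedEventStore());
        var correlationId = Guid.NewGuid();

        mailer.OnInvoiceCreated(correlationId); // first delivery
        mailer.OnInvoiceCreated(correlationId); // whole-message retry

        Console.WriteLine(mailer.MailsSent); // prints 1
    }
}
```

Including the consumer name in the key matters when several consumers handle the same message: each one tracks its own progress, so a retry skips only the consumers that already finished.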
Subscribing without the generator (advanced)
[ConsumeEvent] is the normal path, but the runtime ultimately subscribes to whatever
EventSubscriptionDescriptors are registered in DI — the generator simply emits those registrations.
The descriptor type is public (Elarion.Abstractions.Messaging), so you can register one by hand for tests
or advanced host wiring. It is the same unified shape for both planes: Plane selects domain vs
integration, and setting ResponseType and InvokeRequestAsync (responder) rather than InvokeAsync
(fan-out subscriber) selects the role:
using Elarion.Abstractions.Messaging;
using Microsoft.Extensions.DependencyInjection;

builder.Services.AddScoped<InvoiceNotifications>();
builder.Services.AddSingleton(new EventSubscriptionDescriptor {
    EventType = typeof(InvoiceCreated),
    Plane = EventPlane.Integration, // or EventPlane.Domain (same descriptor, either plane)
    ServiceType = typeof(InvoiceNotifications),
    InvokeAsync = static (sp, evt, ctx, ct) =>
        sp.GetRequiredService<InvoiceNotifications>()
            .OnInvoiceCreated((InvoiceCreated)evt, ct),
});

This is composition-time registration. The EventSubscriptionRegistry is built once from the
registered descriptors and then frozen, so the set of consumers is fixed when the container is built. A
hand-registered descriptor is also always active — module feature-gating happens at the generated
Add{Module}EventConsumers call site, not in the descriptor itself.
No dynamic, post-startup subscription today. You cannot add or remove consumers at runtime after the host is built — the frozen registry is what keeps dispatch reflection-free and trimming/AOT-safe.