Elarion

Events

An in-process eventing subsystem split by its relationship to the database transaction — inline domain events and after-commit integration events, with a durable EF Core outbox for reliable delivery.

Elarion's event bus is organized around one question: when does the event run relative to the database transaction? That split — not a verb like publish or notify — is the whole API. There are two planes, each its own interface in Elarion.Abstractions.Messaging, and an event binds to exactly one of them through a marker interface.

| Plane | Interface | Marker | Runs |
|---|---|---|---|
| Domain | IDomainEventBus | IDomainEvent | inline, in the caller's scope and transaction |
| Integration | IIntegrationEventBus | IIntegrationEvent | after the transaction commits, on a separate scope |

Consumers are reflection-free: a source generator discovers them, validates the signatures, and emits the registration and invocation delegates. There is no runtime assembly scanning or reflective method dispatch.

Declaring an event and a consumer

[ConsumeEvent] is the single, unified way to subscribe — you never pick a bus or a plane on the consumer. The same attribute consumes either a domain or an integration event; the event type's marker (IDomainEvent / IIntegrationEvent) decides which plane delivers it, and the consumer's return type decides the role. So a consumer reads identically whether the event runs inline or after commit.

An event is a plain type carrying exactly one marker:

using Elarion.Abstractions.Messaging;

public sealed record InvoiceCreated(Guid InvoiceId, string ClientEmail) : IIntegrationEvent;

There are two ways to write the consumer. Prefer the handler form — it makes the consumer a first-class unit of business logic with the full decorator pipeline; the method form is a lightweight alternative for a small side effect on a service you already have.

Handler form (preferred)

A consumer is a class implementing IHandler<TEvent> — its request type is the event — annotated with a class-level [ConsumeEvent]. It returns the non-generic Result (the IHandler<T> sugar over Result<Unit>):

using Elarion.Abstractions;
using Elarion.Abstractions.Messaging;

[ConsumeEvent]
public sealed class SendInvoiceEmail(IEmailSender email) : IHandler<InvoiceCreated> {
    public async ValueTask<Result> HandleAsync(InvoiceCreated e, CancellationToken ct) {
        await email.SendAsync(e.ClientEmail, ct);
        return Result.Success();
    }
}

Because it is a handler, it runs through the full handler decorator pipeline — tracing, resilience, validation, cache-invalidation — exactly like a command or query handler, and it is discovered, registered, and feature-gated by its module like every other handler. The event bus simply becomes one more trigger for the same unit of business logic. A failed Result from a subscriber surfaces as an EventConsumerFailedException, which each plane handles its own way (the domain bus rethrows and fails the command; the integration tier logs and retries). Use [ConsumeEvent(Order = n)] to order fan-out subscribers.

To answer a request instead, return a typed Result<T> (T ≠ Unit) — that handler is the single RequestAsync responder for the event, returning its result through the same pipeline:

public sealed record PriceQuote(string Sku) : IDomainEvent;

[ConsumeEvent]
public sealed class QuotePrice(IPriceBook prices) : IHandler<PriceQuote, Result<Money>> {
    public ValueTask<Result<Money>> HandleAsync(PriceQuote q, CancellationToken ct) =>
        new(prices.Quote(q.Sku));
}

Integration events are fan-out only, so an integration handler must return Result / Result<Unit>; a non-Unit response is reported as ELEVT005.

A domain-event handler runs nested in the command's pipeline. The domain plane dispatches inline in the publisher's scope (Plane A), so a handler consumer's decorator pipeline runs inside the command that published the event — same scope, same DbContext, same transaction. Give domain-event handlers a read-only / minimal pipeline: they are already inside the command's unit of work and resilience scope, so a nested transaction or resilience decorator is wrong — opening a nested transaction, or retrying writes against the already-dirty DbContext, would corrupt the command's unit of work. Tracing, validation, and cache-invalidation nest safely.

Integration-event handlers are the opposite: they run on a fresh scope after commit (their own DbContext and transaction), so the full pipeline — including a transaction decorator — is exactly right. There the pipeline running again is the new unit of work.

Handling it: let the transaction decorator declare where it attaches. The clean default is a transaction decorator with an AppliesTo predicate that matches commands and integration-event handlers but not domain-event handlers. A domain-event handler then gets no transaction decorator at all and runs inside the publisher's transaction, with no pipeline annotation. If you'd rather be explicit, a named read-only pipeline that omits the transaction (and resilience) decorators works too, since pipeline attributes are most-specific-wins (see Decorator pipelines):

using Elarion.Abstractions.Pipeline;

// Only validation — no transaction, no resilience.
[DecoratorList(typeof(ValidationDecorator<,>))]
[AttributeUsage(AttributeTargets.Assembly | AttributeTargets.Class)]
public sealed class ReadOnlyPipelineAttribute : Attribute;

…then apply it to the domain-event handler, which now shares the command's transaction instead of opening a nested one:

public sealed record InvoiceLineAdded(Guid InvoiceId, decimal Amount) : IDomainEvent;

[ConsumeEvent]
[ReadOnlyPipeline]   // overrides the assembly/module default; drops the transaction decorator
public sealed class RecalculateTotals(IAppDbContext db) : IHandler<InvoiceLineAdded> {
    public async ValueTask<Result> HandleAsync(InvoiceLineAdded e, CancellationToken ct) {
        // Shares the publisher's DbContext; this write commits atomically with the command.
        await db.Invoices
            .Where(i => i.Id == e.InvoiceId)
            .ExecuteUpdateAsync(s => s.SetProperty(i => i.Total, i => i.Total + e.Amount), ct);
        return Result.Success();
    }
}

An integration-event handler needs no annotation either: its request is an IIntegrationEvent, which the predicate matches, so it opens its own transaction on its fresh post-commit scope. One decorator, attached precisely at compile time, covers commands, queries, domain events, and integration events with no per-handler pipeline tags.

Method form (alternative)

When you just need a side effect on a service that already exists, annotate an instance method on a [Service] class instead — no dedicated handler class, but also no decorator pipeline:

[Service]
public sealed class InvoiceNotifications(ILogger<InvoiceNotifications> logger) {
    [ConsumeEvent]
    public async ValueTask OnInvoiceCreated(InvoiceCreated e, CancellationToken ct) {
        await NotifyAsync(e.ClientEmail, ct);
    }
}

Here the return type selects the role:

  • void / Task / ValueTask → a fan-out subscriber (every matching consumer runs).
  • Result<TResponse> / Task<Result<TResponse>> / ValueTask<Result<TResponse>> → a responder for IDomainEventBus.RequestAsync (exactly one allowed; domain plane only).

Beyond the message parameter, a method-form consumer may optionally declare — in any order — an IEventContext (or IEventContext<TEvent>) for the correlation id, and a CancellationToken. Both are supplied by the runtime; omit what you don't need, and use [ConsumeEvent(Order = n)] to order fan-out subscribers. (The handler form takes the event and an optional CancellationToken.)

What the CancellationToken means differs by plane. For domain consumers it is the originating command's token (cancelling the command cancels the consumer). For integration consumers it is the delivery host's shutdown token: delivery is decoupled from the command and happens after it, so the token signals "wind down," not "the request was cancelled."

Plane A — domain events (inline)

IDomainEventBus dispatches within the caller's DI scope, and therefore within the caller's transaction. Consumers share the scoped DbContext, their writes commit atomically with the command, and a consumer failure fails the command. This is the right plane for in-aggregate invariants and synchronous side effects that must be part of the same unit of work. A handler-form consumer here runs its decorator pipeline nested inside the command's — keep that pipeline read-only/minimal (see the warning under Handler form).

For example, a command that adds an invoice line publishes a domain event, and the RecalculateTotals handler shown above reacts to it inline, in the same transaction — the line and the recalculated total commit together or not at all:

// A command handler publishes the domain event inside its own transaction…
public sealed class AddInvoiceLine(IAppDbContext db, IDomainEventBus domainEvents)
    : IHandler<AddInvoiceLine.Command> {
    public sealed record Command {
        public required Guid InvoiceId { get; init; }
        public required decimal Amount { get; init; }
    }

    public async ValueTask<Result> HandleAsync(Command c, CancellationToken ct) {
        db.InvoiceLines.Add(new InvoiceLine { InvoiceId = c.InvoiceId, Amount = c.Amount });

        // …dispatched inline, so RecalculateTotals runs now — same scope, same transaction.
        await domainEvents.PublishAsync(new InvoiceLineAdded(c.InvoiceId, c.Amount), ct);

        await db.SaveChangesAsync(ct);   // the line and the consumer's total update commit together
        return Result.Success();
    }
}

// Request/response is the other domain role — exactly one responder answers:
var price = await domainEvents.RequestAsync<PriceQuote, Money>(new PriceQuote(sku), ct);

Domain events are never broker-portable — they are an in-process, in-transaction mechanism by definition.

Plane B — integration events (after commit)

IIntegrationEventBus records the event in the caller's unit of work and delivers it after the transaction commits, on a fresh scope, retried independently. A consumer failure never fails the command, and a rollback discards the event. This is the plane for independent, after-the-fact reactions — notifications, read-model projections, outbound webhooks, cross-module fan-out.

await integrationEvents.PublishAsync(new InvoiceCreated(invoice.Id, client.Email), ct);

Because delivery happens later on its own scope, integration consumers run at-least-once and without an ordering guarantee — write them to be idempotent and order-independent.
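As a minimal sketch of what "idempotent and order-independent" can look like in practice, the projection below keeps only the highest-versioned state it has seen, so duplicate and out-of-order deliveries converge on the same result. It uses no Elarion types: the InvoiceStatusChanged event, its Version field (assumed to be a per-invoice counter assigned by the publisher), and the InvoiceStatusProjection class are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: Version is assumed to be a per-invoice counter set by the publisher.
public sealed record InvoiceStatusChanged(Guid InvoiceId, long Version, string Status);

// Hypothetical read model that tolerates duplicate and reordered deliveries.
public sealed class InvoiceStatusProjection {
    private readonly Dictionary<Guid, (long Version, string Status)> _rows = new();

    // Returns true when the event changed the projection,
    // false when it was a duplicate or older than what is already stored.
    public bool Apply(InvoiceStatusChanged e) {
        if (_rows.TryGetValue(e.InvoiceId, out var current) && current.Version >= e.Version) {
            return false;   // already saw this version or a newer one
        }
        _rows[e.InvoiceId] = (e.Version, e.Status);
        return true;
    }

    public string? StatusOf(Guid invoiceId) =>
        _rows.TryGetValue(invoiceId, out var row) ? row.Status : null;
}
```

Applying version 2, then a late version 1, then version 2 again leaves the projection at the version 2 status; only the first call reports a change.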

This is the only broker-portable plane: an alternative backend implements only IIntegrationEventBus. Two backends ship.

Choosing an integration backend

Recommended: the EF Core outbox for anything that must not be lost. The in-memory tier is a best-effort convenience for events you can afford to drop on a crash.

| | In-memory (Elarion.Messaging.InMemory) | EF Core outbox (Elarion.Messaging.Outbox) |
|---|---|---|
| Durability | best-effort; flushed-but-undelivered events are lost on crash | at-least-once; survives restarts |
| How it captures | a per-scope buffer flushed after commit | a row written in the same transaction as your data |
| Delivery | a hosted channel pump | a hosted worker that polls, leases, dispatches, retries |
| Use when | events are advisory and loss is acceptable | events drive real side effects that must happen |

Register the in-memory tier with AddInMemoryEventBus() from Elarion.Messaging.InMemory. This wires the domain plane plus the in-memory integration plane. After-commit delivery is gated by the database transaction without a hand-written decorator: the package's EF Core interceptors are registered automatically, flush buffered events after the DbContext commits, and discard them on rollback.

builder.Services.AddInMemoryEventBus();   // domain plane + in-memory integration plane (interceptors included)
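To make the flush-on-commit, discard-on-rollback behaviour concrete, here is a rough, self-contained sketch of a commit-gated buffer. It is not the package's implementation: CommitGatedBuffer, OnCommitted, and OnRolledBack are hypothetical names, and the real tier drives the equivalent steps from its EF Core interceptors.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-scope buffer; not the package's actual type.
public sealed class CommitGatedBuffer {
    private readonly List<object> _pending = new();
    private readonly Action<object> _deliver;

    public CommitGatedBuffer(Action<object> deliver) => _deliver = deliver;

    // Events are only held here until the transaction outcome is known.
    public void Publish(object evt) => _pending.Add(evt);

    // Would be invoked once the transaction has committed.
    public void OnCommitted() {
        var batch = _pending.ToArray();   // snapshot, so re-entrant publishes are not lost mid-loop
        _pending.Clear();
        foreach (var evt in batch) _deliver(evt);
    }

    // Would be invoked on rollback: buffered events are dropped, nothing is delivered.
    public void OnRolledBack() => _pending.Clear();
}
```

The sketch also shows why this tier is best-effort: the buffer lives in memory, so a crash between commit and delivery loses whatever was flushed but not yet handled.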

The rest of this page covers the durable outbox.

The EF Core outbox

The outbox makes integration delivery durable by writing each event as a row in your DbContext, committed atomically with the business data. A background worker delivers it after commit.

1. Add the table to the context that owns your business entities, in OnModelCreating:

protected override void OnModelCreating(ModelBuilder modelBuilder) {
    base.OnModelCreating(modelBuilder);
    modelBuilder.UseElarionOutbox();   // adds elarion_outbox_messages + a partial pending index
}

2. Register the tier in the host, generic over that context:

builder.Services.AddElarionOutbox<BillingDbContext>(o => {
    o.SerializerOptions = serializerOptions;   // reuse the host's source-generated options for AOT
    // o.PollingInterval = TimeSpan.FromSeconds(1);
    // o.LeaseDuration   = TimeSpan.FromMinutes(2);
    // o.BatchSize       = 100;
    // o.RetentionPeriod = TimeSpan.FromDays(7);   // null = keep delivered rows forever
});

This registers the durable IIntegrationEventBus, the storage, the dispatcher, and the hosted delivery worker. The consumer descriptors are registered separately — automatically and feature-gated when you use [GenerateModuleBootstrapper] (see Module gating), or explicitly via the generated Add{Assembly}EventConsumers().

Publish before you save. PublishAsync only tracks the outbox row — it does not call SaveChanges. Your unit of work must persist it within the same transaction as the business data, so publish before the SaveChanges that commits the command:

db.Invoices.Add(invoice);
await integrationEvents.PublishAsync(new InvoiceCreated(invoice.Id, client.Email), ct);
await db.SaveChangesAsync(ct);   // persists the invoice and the outbox row atomically

If the transaction rolls back, both the invoice and the event vanish together — no half-published notifications. The in-memory tier's per-scope flush/discard buffer is irrelevant here; the database transaction provides the commit-gating directly.

The worker claims pending messages with a provider-neutral conditional update and a short lease, so running a couple of instances (or overlapping old/new during a deploy) does not have two workers dispatching the same message while its lease is live. A lease that expires mid-delivery can still cause a redelivery (see Handling duplicates). Defaults are tuned for low/single-instance deployments. The UseElarionOutbox pending index is a partial (filtered) index, supported on PostgreSQL, SQL Server, and SQLite; on MySQL, supply an unfiltered index in your own model.
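The claim step can be pictured as a compare-and-set: a worker takes a message only if nobody else holds a live lease on it. The sketch below is an in-memory stand-in under that assumption, with a lock playing the role of the database's atomic conditional UPDATE. OutboxRow, LeaseStore, and TryClaim are hypothetical names; the actual table columns and SQL are not shown on this page.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an outbox row; the real table's columns are not shown here.
public sealed class OutboxRow {
    public Guid Id { get; init; } = Guid.NewGuid();
    public string? LeasedBy { get; set; }
    public DateTimeOffset LeasedUntil { get; set; } = DateTimeOffset.MinValue;
    public bool Delivered { get; set; }
}

public sealed class LeaseStore {
    private readonly object _gate = new();
    private readonly Dictionary<Guid, OutboxRow> _rows = new();

    public void Add(OutboxRow row) { lock (_gate) { _rows[row.Id] = row; } }

    // Mimics "UPDATE ... SET lease WHERE not delivered AND lease expired":
    // the check and the write happen atomically, so only one worker wins.
    public bool TryClaim(Guid id, string worker, DateTimeOffset now, TimeSpan lease) {
        lock (_gate) {
            if (!_rows.TryGetValue(id, out var row)) return false;
            if (row.Delivered || row.LeasedUntil > now) return false;
            row.LeasedBy = worker;
            row.LeasedUntil = now + lease;
            return true;
        }
    }
}
```

With a two-minute lease, a second worker's claim one second later fails, but a claim three minutes later succeeds if the row is still undelivered. That second path is how an expired lease turns into a redelivery.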

Domain and integration together

The domain plane is always in-memory and inline. When the outbox owns the integration plane, register the domain plane on its own with AddInMemoryDomainEventBus() (from Elarion) — no in-memory integration pump, so only the outbox delivers Plane B:

builder.Services.AddInMemoryDomainEventBus();          // domain plane + registry
builder.Services.AddElarionOutbox<BillingDbContext>(); // integration plane (durable)

If you only publish integration events, you don't need the in-memory tier at all — the outbox and the consumer descriptors are enough.

Subscribing without the generator (advanced)

[ConsumeEvent] is the normal path, but the runtime ultimately subscribes to whatever EventSubscriptionDescriptors are registered in DI; the generator simply emits those registrations. The descriptor type is public (Elarion.Abstractions.Messaging), so you can register one by hand for tests or advanced host wiring. It is the same unified shape for both planes: Plane selects domain vs integration, and the delegate you set selects the role (ResponseType plus InvokeRequestAsync for a responder, InvokeAsync for a fan-out subscriber), exactly as in Declaring an event and a consumer:

using Elarion.Abstractions.Messaging;
using Microsoft.Extensions.DependencyInjection;

builder.Services.AddScoped<InvoiceNotifications>();
builder.Services.AddSingleton(new EventSubscriptionDescriptor {
    EventType   = typeof(InvoiceCreated),
    Plane       = EventPlane.Integration,          // or EventPlane.Domain — same descriptor, either plane
    ServiceType = typeof(InvoiceNotifications),
    InvokeAsync = static (sp, evt, ctx, ct) =>
        sp.GetRequiredService<InvoiceNotifications>()
          .OnInvoiceCreated((InvoiceCreated)evt, ct),
});

This is composition-time registration. The EventSubscriptionRegistry is built once from the registered descriptors and then frozen, so the set of consumers is fixed when the container is built. A hand-registered descriptor is also always active — module feature-gating happens at the generated Add{Module}EventConsumers call site, not in the descriptor itself.

No dynamic, post-startup subscription today. You cannot add or remove consumers at runtime after the host is built — the frozen registry is what keeps dispatch reflection-free and trimming/AOT-safe. Planned/under consideration: an opt-in dynamic-subscription seam for plugin-style scenarios that can tolerate the reflection/AOT trade-off. If you need it, open an issue describing the use case.

Handling duplicates

Delivery is at-least-once: a consumer can see the same message more than once. Typical causes are a worker crash after the consumer ran but before the message was finalized, a fan-out consumer that threw and so caused the whole message to be retried, or a lease that expired while multiple instances were running. There is no exactly-once delivery; you build an exactly-once effect by making the consumer idempotent. Every redelivery carries the same IEventContext.CorrelationId, so that id is your stable deduplication key.

Pick the cheapest mechanism the side effect allows — an inbox table is the last resort, not the default.

1. Make the operation idempotent (prefer this). A conditional state transition or an upsert is self-deduplicating — running it twice is the same as once, with no extra bookkeeping:

[ConsumeEvent]
public async ValueTask OnInvoicePaid(InvoicePaid e, CancellationToken ct) {
    // "Mark paid only if not already paid" — a second delivery updates zero rows.
    await db.Invoices
        .Where(i => i.Id == e.InvoiceId && i.Status != InvoiceStatus.Paid)
        .ExecuteUpdateAsync(s => s.SetProperty(i => i.Status, InvoiceStatus.Paid), ct);
}

2. Delegate to the downstream's idempotency key. External systems that charge cards or send mail take a dedup key; pass the correlation id and let them collapse duplicates. Declare an IEventContext to receive it:

[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    await payments.ChargeAsync(
        e.InvoiceId, idempotencyKey: context.CorrelationId.ToString("N"), ct);
}

3. Guard with a local dedup table — a lightweight inbox. Only when the effect is neither idempotent nor has a downstream key: record the handled id in the same transaction as the consumer's write. Because the consumer runs in-process on your DbContext, this is a unique-constrained insert, not a message broker's inbox machinery:

[ConsumeEvent]
public async ValueTask OnInvoiceCreated(
    InvoiceCreated e, IEventContext context, CancellationToken ct) {
    db.ProcessedEvents.Add(new ProcessedEvent {
        Consumer = nameof(InvoiceNotifications),
        CorrelationId = context.CorrelationId,   // unique index on (Consumer, CorrelationId)
    });
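    // Must only stage changes on this same DbContext, so that a rejected
    // SaveChanges below discards the effect together with the dedup row.
    DoTheSideEffect(e);
    try {
        await db.SaveChangesAsync(ct);
    }
    catch (DbUpdateException) {
        // Assumed here to be the unique constraint rejecting a duplicate: already processed,
        // so let the message finalize. Rethrow any other cause after inspecting the
        // provider-specific inner exception.
    }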
}

The case where a guard earns its keep: when a message has several fan-out consumers and one throws, the whole message is retried, so consumers that already succeeded run again. Idempotent operations (1) absorb this for free; a consumer that genuinely cannot be made idempotent should keep its own guard keyed by (consumer, CorrelationId) so it skips work it already did.
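The fan-out retry case can be sketched with an in-memory guard keyed by (consumer, CorrelationId). This only illustrates the skip logic: ProcessedGuard and RunOnce are hypothetical names, and a real guard would be the unique-indexed table described above, written in the same transaction as the consumer's effect.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory guard; a durable one would be a table
// with a unique index on (Consumer, CorrelationId).
public sealed class ProcessedGuard {
    private readonly HashSet<(string Consumer, Guid CorrelationId)> _seen = new();

    // Runs the effect only the first time this consumer sees this correlation id.
    // Returns false when the delivery was recognised as a duplicate and skipped.
    public bool RunOnce(string consumer, Guid correlationId, Action effect) {
        if (!_seen.Add((consumer, correlationId))) return false;
        try {
            effect();
            return true;
        }
        catch {
            _seen.Remove((consumer, correlationId));   // effect failed: allow the retry to run it
            throw;
        }
    }
}
```

If a sibling consumer throws and the message is retried, a consumer wrapped this way skips the second delivery, while a different consumer name with the same correlation id still runs once.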

Module gating

When the host opts into [GenerateModuleBootstrapper], event consumers are associated with their owning module and registered through that module's generated ConfigureDefaultServices, gated by Modules:{Name}:Enabled. So disabling a module also stops its events from being consumed — no separate wiring, and consistent with how handlers, services, validators, and scheduled jobs gate. A consumer whose namespace falls under no module is reported (ELEVT003) so it is never silently dropped.

What to remember

  • Pick the plane by transaction relationship: domain when it must be part of the command, integration when it should happen reliably after the command.
  • Integration delivery is at-least-once and unordered — make consumers idempotent; see Handling duplicates.
  • With the outbox, publish before SaveChanges so the event commits with your data.
