Intercept and transform Ecto repository operations using a middleware pipeline pattern.
EctoMiddleware provides a clean, composable way to add cross-cutting concerns to your Ecto operations. Inspired by Absinthe's middleware and Plug, it allows you to transform data before and after database operations, or replace operations entirely.
- Transform data before it reaches the database (normalization, validation, enrichment)
- Transform data after database operations (logging, notifications, caching)
- Replace operations entirely (soft deletes, read-through caching, authorization)
- Halt execution with authorization checks or validation failures
- Telemetry integration for observability
- Backwards compatible with v1.x middleware
Add ecto_middleware to your dependencies in mix.exs:
    def deps do
      [
        {:ecto_middleware, "~> 2.0"}
      ]
    end

    defmodule MyApp.Repo do
      use Ecto.Repo,
        otp_app: :my_app,
        adapter: Ecto.Adapters.Postgres

      use EctoMiddleware.Repo

      # Define middleware for specific operations
      @impl EctoMiddleware.Repo
      def middleware(action, resource) when is_insert(action, resource) do
        [NormalizeEmail, HashPassword, AuditLog]
      end

      def middleware(action, resource) when is_delete(action, resource) do
        [SoftDelete, AuditLog]
      end

      def middleware(_action, _resource) do
        [AuditLog]
      end
    end

    defmodule NormalizeEmail do
      use EctoMiddleware

      @impl EctoMiddleware
      def process_before(changeset, _resolution) do
        case Ecto.Changeset.fetch_change(changeset, :email) do
          {:ok, email} ->
            {:cont, Ecto.Changeset.put_change(changeset, :email, String.downcase(email))}

          :error ->
            {:cont, changeset}
        end
      end
    end

    # Middleware runs automatically!
    %User{}
    |> User.changeset(%{name: "Alice", email: "ALICE@EXAMPLE.COM"})
    |> MyApp.Repo.insert()
    # => Email is normalized to "alice@example.com" before insertion
Implement process_before/2 to transform data before the database operation:
    defmodule AddTimestamp do
      use EctoMiddleware

      @impl EctoMiddleware
      def process_before(changeset, _resolution) do
        {:cont, Ecto.Changeset.put_change(changeset, :processed_at, DateTime.utc_now())}
      end
    end
Implement process_after/2 to process data after the database operation:
    defmodule NotifyAdmin do
      use EctoMiddleware

      @impl EctoMiddleware
      def process_after({:ok, user} = result, _resolution) do
        Task.start(fn -> send_notification(user) end)
        {:cont, result}
      end

      def process_after(result, _resolution) do
        {:cont, result}
      end
    end
Implement process/2 to wrap around the entire operation:
    defmodule MeasureLatency do
      use EctoMiddleware
      require Logger

      @impl EctoMiddleware
      def process(resource, resolution) do
        # Request nanoseconds explicitly; the default unit is :native
        start_time = System.monotonic_time(:nanosecond)

        # Yields control to the next middleware (or Repo operation) in the chain
        # before resuming here.
        {result, _updated_resolution} = yield(resource, resolution)

        duration = System.monotonic_time(:nanosecond) - start_time
        Logger.info("#{resolution.action} took #{duration}ns")

        result
      end
    end
Important: When using process/2, you must call yield/2 to continue the middleware chain.
Return {:halt, value} to stop the middleware chain and return a value immediately:
    defmodule RequireAuth do
      use EctoMiddleware

      @impl EctoMiddleware
      def process(resource, resolution) do
        if authorized?(resolution) do
          {result, _} = yield(resource, resolution)
          result
        else
          {:halt, {:error, :unauthorized}}
        end
      end

      defp authorized?(resolution) do
        get_private(resolution, :current_user) != nil
      end
    end
EctoMiddleware provides guards to detect operations, especially useful for insert_or_update:
    defmodule ConditionalMiddleware do
      use EctoMiddleware

      @impl EctoMiddleware
      def process_before(changeset, resolution) when is_insert(changeset, resolution) do
        {:cont, add_created_metadata(changeset)}
      end

      def process_before(changeset, resolution) when is_update(changeset, resolution) do
        {:cont, add_updated_metadata(changeset)}
      end

      def process_before(changeset, _resolution) do
        {:cont, changeset}
      end
    end
Returning bare values from middleware is supported, but to be explicit, return one of:
- {:cont, value} - Continue to next middleware
- {:halt, value} - Stop execution, return value
- {:cont, value, updated_resolution} - Continue with updated resolution
- {:halt, value, updated_resolution} - Stop with updated resolution
Bare values are always treated as {:cont, value}.
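To make the return-value rules concrete, here is a toy model in plain Elixir. It is a sketch only, not EctoMiddleware's implementation: `PipelineSketch` and the step functions are hypothetical names, each step is an anonymous function standing in for a middleware callback, and the three-element tuples that carry an updated resolution are left out for brevity.

```elixir
defmodule PipelineSketch do
  @moduledoc "Toy model of :cont / :halt handling. Not part of EctoMiddleware."

  # Runs `value` through `steps` in order. A step may return {:cont, value},
  # {:halt, value}, or a bare value, which is treated as {:cont, value}.
  def run(value, steps) do
    Enum.reduce_while(steps, value, fn step, acc ->
      case step.(acc) do
        {:cont, next} -> {:cont, next}
        {:halt, final} -> {:halt, final}
        bare -> {:cont, bare}
      end
    end)
  end
end

# Returns a bare value, so the pipeline continues with the trimmed string.
trim = fn email -> String.trim(email) end

# Halts the pipeline on blank input; later steps never run.
reject_blank = fn
  "" -> {:halt, {:error, :blank_email}}
  email -> {:cont, email}
end

downcase = fn email -> {:cont, String.downcase(email)} end

PipelineSketch.run("  ALICE@EXAMPLE.COM ", [trim, reject_blank, downcase])
# => "alice@example.com"

PipelineSketch.run("   ", [trim, reject_blank, downcase])
# => {:error, :blank_email}
```

In the second call, `downcase` is never invoked because `reject_blank` halts first, which matches the "stop execution, return value" behaviour described above.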
EctoMiddleware emits telemetry events for observability:
- [:ecto_middleware, :pipeline, :start] - Pipeline execution starts
  - Measurements: %{system_time: integer()}
  - Metadata: %{repo: module(), action: atom(), pipeline_id: reference()}
- [:ecto_middleware, :pipeline, :stop] - Pipeline execution completes
  - Measurements: %{duration: integer()}
  - Metadata: %{repo: module(), action: atom()}
- [:ecto_middleware, :pipeline, :exception] - Pipeline execution fails
  - Measurements: %{duration: integer()}
  - Metadata: %{repo: module(), action: atom(), kind: atom(), reason: term()}
- [:ecto_middleware, :middleware, :start] - Individual middleware starts
  - Measurements: %{system_time: integer()}
  - Metadata: %{middleware: module(), pipeline_id: reference()}
- [:ecto_middleware, :middleware, :stop] - Individual middleware completes
  - Measurements: %{duration: integer()}
  - Metadata: %{middleware: module(), result: :cont | :halt}
- [:ecto_middleware, :middleware, :exception] - Individual middleware fails
  - Measurements: %{duration: integer()}
  - Metadata: %{middleware: module(), kind: atom(), reason: term()}
Example handler:
    require Logger

    :telemetry.attach(
      "log-slow-middleware",
      [:ecto_middleware, :middleware, :stop],
      fn _event, %{duration: duration}, %{middleware: middleware}, _config ->
        # 1ms threshold, taking the reported duration to be in nanoseconds
        if duration > 1_000_000 do
          Logger.warning("Slow middleware: #{inspect(middleware)} took #{duration}ns")
        end
      end,
      nil
    )
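For longer-lived handlers, `:telemetry` recommends a named function capture over an anonymous function. The sketch below attaches one handler to two of the pipeline events listed above using `:telemetry.attach_many/4`. The module name `MyApp.MiddlewareTelemetry` and the handler id are hypothetical; the event names and metadata keys are taken from the list above, and the duration is logged without a unit because it is assumed here to be reported as-is by the library.

```elixir
defmodule MyApp.MiddlewareTelemetry do
  @moduledoc "Hypothetical handler module; call attach/0 once at application start."
  require Logger

  @events [
    [:ecto_middleware, :pipeline, :stop],
    [:ecto_middleware, :pipeline, :exception]
  ]

  # Registers handle_event/4 for every event in @events.
  def attach do
    :telemetry.attach_many("my-app-ecto-middleware", @events, &__MODULE__.handle_event/4, nil)
  end

  # Pipeline finished normally.
  def handle_event([:ecto_middleware, :pipeline, :stop], %{duration: duration}, metadata, _config) do
    %{repo: repo, action: action} = metadata
    Logger.debug("#{inspect(repo)} #{action} pipeline finished (duration: #{duration})")
  end

  # Pipeline raised, threw, or exited.
  def handle_event([:ecto_middleware, :pipeline, :exception], _measurements, metadata, _config) do
    %{repo: repo, action: action, kind: kind, reason: reason} = metadata
    Logger.error("#{inspect(repo)} #{action} pipeline failed: #{kind} #{inspect(reason)}")
  end
end
```

Calling `MyApp.MiddlewareTelemetry.attach()` from the application's `start/2` callback is one common place to register it.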
V1 middleware continues to work but emits deprecation warnings. See the Migration Guide for details.
Key differences:
- Use EctoMiddleware.Repo instead of EctoMiddleware in Repo modules
- Implement process_before/2, process_after/2, or process/2 instead of middleware/2
- No need for the EctoMiddleware.Super marker
- Return {:cont, value} or {:halt, value} instead of bare values
During migration, you can silence warnings temporarily:
    # In config/config.exs
    config :ecto_middleware, :silence_deprecation_warnings, true
This should only be used during migration - deprecated APIs will be removed in v3.0.
EctoHooks - Adds before_* and after_* callbacks to Ecto schemas, similar to the old Ecto.Model callbacks. Implemented entirely using EctoMiddleware.
See the implementation for real-world middleware examples.
Contributions are welcome! Please feel free to submit a Pull Request.
MIT License. See LICENSE for details.