PostgreSQL Destination
Audience: application developers who want to write pipeline output or dead-letter records into PostgreSQL.
What This Covers
This guide shows the shape of the PostgreSQL transport as it exists today:
- PostgreSQL destination writes
- PostgreSQL dead-letter writes
- consumer-owned schema and table mapping
- custom SQL through Dapper-backed execution
Current Scope
The first PostgreSQL transport implementation is destination-only.
It supports:
- WithPostgreSqlDestination(...)
- WithPostgreSqlDeadLetterDestination(...)
It does not currently support a PostgreSQL source.
Package
The PostgreSQL transport lives in:
src/Pipelinez.PostgreSql
If you are working from source, reference that project directly or pack it locally with the repository packaging workflow.
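For example, a project reference or a local pack can be wired up like this (a sketch: `MyApp/MyApp.csproj`, the `.csproj` filename under `src/Pipelinez.PostgreSql`, and the package id are assumptions based on the repository layout above):

```shell
# Reference the transport project directly from a consuming project
dotnet add MyApp/MyApp.csproj reference src/Pipelinez.PostgreSql/Pipelinez.PostgreSql.csproj

# Or pack it locally and consume the resulting .nupkg
dotnet pack src/Pipelinez.PostgreSql -o ./local-packages
dotnet add MyApp/MyApp.csproj package Pipelinez.PostgreSql --source ./local-packages
```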
Minimal Destination Example
using Pipelinez.Core;
using Pipelinez.Core.Record;
using Pipelinez.PostgreSql;
using Pipelinez.PostgreSql.Configuration;
using Pipelinez.PostgreSql.Mapping;
var pipeline = Pipeline<OrderRecord>.New("orders")
    .WithInMemorySource(new object())
    .WithPostgreSqlDestination(
        new PostgreSqlDestinationOptions
        {
            ConnectionString = "Host=localhost;Database=pipelinez;Username=postgres;Password=postgres"
        },
        PostgreSqlTableMap<OrderRecord>.ForTable("app", "processed_orders")
            .Map("order_id", record => record.Id)
            .MapJson("payload", record => record)
            .Map("processed_at_utc", _ => DateTimeOffset.UtcNow))
    .Build();

public sealed class OrderRecord : PipelineRecord
{
    public required string Id { get; init; }
}
Custom SQL Example
// Also requires: using System.Text.Json; (for JsonSerializer)
var pipeline = Pipeline<OrderRecord>.New("orders")
    .WithInMemorySource(new object())
    .WithPostgreSqlDestination(
        new PostgreSqlDestinationOptions
        {
            ConnectionString = "Host=localhost;Database=pipelinez;Username=postgres;Password=postgres"
        },
        record => new PostgreSqlCommandDefinition(
            """
            INSERT INTO app.processed_orders (order_id, payload)
            VALUES (@order_id, @payload::jsonb)
            """,
            new
            {
                order_id = record.Id,
                payload = JsonSerializer.Serialize(record)
            }))
    .Build();
Dead-Letter Mapping Example
// destinationOptions and deadLetterOptions are options instances
// configured with a connection, as in the earlier examples.
var pipeline = Pipeline<OrderRecord>.New("orders")
    .WithInMemorySource(new object())
    .WithPostgreSqlDestination(
        destinationOptions,
        PostgreSqlTableMap<OrderRecord>.ForTable("app", "processed_orders")
            .Map("order_id", record => record.Id)
            .MapJson("payload", record => record))
    .WithPostgreSqlDeadLetterDestination(
        deadLetterOptions,
        PostgreSqlTableMap<PipelineDeadLetterRecord<OrderRecord>>.ForTable("audit", "order_failures")
            .Map("order_id", deadLetter => deadLetter.Record.Id)
            .Map("fault_component", deadLetter => deadLetter.Fault.ComponentName)
            .MapJson("record_json", deadLetter => deadLetter.Record)
            .Map("dead_lettered_at_utc", deadLetter => deadLetter.DeadLetteredAtUtc))
    .WithErrorHandler(_ => PipelineErrorAction.DeadLetter)
    .Build();
Connection Configuration
The PostgreSQL transport supports:
- ConnectionString
- ConfigureConnectionString
- ConfigureDataSource
- an externally supplied NpgsqlDataSource
That gives consumers full control over pooling, timeouts, type mappings, and driver configuration without forcing Pipelinez to own connection setup.
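As a sketch of the externally supplied data-source path (NpgsqlDataSourceBuilder and EnableDynamicJson are standard Npgsql API; the DataSource property name on the options object is an assumption standing in for the option listed above):

```csharp
using Npgsql;
using Pipelinez.PostgreSql.Configuration;

// Build the data source externally so the application owns pooling,
// timeouts, and type mappings via standard Npgsql configuration.
var dataSourceBuilder = new NpgsqlDataSourceBuilder(
    "Host=localhost;Database=pipelinez;Username=postgres;Password=postgres;Maximum Pool Size=20");
dataSourceBuilder.EnableDynamicJson(); // driver-level JSON handling
await using var dataSource = dataSourceBuilder.Build();

// Hand the prepared data source to the transport. The property name
// here is hypothetical; it illustrates the "externally supplied
// NpgsqlDataSource" option from the list above.
var options = new PostgreSqlDestinationOptions
{
    DataSource = dataSource
};
```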
Next Steps
- read PostgreSQL Transport
- read Architecture: PostgreSQL
- read Dead-Lettering