In-Memory Pipeline
Audience: application developers evaluating Pipelinez for the first time.
What This Covers
This guide walks through the smallest useful Pipelinez setup:
- define a record
- add a segment
- build an in-memory pipeline
- start it
- publish a record
- complete it cleanly
Prerequisites
- .NET 8 SDK
- either:
  - a consumer project referencing the Pipelinez package
  - or a local clone of this repository
Package install command:
dotnet add package Pipelinez
Build The Solution
If you are working from a local clone, run this from the repository root:
dotnet build src/Pipelinez.sln
Minimal Example
using Pipelinez.Core;
using Pipelinez.Core.Record;
using Pipelinez.Core.Segment;
public sealed class OrderRecord : PipelineRecord
{
    public required string Id { get; init; }
    public required decimal Total { get; set; }
}
public sealed class NormalizeOrderSegment : PipelineSegment<OrderRecord>
{
    public override Task<OrderRecord> ExecuteAsync(OrderRecord record)
    {
        record.Total = decimal.Round(record.Total, 2);
        return Task.FromResult(record);
    }
}
var pipeline = Pipeline<OrderRecord>.New("orders")
    .WithInMemorySource(new object())
    .AddSegment(new NormalizeOrderSegment(), new object())
    .WithInMemoryDestination("in-memory")
    .Build();
pipeline.OnPipelineRecordCompleted += (_, args) =>
{
    Console.WriteLine($"{args.Record.Id} completed with total {args.Record.Total}");
};
await pipeline.StartPipelineAsync();
await pipeline.PublishAsync(new OrderRecord { Id = "A-100", Total = 42.155m });
await pipeline.CompleteAsync();
await pipeline.Completion;
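A note on the rounding call in NormalizeOrderSegment: decimal.Round(value, 2) uses banker's rounding (MidpointRounding.ToEven) by default, which can surprise readers who expect midpoints to always round up. The sketch below is independent of Pipelinez and only uses the .NET base library. The class and method names (RoundingSketch, RoundDefault, RoundAwayFromZero) are hypothetical and exist only for illustration.

```csharp
using System;

public static class RoundingSketch
{
    // Same call the segment makes. With no mode argument,
    // .NET rounds midpoints to the nearest even digit (ToEven).
    public static decimal RoundDefault(decimal total) =>
        decimal.Round(total, 2);

    // Explicit alternative: midpoints always round away from zero.
    public static decimal RoundAwayFromZero(decimal total) =>
        decimal.Round(total, 2, MidpointRounding.AwayFromZero);
}
```

With these helpers, RoundDefault(42.155m) gives 42.16 and RoundDefault(42.165m) also gives 42.16, while RoundAwayFromZero(42.165m) gives 42.17. If order totals should follow the second convention, passing the mode explicitly inside the segment is one option.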
What Happens
- Pipeline<T>.New("orders") creates a PipelineBuilder<T>.
- WithInMemorySource(...) adds a manual source that accepts PublishAsync(...).
- AddSegment(...) inserts a transform step.
- WithInMemoryDestination(...) adds a sink.
- Build() validates the pipeline and links the runtime graph.
- StartPipelineAsync() activates the runtime.
- PublishAsync(...) sends a record into the source.
- CompleteAsync() stops accepting new work and lets in-flight records finish.
- Completion completes when the full pipeline run is done.
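To build intuition for the start, publish, complete, and await-completion sequence, here is a minimal conceptual model built on System.Threading.Channels. It is not how Pipelinez is implemented and it does not use the Pipelinez API. The LifecycleSketch class and its RunAsync method are hypothetical names chosen for this illustration. The channel stands in for the in-memory source, the rounding step for the segment, and the list for the destination.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class LifecycleSketch
{
    // Publishes every input, signals completion, then waits for the
    // consumer loop to drain before returning the processed values.
    public static async Task<List<decimal>> RunAsync(IEnumerable<decimal> totals)
    {
        var channel = Channel.CreateUnbounded<decimal>(); // stand-in for the source
        var completed = new List<decimal>();              // stand-in for the destination

        // "Start": a consumer loop that transforms and stores each record.
        Task completion = Task.Run(async () =>
        {
            await foreach (var total in channel.Reader.ReadAllAsync())
            {
                completed.Add(decimal.Round(total, 2));   // stand-in for the segment
            }
        });

        // "Publish": write records while the channel is still open.
        foreach (var total in totals)
        {
            await channel.Writer.WriteAsync(total);
        }

        // "Complete": no further writes are accepted, but items that
        // are already buffered still flow to the consumer.
        channel.Writer.Complete();

        // "Completion": finishes only after the consumer has drained everything.
        await completion;
        return completed;
    }
}
```

The point of the model is the ordering: marking the writer complete does not discard buffered work, and only awaiting the consumer task guarantees that every record has reached the end.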
Important Behaviors
- PublishAsync(...) before StartPipelineAsync() throws.
- CompleteAsync() before StartPipelineAsync() throws.
- calling StartPipelineAsync() twice throws.
- awaiting Completion is the safest way to know all downstream work is done.
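The ordering rules above can be pictured as a small state machine. The sketch below is a hypothetical model, not Pipelinez code: the LifecycleGuard class, its state names, and the use of InvalidOperationException are all assumptions made for illustration, since this guide does not state which exception type Pipelinez throws. Rejecting a publish after completion is also an assumption, inferred from "stops accepting new work".

```csharp
using System;

// Hypothetical model of the documented ordering rules.
public sealed class LifecycleGuard
{
    private enum State { NotStarted, Running, Completing }

    private State _state = State.NotStarted;

    public void Start()
    {
        // Documented rule: starting twice throws.
        if (_state != State.NotStarted)
            throw new InvalidOperationException("Pipeline was already started.");
        _state = State.Running;
    }

    public void Publish()
    {
        // Documented rule: publishing before start throws.
        if (_state == State.NotStarted)
            throw new InvalidOperationException("Start the pipeline before publishing.");
        // Assumption: new work is rejected once completion was requested.
        if (_state == State.Completing)
            throw new InvalidOperationException("Pipeline is no longer accepting records.");
    }

    public void Complete()
    {
        // Documented rule: completing before start throws.
        if (_state == State.NotStarted)
            throw new InvalidOperationException("Start the pipeline before completing.");
        _state = State.Completing;
    }
}
```

In application code, the practical consequence is to keep the calls in the order shown in the minimal example: start, publish, complete, then await Completion.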
Next Steps
- read Lifecycle
- read Error Handling
- read Performance
- read Kafka Pipeline when you want to move beyond in-memory flow