Flow Control
Audience: application developers publishing records manually into a busy pipeline.
What This Covers
- pipeline saturation behavior
PipelineFlowControlOptionsPipelinePublishOptionsPipelinePublishResult
Configure Flow Control
using Pipelinez.Core.FlowControl;
using Pipelinez.Core.Performance;
var pipeline = Pipeline<MyRecord>.New("orders")
.UsePerformanceOptions(new PipelinePerformanceOptions
{
SourceExecution = new PipelineExecutionOptions { BoundedCapacity = 100 },
DestinationExecution = new PipelineExecutionOptions { BoundedCapacity = 100 }
})
.UseFlowControlOptions(new PipelineFlowControlOptions
{
OverflowPolicy = PipelineOverflowPolicy.Wait,
PublishTimeout = TimeSpan.FromSeconds(5),
SaturationWarningThreshold = 0.8
})
.WithInMemorySource(new object())
.WithInMemoryDestination("config")
.Build();
Per-Publish Override
var result = await pipeline.PublishAsync(
new MyRecord(),
new PipelinePublishOptions
{
Timeout = TimeSpan.FromSeconds(2),
OverflowPolicyOverride = PipelineOverflowPolicy.Reject
});
if (!result.Accepted)
{
Console.WriteLine(result.Reason);
}
Overflow Policies
Waitwait for capacityRejectreject the publish immediately when saturatedCancelallow a cancellation token to end the wait
Observability
Flow-control signals are exposed through:
GetStatus().FlowControlStatusOnSaturationChangedOnPublishRejected- publish wait and rejection counters in
GetPerformanceSnapshot()