Conduit is a high-performance, low-latency reactive framework built on top of the LMAX Disruptor. It provides a type-safe, fluent API for building event-driven data processing pipelines that can handle multiple input sources with different data types.
Architecture
Core Components
1. Dispatcher
The Dispatcher interface is responsible for dispatching events to registered listeners.
publicclassFinancialProcessorextendsDisruptorNode3<Price,Rate,Volume>{privatedoublecurrentPrice;privatedoublecurrentRate;privatelongcurrentVolume;@OverrideprotectedvoidonEvent1(Priceprice){currentPrice=price.getValue();calculateMetrics();}@OverrideprotectedvoidonEvent2(Raterate){currentRate=rate.getValue();calculateMetrics();}@OverrideprotectedvoidonEvent3(Volumevolume){currentVolume=volume.getAmount();calculateMetrics();}privatevoidcalculateMetrics(){// Combine all inputs to calculate trading metricsdoublemetric=currentPrice*currentRate*currentVolume;System.out.println("Calculated metric: "+metric);}}// SetupEventDispatcher<Price>priceDispatcher=EventDispatcher.create();EventDispatcher<Rate>rateDispatcher=EventDispatcher.create();EventDispatcher<Volume>volumeDispatcher=EventDispatcher.create();FinancialProcessorprocessor=newFinancialProcessor();processor.subscribe1(priceDispatcher).subscribe2(rateDispatcher).subscribe3(volumeDispatcher);processor.start();
Node Types Comparison
Plain Node (Node2)
Characteristics:
- Each event handler runs on the dispatcher's thread
- If two dispatchers call from different threads, handlers can run concurrently
- No built-in synchronization
Best for:
- Independent event handlers with no shared state
- Maximum parallelism across multiple CPU cores
- CPU-bound processing where each handler can run independently
Disruptor Node (DisruptorNode2)
Characteristics:
- All events processed in a single thread (lock-free)
- Events from all sources go through the Disruptor ring buffer
- Guaranteed ordering and no race conditions
Best for:
- Handlers that share state
- Low-latency requirements
- Need to coordinate or order events from different sources
- Lightweight, fast processing
Fluent API
The Conduit framework provides a fluent API for easy pipeline construction:
publicclassCustomNodeextendsDisruptorNode2<Integer,String>{publicCustomNode(){super(1024,// Ring buffer size (must be power of 2)newBusySpinWaitStrategy()// Wait strategy);}// ... event handlers}
Wait Strategies
Choose based on your latency vs. CPU usage trade-off:
Strategy
Latency
CPU Usage
Use Case
BusySpinWaitStrategy
Lowest
Highest
Ultra-low latency trading
YieldingWaitStrategy
Low
High
Low latency, some CPU yield
SleepingWaitStrategy
Medium
Medium
Balanced applications
BlockingWaitStrategy
Highest
Lowest
CPU-conscious applications
Advanced Patterns
Pipeline Chaining
Create complex processing pipelines by chaining nodes:
// Stage 1: Data ingestionDisruptorNode2<RawData1,RawData2>ingestionNode=newIngestionNode();// Stage 2: Data enrichmentEventDispatcher<EnrichedData>enrichedDispatcher=EventDispatcher.create();DisruptorNode1<EnrichedData>enrichmentNode=newEnrichmentNode();// Stage 3: AnalyticsDisruptorNode1<AnalyticsResult>analyticsNode=newAnalyticsNode();// Connect the pipelineingestionNode.subscribe1(rawData1Dispatcher).subscribe2(rawData2Dispatcher);ingestionNode.start();enrichmentNode.subscribe1(enrichedDispatcher);enrichmentNode.start();analyticsNode.subscribe1(analyticsDispatcher);analyticsNode.start();
EventDispatcher<MarketData>marketDataDispatcher=EventDispatcher.create();// Multiple nodes subscribe to the same dispatcherDisruptorNode1<MarketData>riskNode=newRiskCalculationNode();DisruptorNode1<MarketData>analyticsNode=newAnalyticsNode();DisruptorNode1<MarketData>auditNode=newAuditNode();riskNode.subscribe1(marketDataDispatcher).start();analyticsNode.subscribe1(marketDataDispatcher).start();auditNode.subscribe1(marketDataDispatcher).start();
publicclassRobustNodeextendsDisruptorNode2<Integer,String>{@OverrideprotectedvoidonEvent1(Integerevent){try{// Process eventprocessInteger(event);}catch(Exceptione){handleError(e,event);}}privatevoidhandleError(Exceptione,Objectevent){// Log error, send to dead letter queue, etc.logger.error("Error processing event: "+event,e);}}
Best Practices
Choose the Right Node Type
Use DisruptorNode for low-latency, ordered processing
Use plain Node for independent, parallel processing
Keep Event Handlers Fast
Avoid blocking operations (I/O, network calls)
Offload heavy computations to worker threads if needed
Pre-allocate Resources
Initialize objects in constructor, not in event handlers