11package org .reactivecommons .async .rabbit .listeners ;
22
33import com .rabbitmq .client .AMQP ;
4- import com .rabbitmq .client .Channel ;
5- import com .rabbitmq .client .ShutdownSignalException ;
64import lombok .extern .java .Log ;
75import org .reactivecommons .async .commons .DiscardNotifier ;
86import org .reactivecommons .async .commons .FallbackStrategy ;
1210import org .reactivecommons .async .rabbit .RabbitMessage ;
1311import org .reactivecommons .async .rabbit .communications .ReactiveMessageListener ;
1412import org .reactivecommons .async .rabbit .communications .TopologyCreator ;
15- import reactor .core .Disposable ;
1613import reactor .core .publisher .Flux ;
1714import reactor .core .publisher .Mono ;
1815import reactor .core .scheduler .Scheduler ;
2825import java .util .List ;
2926import java .util .Optional ;
3027import java .util .concurrent .ConcurrentHashMap ;
31- import java .util .concurrent .atomic .AtomicReference ;
3228import java .util .function .Function ;
3329import java .util .logging .Level ;
3430
@@ -53,8 +49,7 @@ public abstract class GenericMessageListener {
5349 private final DiscardNotifier discardNotifier ;
5450 private final String objectType ;
5551 private final CustomReporter customReporter ;
56- private final AtomicReference <Channel > channelRef = new AtomicReference <>();
57- private Disposable listenerSubscription ;
52+ private volatile Flux <AcknowledgableDelivery > messageFlux ;
5853
5954 public GenericMessageListener (String queueName , ReactiveMessageListener listener , boolean useDLQRetries ,
6055 boolean createTopology , long maxRetries , long retryDelay , DiscardNotifier discardNotifier ,
@@ -83,51 +78,43 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
8378 return Mono .empty ();
8479 }
8580
86- public synchronized void startListener () {
87- Channel current = channelRef .get ();
88- if (current != null && current .isOpen ()) {
89- log .warning ("Channel is already open, no need to restart listener" );
90- return ;
91- }
92- stopListener ();
81+ public void startListener () {
9382 log .log (Level .INFO , "Using max concurrency {0}, in queue: {1}" , new Object []{messageListener .getMaxConcurrency (), queueName });
9483 if (useDLQRetries ) {
9584 log .log (Level .INFO , "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!" , new Object []{maxRetries });
9685 } else {
9786 log .log (Level .INFO , "ATTENTION! Using infinite fast retries as Retry Strategy" );
9887 }
99- var baseSubscriber = new LoggerSubscriber <>(getClass ().getName ());
100- listenerSubscription = baseSubscriber ;
101- Flux .defer (this ::buildConsumeFlux )
102- .subscribe (baseSubscriber );
103- }
10488
105-
106- private Flux <AcknowledgableDelivery > buildConsumeFlux () {
107- ConsumeOptions options = new ConsumeOptions ()
89+ ConsumeOptions consumeOptions = new ConsumeOptions ()
10890 .qos (messageListener .getPrefetchCount ())
109- .channelCallback (channel -> {
110- channelRef .set (channel );
111- channel .addShutdownListener (this ::onChannelShutdown );
112- });
113-
114- Flux <AcknowledgableDelivery > source = createTopology
115- ? setUpBindings (messageListener .getTopologyCreator ())
116- .thenMany (receiver .consumeManualAck (queueName , options ))
117- : receiver .consumeManualAck (queueName , options );
91+ .channelCallback (channel -> channel .addShutdownListener (cause -> {
92+ log .log (Level .WARNING , cause , () -> "Channel shutdown detected in queue " + queueName
93+ + " channel open: " + channel .isOpen () +
94+ " connection open: " + channel .getConnection ().isOpen ());
95+ if (channel .getConnection ().isOpen () && !channel .isOpen ()) {
96+ log .warning ("Recovering listener for queue: " + queueName );
97+ onTerminate ();
98+ }
99+ }));
100+
101+ if (createTopology ) {
102+ this .messageFlux = setUpBindings (messageListener .getTopologyCreator ())
103+ .thenMany (receiver .consumeManualAck (queueName , consumeOptions )
104+ .transform (this ::consumeFaultTolerant ));
105+ } else {
106+ this .messageFlux = receiver .consumeManualAck (queueName , consumeOptions )
107+ .doOnError (err -> log .log (Level .SEVERE , "Error listening queue" , err ))
108+ .transform (this ::consumeFaultTolerant );
109+ }
118110
119- return source .transform (this ::consumeFaultTolerant );
120- }
121111
122- private void onChannelShutdown (ShutdownSignalException cause ) {
123- log .log (Level .SEVERE , "Channel shutdown detected in listener" , cause );
124- startListener ();
112+ onTerminate ();
125113 }
126114
127- public synchronized void stopListener () {
128- if (listenerSubscription != null && !listenerSubscription .isDisposed ()) {
129- listenerSubscription .dispose ();
130- }
115+ private void onTerminate () {
116+ messageFlux .doOnTerminate (this ::onTerminate )
117+ .subscribe (new LoggerSubscriber <>(getClass ().getName ()));
131118 }
132119
133120 protected Mono <AcknowledgableDelivery > handle (AcknowledgableDelivery msj , Instant initTime ) {
0 commit comments