[jboss-user] [JBoss Messaging] - JMS Reconnect Issue
bfach
do-not-reply at jboss.com
Fri Oct 10 11:50:50 EDT 2008
Hi,
I am using JBoss AS 4.2.3.GA with JBoss Messaging 1.4.0.SP3. I upgraded both in the hope of fixing the following issue.
In the event that a standalone JMS client is connected to multiple JBoss AS instances via the ClusteredConnectionFactory and a server dies, the client will fail over to the remaining servers.
The problem I am trying to solve arises when there is only one server in the cluster at the time. I have found that using an exception listener works reliably if you are using ConnectionFactory but not for ClusteredConnectionFactory.
If ConnectionFactory is looked up, the connection is reestablished if the server goes down and comes back. If ClusteredConnectionFactory is used, the connection is reestablished the first time the single server dies but fails on the second attempt.
Here is the code:
/**
 * Runnable that takes objects from a blocking queue and sends each one as a
 * JMS ObjectMessage through the producer created by setUpJMS().
 *
 * NOTE(review): session and producer are written by setUpJMS(), which the
 * exception listener also calls, and are read by run(). If those calls happen
 * on different threads, these non-volatile fields have no visibility
 * guarantee - confirm the threading model.
 */
public class JMSConnection implements Runnable {
// Logger for this class, shared by all instances.
protected static Logger log = Logger.getLogger(JMSConnection.class);
// Count of sent messages; incremented in run() and read/reset by the
// periodic reporter set up in the static initializer.
protected static AtomicInteger msgcount = new AtomicInteger();
// JMS session created by setUpJMS(); used by run() to create messages.
private Session session;
// Destination looked up from JNDI by setUpJMS().
private Queue queue;
// Destination name passed to the constructor; not read in the code shown here.
private String dest;
// Source of the objects that run() sends.
private BlockingQueue bq;
// Single-thread scheduler that runs the periodic throughput report.
private final static ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
// Current JMS connection; replaced by setUpJMS() on reconnect.
private Connection connection = null;
// Static, so every constructor call overwrites the value for all instances.
protected static String sourceName; // the name of the client
// Upper bound on reconnection attempts made by the exception listener.
final static int NUM_RETRIES = 1000000000;
// Presumably the pause between reconnection attempts, in milliseconds - TODO confirm.
final static int RETRY_INTERVAL = 1000;
// Per-instance identifier (nanoTime at construction) included in the send log line.
private long id = System.nanoTime();
// Loop flag for run(); cleared by the exception listener when it gives up.
volatile boolean doSend = true;
// Connection factory looked up from JNDI by setUpJMS().
ConnectionFactory connectionFactory;
// Producer created by setUpJMS(); used by run() to send messages.
MessageProducer producer;
static {
    /* Delay, in seconds, before the first throughput report. */
    final long delay = 1;
    /* Interval, in seconds, between throughput reports. */
    final long period = 1;
    /*
     * Periodically report how many messages were sent since the previous
     * report. The report is written at INFO level, so INFO logging must be
     * enabled for it to appear (and for the counter to be reset).
     */
    scheduler.scheduleAtFixedRate(new Runnable() {
        public void run() {
            if (log.isInfoEnabled()) {
                // Read and reset in a single atomic step: a separate read
                // followed by set(0) would drop any sends counted in between.
                final int sent = msgcount.getAndSet(0);
                log.info(sourceName + ": sent " + sent + " per "
                        + period + " seconds.");
            }
        }
    }, delay, period, TimeUnit.SECONDS);
}
/**
 * Creates a sender and immediately runs the JMS initialisation.
 *
 * @param dest
 *            the name of the destination
 * @param bq
 *            the blocking queue that supplies the objects to send
 * @param m_sourceName
 *            the source name; stored in the static sourceName field
 * @throws Exception
 *             if init() propagates one
 */
public JMSConnection(String dest, BlockingQueue bq,
        String m_sourceName) throws Exception {
    // Record the caller-supplied values before touching JMS.
    sourceName = m_sourceName;
    this.bq = bq;
    this.dest = dest;

    init();
    log.info("JMS Connection");
}
/**
 * Performs the initial JMS setup.
 *
 * @throws Exception
 *             if setUpJMS() reports that the JMS resources could not be
 *             created
 */
private void init() throws Exception {
    // setUpJMS() signals failure through its return value and has already
    // logged the cause. Surface the failure to the caller instead of
    // silently returning an instance that has no usable session/producer.
    if (!setUpJMS()) {
        throw new JMSException(
                "Initial JMS setup failed; see the log for the underlying cause");
    }
}
/**
 * Looks up the connection factory and queue in JNDI, then creates a
 * connection (with an exception listener registered), a non-transacted
 * AUTO_ACKNOWLEDGE session and a NON_PERSISTENT producer.
 *
 * The connection, session and producer fields are replaced only when every
 * step succeeded, so a failed attempt never leaves them half-updated. The
 * JNDI context is closed on every path.
 *
 * @return true if all JMS resources were created; false if any step failed
 *         (the failure is logged at ERROR level)
 */
public boolean setUpJMS() {
    InitialContext ic = null;
    Connection newConnection = null;
    try {
        ic = new InitialContext();
        connectionFactory = (ConnectionFactory) ic
                .lookup("ClusteredConnectionFactory");
        queue = (Queue) ic.lookup("queue/RouteManagerQueue");

        newConnection = connectionFactory.createConnection();
        log.debug("Connection created ...");
        // KEY - register for exception callbacks
        newConnection.setExceptionListener(new ExceptionListenerImpl());

        Session newSession = newConnection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        log.debug("Session created ...");

        MessageProducer newProducer = newSession.createProducer(queue);
        newProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        log.debug("Producer created ...");

        // Everything succeeded: publish the new resources together.
        connection = newConnection;
        session = newSession;
        producer = newProducer;
        return true;
    } catch (Exception e) {
        log.error("Error setting up JMS", e);
        if (newConnection != null) {
            // We failed part-way, so release the connection we opened.
            try {
                newConnection.close();
            } catch (Exception ignored) {
                // Best-effort cleanup; the original failure is already
                // logged and this method reports errors only via its
                // return value.
            }
        }
        return false;
    } finally {
        // Close the initial context on every path (including failed
        // lookups) rather than waiting for the garbage collector,
        // otherwise we'd have useless hanging network connections.
        if (ic != null) {
            try {
                ic.close();
            } catch (javax.naming.NamingException ignored) {
                // A failure to close the JNDI context must not turn a
                // successful setup into a reported failure.
            }
        }
    }
}
/**
 * Send loop: blocks on the queue, wraps each taken object in an
 * ObjectMessage and sends it with the current producer, until doSend is
 * cleared or the thread is interrupted.
 *
 * A failure while building or sending a message is logged and the loop
 * continues with the next object; the failed object is not re-queued.
 */
public void run() {
    while (doSend) {
        try {
            // The explicit cast keeps this line valid whether the queue
            // field is declared raw or with a type argument.
            Serializable s = (Serializable) bq.take();
            ObjectMessage message = session.createObjectMessage();
            message.setObject(s);
            producer.send(message);
            msgcount.incrementAndGet();
            log.info("Sending a attribute from " + id);
        } catch (InterruptedException e) {
            // Interruption is a request to stop: restore the flag for the
            // caller and leave the loop instead of blocking on take() again.
            Thread.currentThread().interrupt();
            log.warn("Sender thread interrupted, stopping");
            return;
        } catch (Exception e) {
            log.error("Problem sending message " + e.getMessage(), e);
        }
    }
}
/**
 * Exception listener that tries to rebuild the JMS connection when the
 * provider reports a problem with it.
 *
 * Up to NUM_RETRIES attempts are made, pausing RETRY_INTERVAL (treated as
 * milliseconds) after each failed attempt. If no attempt succeeds, or the
 * retrying thread is interrupted, doSend is cleared so the send loop stops.
 */
private class ExceptionListenerImpl implements ExceptionListener {
    public void onException(JMSException e) {
        // Record what actually went wrong before starting to reconnect.
        log.warn("JMS connection reported a failure", e);
        for (int i = 0; i < NUM_RETRIES; i++) {
            log
                    .warn("Connection has problems, trying to re-create it, attempt "
                            + (i + 1) + " ...");
            try {
                connection.close(); // unregisters the ExceptionListener
            } catch (Exception e2) {
                // An exception is expected here because the connection to
                // the server is broken, but close() still frees up
                // resources associated with the connection.
            }
            if (setUpJMS()) {
                log.info("Connection re-established");
                return;
            }
            log.warn("Re-creating connection failed, retrying ...");
            // Pause between attempts; without this the loop spins at full
            // speed against a server that is still down.
            try {
                Thread.sleep(RETRY_INTERVAL);
            } catch (InterruptedException interrupted) {
                // Treat interruption as a request to stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.error("Cannot re-establish connection, giving up ...");
        doSend = false;
    }
}
I am using the remoting jar that comes with 4.2.3.GA, and the messaging-client jar that comes with 1.4.0.SP3.
Thanks in advance,
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4181558#4181558
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4181558
More information about the jboss-user
mailing list