[hornetq-commits] JBoss hornetq SVN: r10323 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Mar 12 21:22:04 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-03-12 21:22:03 -0500 (Sat, 12 Mar 2011)
New Revision: 10323
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
Resending metadata on failover/reconnection
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-11 21:33:07 UTC (rev 10322)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-13 02:22:03 UTC (rev 10323)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -109,6 +110,8 @@
// Attributes ----------------------------------------------------------------------------
+ private Map<String, String> metadata = new HashMap<String, String>();
+
private final ClientSessionFactoryInternal sessionFactory;
private final String name;
@@ -155,7 +158,7 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
-
+
private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -184,7 +187,7 @@
private final String groupID;
private volatile boolean inClose;
-
+
private volatile SimpleString defaultAddress;
// Constructors ----------------------------------------------------------------------------
@@ -262,7 +265,7 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
-
+
this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -274,7 +277,7 @@
// ClientSession implementation
// -----------------------------------------------------------------
-
+
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -649,7 +652,7 @@
{
stop(true);
}
-
+
public void stop(final boolean waitForOnMessage) throws HornetQException
{
checkClosed();
@@ -689,7 +692,7 @@
{
return minLargeMessageSize;
}
-
+
public boolean isCompressLargeMessages()
{
return compressLargeMessages;
@@ -875,197 +878,222 @@
// Needs to be synchronized to prevent issues with occurring concurrently with close()
- public synchronized void handleFailover(final CoreRemotingConnection backupConnection)
+ public void handleFailover(final CoreRemotingConnection backupConnection)
{
- if (closed)
+ synchronized (this)
{
- return;
- }
+ if (closed)
+ {
+ return;
+ }
- boolean resetCreditManager = false;
+ boolean resetCreditManager = false;
- // We lock the channel to prevent any packets to be added to the resend
- // cache during the failover process
- channel.lock();
- try
- {
- channel.transferConnection(backupConnection);
+ // We lock the channel to prevent any packets from being added to the resend
+ // cache during the failover process
+ channel.lock();
+ try
+ {
+ channel.transferConnection(backupConnection);
- backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+ backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
- remotingConnection = backupConnection;
+ remotingConnection = backupConnection;
- int lcid = channel.getLastConfirmedCommandID();
+ int lcid = channel.getLastConfirmedCommandID();
- Packet request = new ReattachSessionMessage(name, lcid);
+ Packet request = new ReattachSessionMessage(name, lcid);
- Channel channel1 = backupConnection.getChannel(1, -1);
+ Channel channel1 = backupConnection.getChannel(1, -1);
- ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+ ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
- if (response.isReattached())
- {
- // The session was found on the server - we reattached transparently ok
+ if (response.isReattached())
+ {
+ // The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastConfirmedCommandID());
- }
- else
- {
+ channel.replayCommands(response.getLastConfirmedCommandID());
+ }
+ else
+ {
- // The session wasn't found on the server - probably we're failing over onto a backup server where the
- // session won't exist or the target server has been restarted - in this case the session will need to be
- // recreated,
- // and we'll need to recreate any consumers
+ // The session wasn't found on the server - probably we're failing over onto a backup server where the
+ // session won't exist or the target server has been restarted - in this case the session will need to be
+ // recreated,
+ // and we'll need to recreate any consumers
- // It could also be that the server hasn't been restarted, but the session is currently executing close, and
- // that
- // has already been executed on the server, that's why we can't find the session- in this case we *don't*
- // want
- // to recreate the session, we just want to unblock the blocking call
- if (!inClose)
- {
- Packet createRequest = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize,
- defaultAddress == null ? null
- : defaultAddress.toString());
- boolean retry = false;
- do
+ // It could also be that the server hasn't been restarted, but the session is
+ // currently executing close, and that close has already been executed on the
+ // server - that's why we can't find the session.
+ // In this case we *don't* want to recreate the session;
+ // we just want to unblock the blocking call
+ // (done by channel.returnBlocking() below)
+ if (!inClose)
{
- try
+ Packet createRequest = new CreateSessionMessage(name,
+ channel.getID(),
+ version,
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ confirmationWindowSize,
+ defaultAddress == null ? null
+ : defaultAddress.toString());
+ boolean retry = false;
+ do
{
- channel1.sendBlocking(createRequest);
- retry = false;
- }
- catch (HornetQException e)
- {
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ try
{
- ClientSessionImpl.log.warn("Server is starting, retry to create the session " + name);
- retry = true;
- // sleep a little bit to avoid spinning too much
- Thread.sleep(10);
+ channel1.sendBlocking(createRequest);
+ retry = false;
}
- else
+ catch (HornetQException e)
{
- throw e;
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ ClientSessionImpl.log.warn("Server is starting, retry to create the session " + name);
+ retry = true;
+ // sleep a little bit to avoid spinning too much
+ Thread.sleep(10);
+ }
+ else
+ {
+ throw e;
+ }
}
}
- }
- while (retry);
+ while (retry);
- channel.clearCommands();
+ channel.clearCommands();
- for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
- {
- SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-
- // We try and recreate any non durable queues, since they probably won't be there unless
- // they are defined in hornetq-configuration.xml
- // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
- if (!queueInfo.isDurable())
+ for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
{
- CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
- queueInfo.getName(),
- queueInfo.getFilterString(),
- false,
- queueInfo.isTemporary(),
- false);
+ SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
- sendPacketWithoutLock(createQueueRequest);
- }
+ // We try and recreate any non durable queues, since they probably won't be there unless
+ // they are defined in hornetq-configuration.xml
+ // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+ if (!queueInfo.isDurable())
+ {
+ CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+ queueInfo.getName(),
+ queueInfo.getFilterString(),
+ false,
+ queueInfo.isTemporary(),
+ false);
- SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
- entry.getValue()
- .getQueueName(),
- entry.getValue()
- .getFilterString(),
- entry.getValue()
- .isBrowseOnly(),
- false);
+ sendPacketWithoutLock(createQueueRequest);
+ }
- sendPacketWithoutLock(createConsumerRequest);
+ SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
+ entry.getValue()
+ .getQueueName(),
+ entry.getValue()
+ .getFilterString(),
+ entry.getValue()
+ .isBrowseOnly(),
+ false);
- int clientWindowSize = entry.getValue().getClientWindowSize();
+ sendPacketWithoutLock(createConsumerRequest);
- if (clientWindowSize != 0)
- {
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
- clientWindowSize);
+ int clientWindowSize = entry.getValue().getClientWindowSize();
- sendPacketWithoutLock(packet);
+ if (clientWindowSize != 0)
+ {
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+ clientWindowSize);
+
+ sendPacketWithoutLock(packet);
+ }
+ else
+ {
+ // https://jira.jboss.org/browse/HORNETQ-522
+ SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+ 1);
+ sendPacketWithoutLock(packet);
+ }
}
- else
+
+ if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
- //https://jira.jboss.org/browse/HORNETQ-522
- SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
- 1);
- sendPacketWithoutLock(packet);
+ // Session is transacted - set for rollback only
+ // FIXME - there is a race condition here - a commit could sneak in before this is set
+ rollbackOnly = true;
}
- }
- if ((!autoCommitAcks || !autoCommitSends) && workDone)
- {
- // Session is transacted - set for rollback only
- // FIXME - there is a race condition here - a commit could sneak in before this is set
- rollbackOnly = true;
- }
-
- // Now start the session if it was already started
- if (started)
- {
- for (ClientConsumerInternal consumer : consumers.values())
+ // Now start the session if it was already started
+ if (started)
{
- consumer.clearAtFailover();
- consumer.start();
- }
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clearAtFailover();
+ consumer.start();
+ }
- Packet packet = new PacketImpl(PacketImpl.SESS_START);
+ Packet packet = new PacketImpl(PacketImpl.SESS_START);
- packet.setChannelID(channel.getID());
+ packet.setChannelID(channel.getID());
- Connection conn = channel.getConnection().getTransportConnection();
+ Connection conn = channel.getConnection().getTransportConnection();
- HornetQBuffer buffer = packet.encode(channel.getConnection());
+ HornetQBuffer buffer = packet.encode(channel.getConnection());
- conn.write(buffer, false, false);
+ conn.write(buffer, false, false);
+ }
+
+ resetCreditManager = true;
}
- resetCreditManager = true;
+ channel.returnBlocking();
}
- channel.returnBlocking();
+ channel.setTransferring(false);
}
+ catch (Throwable t)
+ {
+ ClientSessionImpl.log.error("Failed to handle failover", t);
+ }
+ finally
+ {
+ channel.unlock();
+ }
- channel.setTransferring(false);
+ if (resetCreditManager)
+ {
+ producerCreditManager.reset();
+
+ // Also need to send more credits for consumers, otherwise the system could hang with the server
+ // not having any credits to send
+ }
}
- catch (Throwable t)
+
+ // Resending the session metadata to the server after failover
+ try
{
- ClientSessionImpl.log.error("Failed to handle failover", t);
+ for (Map.Entry<String, String> entries : metadata.entrySet())
+ {
+ addMetaData(entries.getKey(), entries.getValue());
+ }
}
- finally
+ catch (HornetQException e)
{
- channel.unlock();
- }
- if (resetCreditManager)
- {
- producerCreditManager.reset();
+ log.warn("Error on resending metadata: " + metadata, e);
- // Also need to send more credits for consumers, otherwise the system could hand with the server
- // not having any credits to send
}
}
-
+
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ metadata.put(key, data);
+ channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+ }
+
public ClientSessionFactoryInternal getSessionFactory()
{
return sessionFactory;
@@ -1735,8 +1763,6 @@
}
}
-
-
private static class BindingQueryImpl implements BindingQuery
{
@@ -1823,9 +1849,4 @@
}
}
-
- public void addMetaData(String key, String data) throws HornetQException
- {
- channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
- }
}
More information about the hornetq-commits
mailing list