Author: clebert.suconic(a)jboss.com
Date: 2011-03-12 21:22:03 -0500 (Sat, 12 Mar 2011)
New Revision: 10323
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
Resending metadata on failover/reconnection
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-11
21:33:07 UTC (rev 10322)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-13
02:22:03 UTC (rev 10323)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -109,6 +110,8 @@
// Attributes
----------------------------------------------------------------------------
+ private Map<String, String> metadata = new HashMap<String, String>();
+
private final ClientSessionFactoryInternal sessionFactory;
private final String name;
@@ -155,7 +158,7 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
-
+
private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -184,7 +187,7 @@
private final String groupID;
private volatile boolean inClose;
-
+
private volatile SimpleString defaultAddress;
// Constructors
----------------------------------------------------------------------------
@@ -262,7 +265,7 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
-
+
this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -274,7 +277,7 @@
// ClientSession implementation
// -----------------------------------------------------------------
-
+
public void createQueue(final SimpleString address, final SimpleString queueName)
throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -649,7 +652,7 @@
{
stop(true);
}
-
+
public void stop(final boolean waitForOnMessage) throws HornetQException
{
checkClosed();
@@ -689,7 +692,7 @@
{
return minLargeMessageSize;
}
-
+
public boolean isCompressLargeMessages()
{
return compressLargeMessages;
@@ -875,197 +878,222 @@
// Needs to be synchronized to prevent issues with occurring concurrently with
close()
- public synchronized void handleFailover(final CoreRemotingConnection
backupConnection)
+ public void handleFailover(final CoreRemotingConnection backupConnection)
{
- if (closed)
+ synchronized (this)
{
- return;
- }
+ if (closed)
+ {
+ return;
+ }
- boolean resetCreditManager = false;
+ boolean resetCreditManager = false;
- // We lock the channel to prevent any packets to be added to the resend
- // cache during the failover process
- channel.lock();
- try
- {
- channel.transferConnection(backupConnection);
+ // We lock the channel to prevent any packets to be added to the resend
+ // cache during the failover process
+ channel.lock();
+ try
+ {
+ channel.transferConnection(backupConnection);
-
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
- remotingConnection = backupConnection;
+ remotingConnection = backupConnection;
- int lcid = channel.getLastConfirmedCommandID();
+ int lcid = channel.getLastConfirmedCommandID();
- Packet request = new ReattachSessionMessage(name, lcid);
+ Packet request = new ReattachSessionMessage(name, lcid);
- Channel channel1 = backupConnection.getChannel(1, -1);
+ Channel channel1 = backupConnection.getChannel(1, -1);
- ReattachSessionResponseMessage response =
(ReattachSessionResponseMessage)channel1.sendBlocking(request);
+ ReattachSessionResponseMessage response =
(ReattachSessionResponseMessage)channel1.sendBlocking(request);
- if (response.isReattached())
- {
- // The session was found on the server - we reattached transparently ok
+ if (response.isReattached())
+ {
+ // The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastConfirmedCommandID());
- }
- else
- {
+ channel.replayCommands(response.getLastConfirmedCommandID());
+ }
+ else
+ {
- // The session wasn't found on the server - probably we're failing
over onto a backup server where the
- // session won't exist or the target server has been restarted - in this
case the session will need to be
- // recreated,
- // and we'll need to recreate any consumers
+ // The session wasn't found on the server - probably we're failing
over onto a backup server where the
+ // session won't exist or the target server has been restarted - in
this case the session will need to be
+ // recreated,
+ // and we'll need to recreate any consumers
- // It could also be that the server hasn't been restarted, but the
session is currently executing close, and
- // that
- // has already been executed on the server, that's why we can't find
the session- in this case we *don't*
- // want
- // to recreate the session, we just want to unblock the blocking call
- if (!inClose)
- {
- Packet createRequest = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize,
- defaultAddress == null ?
null
- :
defaultAddress.toString());
- boolean retry = false;
- do
+ // It could also be that the server hasn't been restarted, but the
session is currently executing close,
+ // and
+ // that
+ // has already been executed on the server, that's why we can't
find the session- in this case we *don't*
+ // want
+ // to recreate the session, we just want to unblock the blocking call
+ if (!inClose)
{
- try
+ Packet createRequest = new CreateSessionMessage(name,
+ channel.getID(),
+ version,
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+
confirmationWindowSize,
+ defaultAddress == null
? null
+ :
defaultAddress.toString());
+ boolean retry = false;
+ do
{
- channel1.sendBlocking(createRequest);
- retry = false;
- }
- catch (HornetQException e)
- {
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ try
{
- ClientSessionImpl.log.warn("Server is starting, retry to
create the session " + name);
- retry = true;
- // sleep a little bit to avoid spinning too much
- Thread.sleep(10);
+ channel1.sendBlocking(createRequest);
+ retry = false;
}
- else
+ catch (HornetQException e)
{
- throw e;
+ // the session was created while its server was starting, retry
it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ ClientSessionImpl.log.warn("Server is starting, retry to
create the session " + name);
+ retry = true;
+ // sleep a little bit to avoid spinning too much
+ Thread.sleep(10);
+ }
+ else
+ {
+ throw e;
+ }
}
}
- }
- while (retry);
+ while (retry);
- channel.clearCommands();
+ channel.clearCommands();
- for (Map.Entry<Long, ClientConsumerInternal> entry :
consumers.entrySet())
- {
- SessionQueueQueryResponseMessage queueInfo =
entry.getValue().getQueueInfo();
-
- // We try and recreate any non durable queues, since they probably
won't be there unless
- // they are defined in hornetq-configuration.xml
- // This allows e.g. JMS non durable subs and temporary queues to
continue to be used after failover
- if (!queueInfo.isDurable())
+ for (Map.Entry<Long, ClientConsumerInternal> entry :
consumers.entrySet())
{
- CreateQueueMessage createQueueRequest = new
CreateQueueMessage(queueInfo.getAddress(),
-
queueInfo.getName(),
-
queueInfo.getFilterString(),
-
false,
-
queueInfo.isTemporary(),
-
false);
+ SessionQueueQueryResponseMessage queueInfo =
entry.getValue().getQueueInfo();
- sendPacketWithoutLock(createQueueRequest);
- }
+ // We try and recreate any non durable queues, since they probably
won't be there unless
+ // they are defined in hornetq-configuration.xml
+ // This allows e.g. JMS non durable subs and temporary queues to
continue to be used after failover
+ if (!queueInfo.isDurable())
+ {
+ CreateQueueMessage createQueueRequest = new
CreateQueueMessage(queueInfo.getAddress(),
+
queueInfo.getName(),
+
queueInfo.getFilterString(),
+
false,
+
queueInfo.isTemporary(),
+
false);
- SessionCreateConsumerMessage createConsumerRequest = new
SessionCreateConsumerMessage(entry.getKey(),
-
entry.getValue()
-
.getQueueName(),
-
entry.getValue()
-
.getFilterString(),
-
entry.getValue()
-
.isBrowseOnly(),
-
false);
+ sendPacketWithoutLock(createQueueRequest);
+ }
- sendPacketWithoutLock(createConsumerRequest);
+ SessionCreateConsumerMessage createConsumerRequest = new
SessionCreateConsumerMessage(entry.getKey(),
+
entry.getValue()
+
.getQueueName(),
+
entry.getValue()
+
.getFilterString(),
+
entry.getValue()
+
.isBrowseOnly(),
+
false);
- int clientWindowSize = entry.getValue().getClientWindowSize();
+ sendPacketWithoutLock(createConsumerRequest);
- if (clientWindowSize != 0)
- {
- SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
-
clientWindowSize);
+ int clientWindowSize = entry.getValue().getClientWindowSize();
- sendPacketWithoutLock(packet);
+ if (clientWindowSize != 0)
+ {
+ SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
+
clientWindowSize);
+
+ sendPacketWithoutLock(packet);
+ }
+ else
+ {
+ //
https://jira.jboss.org/browse/HORNETQ-522
+ SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
+
1);
+ sendPacketWithoutLock(packet);
+ }
}
- else
+
+ if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
- //https://jira.jboss.org/browse/HORNETQ-522
- SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
-
1);
- sendPacketWithoutLock(packet);
+ // Session is transacted - set for rollback only
+ // FIXME - there is a race condition here - a commit could sneak in
before this is set
+ rollbackOnly = true;
}
- }
- if ((!autoCommitAcks || !autoCommitSends) && workDone)
- {
- // Session is transacted - set for rollback only
- // FIXME - there is a race condition here - a commit could sneak in
before this is set
- rollbackOnly = true;
- }
-
- // Now start the session if it was already started
- if (started)
- {
- for (ClientConsumerInternal consumer : consumers.values())
+ // Now start the session if it was already started
+ if (started)
{
- consumer.clearAtFailover();
- consumer.start();
- }
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clearAtFailover();
+ consumer.start();
+ }
- Packet packet = new PacketImpl(PacketImpl.SESS_START);
+ Packet packet = new PacketImpl(PacketImpl.SESS_START);
- packet.setChannelID(channel.getID());
+ packet.setChannelID(channel.getID());
- Connection conn = channel.getConnection().getTransportConnection();
+ Connection conn = channel.getConnection().getTransportConnection();
- HornetQBuffer buffer = packet.encode(channel.getConnection());
+ HornetQBuffer buffer = packet.encode(channel.getConnection());
- conn.write(buffer, false, false);
+ conn.write(buffer, false, false);
+ }
+
+ resetCreditManager = true;
}
- resetCreditManager = true;
+ channel.returnBlocking();
}
- channel.returnBlocking();
+ channel.setTransferring(false);
}
+ catch (Throwable t)
+ {
+ ClientSessionImpl.log.error("Failed to handle failover", t);
+ }
+ finally
+ {
+ channel.unlock();
+ }
- channel.setTransferring(false);
+ if (resetCreditManager)
+ {
+ producerCreditManager.reset();
+
+ // Also need to send more credits for consumers, otherwise the system could
hand with the server
+ // not having any credits to send
+ }
}
- catch (Throwable t)
+
+ // Resetting the metadata after failover
+ try
{
- ClientSessionImpl.log.error("Failed to handle failover", t);
+ for (Map.Entry<String, String> entries : metadata.entrySet())
+ {
+ addMetaData(entries.getKey(), entries.getValue());
+ }
}
- finally
+ catch (HornetQException e)
{
- channel.unlock();
- }
- if (resetCreditManager)
- {
- producerCreditManager.reset();
+ log.warn("Error on resending metadata: " + metadata, e);
- // Also need to send more credits for consumers, otherwise the system could hand
with the server
- // not having any credits to send
}
}
-
+
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ metadata.put(key, data);
+ channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+ }
+
public ClientSessionFactoryInternal getSessionFactory()
{
return sessionFactory;
@@ -1735,8 +1763,6 @@
}
}
-
-
private static class BindingQueryImpl implements BindingQuery
{
@@ -1823,9 +1849,4 @@
}
}
-
- public void addMetaData(String key, String data) throws HornetQException
- {
- channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
- }
}