JBoss hornetq SVN: r10936 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: remoting/impl/invm and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 15:19:41 -0400 (Wed, 06 Jul 2011)
New Revision: 10936
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
Fixing a few tests
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 15:50:53 UTC (rev 10935)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 19:19:41 UTC (rev 10936)
@@ -945,7 +945,10 @@
private void getConnectionWithRetry(final int reconnectAttempts)
{
- log.info("getConnectionWithRetry::" + reconnectAttempts);
+ if (log.isTraceEnabled())
+ {
+ log.trace("getConnectionWithRetry::" + reconnectAttempts + " with retryInterval = " + retryInterval + " multiplier = " + retryIntervalMultiplier, new Exception ("trace"));
+ }
long interval = retryInterval;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-07-06 15:50:53 UTC (rev 10935)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-07-06 19:19:41 UTC (rev 10936)
@@ -128,6 +128,8 @@
if (InVMConnector.failOnCreateConnection)
{
InVMConnector.incFailures();
+
+ log.debug("Returning null on InVMConnector for tests");
// For testing only
return null;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-06 15:50:53 UTC (rev 10935)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-06 19:19:41 UTC (rev 10936)
@@ -518,8 +518,9 @@
public final void connectionFailed(final HornetQException me, boolean failedOver)
{
- log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, new Exception (me.getMessage()));
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+
try
{
// csf.cleanup();
@@ -538,12 +539,10 @@
if (me.getCode() == HornetQException.DISCONNECTED)
{
- log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
fail(true);
}
else
{
- log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
fail(false);
scheduleRetryConnect();
}
@@ -612,7 +611,7 @@
/* This is called only when the bridge is activated */
protected void connect()
{
- BridgeImpl.log.info("Connecting " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
+ BridgeImpl.log.debug("Connecting " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
@@ -763,7 +762,7 @@
if (log.isDebugEnabled())
{
- log.debug("Scheduling retry for bridge " + this.name + "in " + milliseconds + " milliseconds");
+ log.debug("Scheduling retry for bridge " + this.name + " in " + milliseconds + " milliseconds");
}
futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-06 15:50:53 UTC (rev 10935)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-06 19:19:41 UTC (rev 10936)
@@ -627,6 +627,10 @@
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
+ targetLocator.setRetryInterval(retryInterval);
+ targetLocator.setMaxRetryInterval(maxRetryInterval);
+ targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-06 15:50:53 UTC (rev 10935)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-06 19:19:41 UTC (rev 10936)
@@ -682,9 +682,9 @@
// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
serverLocator.setInitialConnectAttempts(-1);
-
-
-
+ serverLocator.setRetryInterval(config.getRetryInterval());
+ serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
+ serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
13 years, 8 months
JBoss hornetq SVN: r10935 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 11:50:53 -0400 (Wed, 06 Jul 2011)
New Revision: 10935
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Stop sending the CreateReplication msg.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-06 15:49:45 UTC (rev 10934)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-06 15:50:53 UTC (rev 10935)
@@ -35,7 +35,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
@@ -314,42 +313,7 @@
}
replicatingChannel.setHandler(responseHandler);
-
- CreateReplicationSessionMessage replicationStartPackage =
- new CreateReplicationSessionMessage(replicatingChannel.getID());
-
- Channel systemChannel = remotingConnection.getChannel(CHANNEL_ID.SESSION.id, -1);
-
- systemChannel.send(replicationStartPackage);
-
- failureListener = new SessionFailureListener()
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- if (me.getCode() == HornetQException.DISCONNECTED)
- {
- // Backup has shut down - no need to log a stack trace
- ReplicationManagerImpl.log.warn("The backup node has been shut-down, replication will now stop");
- }
- else
- {
- ReplicationManagerImpl.log.warn("Connection to the backup node failed, removing replication now", me);
- }
-
- try
- {
- stop();
- }
- catch (Exception e)
- {
- ReplicationManagerImpl.log.warn(e.getMessage(), e);
- }
- }
-
- public void beforeReconnect(final HornetQException me)
- {
- }
- };
+ failureListener = new ReplicatedSessionFailureListener();
remotingConnection.addFailureListener(failureListener);
started = true;
@@ -486,8 +450,37 @@
// Inner classes -------------------------------------------------
- protected class ResponseHandler implements ChannelHandler
+ private final class ReplicatedSessionFailureListener implements SessionFailureListener
{
+ public void connectionFailed(final HornetQException me, boolean failedOver)
+ {
+ if (me.getCode() == HornetQException.DISCONNECTED)
+ {
+ // Backup has shut down - no need to log a stack trace
+ ReplicationManagerImpl.log.warn("The backup node has been shut-down, replication will now stop");
+ }
+ else
+ {
+ ReplicationManagerImpl.log.warn("Connection to the backup node failed, removing replication now", me);
+ }
+
+ try
+ {
+ stop();
+ }
+ catch (Exception e)
+ {
+ ReplicationManagerImpl.log.warn(e.getMessage(), e);
+ }
+ }
+
+ public void beforeReconnect(final HornetQException me)
+ {
+ }
+ }
+
+ private class ResponseHandler implements ChannelHandler
+ {
/* (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
13 years, 8 months
JBoss hornetq SVN: r10934 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 11:49:45 -0400 (Wed, 06 Jul 2011)
New Revision: 10934
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -142,9 +142,9 @@
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
-
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), msg.isBackup()),
- false, true);
+ final boolean backup = msg.isBackup();
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), backup), backup,
+ true);
}
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -15,7 +15,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.REATTACH_SESSION;
import org.hornetq.api.core.HornetQException;
@@ -103,7 +102,7 @@
break;
}
- case CREATE_REPLICATION:
+ case PacketImpl.CREATE_REPLICATION:
{
// Create queue can also be fielded here in the case of a replicated store and forward queue creation
@@ -115,7 +114,7 @@
}
default:
{
- HornetQPacketHandler.log.error("Invalid packet " + packet);
+ log.error("Invalid packet " + packet);
}
}
}
@@ -205,7 +204,7 @@
{
log.error("Failed to create session ", e);
- HornetQPacketHandler.log.error("Failed to create session", e);
+ log.error("Failed to create session", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -271,7 +270,7 @@
}
catch (Exception e)
{
- HornetQPacketHandler.log.error("Failed to reattach session", e);
+ log.error("Failed to reattach session", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -291,7 +290,7 @@
}
catch (Exception e)
{
- HornetQPacketHandler.log.error("Failed to handle create queue", e);
+ log.error("Failed to handle create queue", e);
}
}
@@ -311,11 +310,11 @@
}
catch (HornetQException e)
{
- response = new HornetQExceptionMessage(e);
+ response = new HornetQExceptionMessage(e);
}
catch (Exception e)
{
- HornetQPacketHandler.log.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -54,7 +54,7 @@
final boolean temporary,
final boolean requiresResponse)
{
- super(PacketImpl.CREATE_QUEUE);
+ this();
this.address = address;
this.queueName = queueName;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -126,54 +126,55 @@
public void handlePacket(final Packet packet)
{
PacketImpl response = new ReplicationResponseMessage();
+ final byte type=packet.getType();
try
{
- if (packet.getType() == PacketImpl.REPLICATION_APPEND)
+ if (type == PacketImpl.REPLICATION_APPEND)
{
handleAppendAddRecord((ReplicationAddMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
{
handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
+ else if (type == PacketImpl.REPLICATION_DELETE)
{
handleAppendDelete((ReplicationDeleteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
{
handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
+ else if (type == PacketImpl.REPLICATION_PREPARE)
{
handlePrepare((ReplicationPrepareMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
{
handleCommitRollback((ReplicationCommitMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PAGE_WRITE)
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
{
handlePageWrite((ReplicationPageWriteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PAGE_EVENT)
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
{
handlePageEvent((ReplicationPageEventMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
{
handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
{
handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
{
handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_COMPARE_DATA)
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
{
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
@@ -183,10 +184,15 @@
ReplicationEndpointImpl.log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
}
}
+ catch (HornetQException e)
+ {
+ log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(e);
+ }
catch (Exception e)
{
- ReplicationEndpointImpl.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage((HornetQException)e);
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e);
}
channel.send(response);
@@ -616,7 +622,4 @@
{
return this.journals[journalID];
}
-
- // Inner classes -------------------------------------------------
-
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -144,11 +144,14 @@
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
- // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
- // as a way to keep its existence valid and TCK tests
- // That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
- // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be ignored for any purpose
- // It's declared here as this filter is considered a global ignore
+ /*
+ * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
+ * with a dummy-filter at this current version as a way to keep its existence valid and TCK
+ * tests. That subscription needs an invalid filter, however paging needs to ignore any
+ * subscription with this filter. For that reason, this filter needs to be rejected on paging or
+ * any other component on the system, and just be ignored for any purpose It's declared here as
+ * this filter is considered a global ignore
+ */
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
@@ -551,9 +554,7 @@
liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1).setHandler(replicationEndpoint);
- liveServerSessionFactory.getConnection()
- .getChannel(CHANNEL_ID.PING.id, -1)
- .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
+ liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
started = true;
@@ -1197,15 +1198,6 @@
return connectorsService;
}
- // Public
- // ---------------------------------------------------------------------------------------
-
- // Package protected
- // ----------------------------------------------------------------------------
-
- // Protected
- // ------------------------------------------------------------------------------------
-
/**
* Protected so tests can change this behaviour
* @param backupConnector
@@ -1959,6 +1951,5 @@
replicationManager.start();
journalStorageManager.setReplicator(replicationManager);
- System.out.println("HornetQServerImpl: ReplicationManagerImpl is started & added to JournalStorageManager...");
}
}
13 years, 8 months
JBoss hornetq SVN: r10933 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 11:49:01 -0400 (Wed, 06 Jul 2011)
New Revision: 10933
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Add the replicationEndPoint handler to the replication channel
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:48:32 UTC (rev 10932)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:49:01 UTC (rev 10933)
@@ -546,11 +546,14 @@
// XXX
throw new RuntimeException("Need to retry...");
}
- log.info("announce backup to live-server (id=" + liveConnectorName + ")");
+ CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+ Channel liveChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+ liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
+ liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1).setHandler(replicationEndpoint);
+
liveServerSessionFactory.getConnection()
.getChannel(CHANNEL_ID.PING.id, -1)
.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
- log.info("backup registered");
started = true;
@@ -1952,8 +1955,6 @@
return;
JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
- System.out.println(HornetQServerImpl.class.getName() + " " + this.getIdentity() +
- ": create a ReplicationManagerImpl");
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
13 years, 8 months
JBoss hornetq SVN: r10932 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 11:48:32 -0400 (Wed, 06 Jul 2011)
New Revision: 10932
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
Clean up
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-07-06 10:21:27 UTC (rev 10931)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-07-06 15:48:32 UTC (rev 10932)
@@ -54,7 +54,6 @@
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -121,7 +120,7 @@
private final Channel channel;
private volatile CoreRemotingConnection remotingConnection;
-
+
private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
@@ -135,10 +134,10 @@
this.channel = channel;
this.remotingConnection = channel.getConnection();
-
+
//TODO think of a better way of doing this
Connection conn = remotingConnection.getTransportConnection();
-
+
if (conn instanceof NettyConnection)
{
direct = ((NettyConnection)conn).isDirectDeliver();
@@ -188,7 +187,7 @@
{
return channel;
}
-
+
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
@@ -199,7 +198,7 @@
boolean flush = false;
boolean closeChannel = false;
boolean requiresResponse = false;
-
+
try
{
try
@@ -207,8 +206,8 @@
switch (type)
{
case SESS_CREATECONSUMER:
- {
- SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+ {
+ SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(),
request.getQueueName(),
@@ -217,7 +216,7 @@
if (requiresResponse)
{
// We send back queue information on the queue as a response- this allows the queue to
- // be automaticall recreated on failover
+ // be automatically recreated on failover
response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
}
@@ -239,7 +238,7 @@
break;
}
case DELETE_QUEUE:
- {
+ {
requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.deleteQueue(request.getQueueName());
@@ -507,7 +506,7 @@
{
if (requiresResponse)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ response = new HornetQExceptionMessage(e);
}
else
{
@@ -586,11 +585,11 @@
channel.close();
}
}
-
+
public void closeListeners()
{
List<CloseListener> listeners = remotingConnection.removeCloseListeners();
-
+
for (CloseListener closeListener: listeners)
{
closeListener.connectionClosed();
@@ -600,7 +599,7 @@
}
}
}
-
+
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
@@ -611,10 +610,10 @@
// might be executed
// before we have transferred the connection, leaving it in a started state
session.setTransferring(true);
-
+
List<CloseListener> closeListeners = remotingConnection.removeCloseListeners();
List<FailureListener> failureListeners = remotingConnection.removeFailureListeners();
-
+
// Note. We do not destroy the replicating connection here. In the case the live server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 10:21:27 UTC (rev 10931)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 15:48:32 UTC (rev 10932)
@@ -149,8 +149,6 @@
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
- System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
try
{
server.addHaBackup(rc);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 10:21:27 UTC (rev 10931)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 15:48:32 UTC (rev 10932)
@@ -309,17 +309,14 @@
response = new NullResponseMessage();
}
+ catch (HornetQException e)
+ {
+ response = new HornetQExceptionMessage(e);
+ }
catch (Exception e)
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- HornetQPacketHandler.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ HornetQPacketHandler.log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
channel1.send(response);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 10:21:27 UTC (rev 10931)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 15:48:32 UTC (rev 10932)
@@ -40,7 +40,7 @@
private boolean durable;
private boolean temporary;
-
+
private boolean requiresResponse;
// Static --------------------------------------------------------
@@ -108,7 +108,7 @@
{
return temporary;
}
-
+
public boolean isRequiresResponse()
{
return requiresResponse;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 10:21:27 UTC (rev 10931)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 15:48:32 UTC (rev 10932)
@@ -76,7 +76,7 @@
private final HornetQServer server;
private Channel channel;
-
+
private Journal[] journals;
private JournalStorageManager storage;
@@ -99,14 +99,14 @@
}
// Public --------------------------------------------------------
-
+
public void registerJournal(final byte id, final Journal journal)
{
if (journals == null || id >= journals.length)
{
Journal[] oldJournals = journals;
journals = new Journal[id + 1];
-
+
if (oldJournals != null)
{
for (int i = 0 ; i < oldJournals.length; i++)
@@ -115,11 +115,11 @@
}
}
}
-
+
journals[id] = journal;
}
-
- /*
+
+ /*
* (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
@@ -266,7 +266,7 @@
}
largeMessages.clear();
-
+
pageManager.stop();
}
13 years, 8 months
JBoss hornetq SVN: r10931 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 06:21:27 -0400 (Wed, 06 Jul 2011)
New Revision: 10931
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
clean up
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-06 10:20:45 UTC (rev 10930)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-06 10:21:27 UTC (rev 10931)
@@ -57,6 +57,7 @@
public class FailoverTest extends FailoverTestBase
{
private static final Logger log = Logger.getLogger(FailoverTest.class);
+ private static final int NUM_MESSAGES = 100;
private ServerLocator locator;
private ClientSessionFactoryInternal sf;
@@ -145,9 +146,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -187,7 +186,7 @@
crash(session);
int retry = 0;
- while (received.size() >= numMessages)
+ while (received.size() >= NUM_MESSAGES)
{
Thread.sleep(1000);
retry++;
@@ -199,19 +198,13 @@
System.out.println("received.size() = " + received.size());
session.close();
- sf.close();
-
Assert.assertTrue(retry <= 5);
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testNonTransacted() throws Exception
{
-
createSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -220,50 +213,19 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
private void createSessionFactory() throws Exception
@@ -348,11 +310,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -374,9 +332,9 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -391,26 +349,11 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesSentSoRollback() throws Exception
@@ -423,19 +366,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -461,11 +393,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
/**
@@ -482,19 +410,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -532,11 +449,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
@@ -549,19 +462,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
crash(session);
@@ -576,35 +478,15 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
@@ -622,19 +504,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -652,35 +523,15 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -693,19 +544,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, false, false);
@@ -714,19 +554,8 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
Assert.assertTrue(session2.isRollbackOnly());
@@ -746,11 +575,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -763,9 +588,9 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session1.createMessage(true);
@@ -784,7 +609,7 @@
session2.start();
- for (int i = 0; i < numMessages / 2; i++)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -807,7 +632,7 @@
consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
- for (int i = numMessages / 2; i < numMessages; i++)
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -828,11 +653,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -847,21 +668,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
try
@@ -885,11 +697,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -904,21 +712,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -944,11 +743,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// This might happen if 1PC optimisation kicks in
@@ -964,21 +759,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1004,11 +790,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
@@ -1023,21 +805,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
@@ -1054,24 +827,8 @@
session.start(xid2, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
@@ -1080,11 +837,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -1097,19 +850,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1122,19 +864,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
try
@@ -1152,11 +883,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -1169,19 +896,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1194,19 +910,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
crash(session2);
@@ -1226,11 +931,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// 1PC optimisation
@@ -1243,19 +944,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1268,19 +958,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
// session2.prepare(xid);
@@ -1303,11 +982,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1331,11 +1006,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1372,9 +1043,9 @@
ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = sendSession.createMessage(true);
@@ -1399,18 +1070,7 @@
{
for (ClientConsumer consumer : consumerList)
{
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ receiveMessages(consumer);
}
}
@@ -1421,11 +1081,7 @@
sendSession.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
/*
@@ -1440,24 +1096,13 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
session.start();
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1470,31 +1115,22 @@
crash(session);
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
- sf.close();
+ closeSessionFactory();
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void sendMessages(ClientSession session, ClientProducer producer) throws Exception, HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage message = session.createMessage(isDurable(i));
+ setBody(i, message);
+ message.putIntProperty("counter", i);
+ producer.send(message);
+ }
}
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
@@ -1507,24 +1143,13 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1539,31 +1164,33 @@
// Should get the same ones after failover since we didn't ack
- for (int i = 0; i < numMessages; i++)
+ receiveDurableMessages(consumer);
+
+ session.close();
+
+ closeSessionFactory();
+ }
+
+ private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
// Only the persistent messages will survive
- if (i % 2 == 0)
+ if (isDurable(i))
{
ClientMessage message = consumer.receive(1000);
-
Assert.assertNotNull(message);
-
assertMessageBody(i, message);
-
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
message.acknowledge();
}
}
+ }
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private boolean isDurable(int i)
+ {
+ return i % 2 == 0;
}
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1580,43 +1207,21 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session);
// Send some more
- for (int i = numMessages; i < numMessages * 2; i++)
+ for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
- ClientMessage message = session.createMessage(i % 2 == 0);
+ ClientMessage message = session.createMessage(isDurable(i));
setBody(i, message);
@@ -1627,7 +1232,7 @@
// Should get the same ones after failover since we didn't ack
- for (int i = numMessages; i < numMessages * 2; i++)
+ for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1642,11 +1247,19 @@
session.close();
- sf.close();
+ closeSessionFactory();
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void receiveMessages(ClientConsumer consumer) throws HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ message.acknowledge();
+ }
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1690,45 +1303,19 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
crash(session);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
+ receiveMessages(consumer);
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void _testForceBlockingReturn() throws Exception
@@ -1787,11 +1374,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1807,13 +1390,13 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final int numMessages = 100;
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
String txID = "my-tx-id";
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -1871,8 +1454,8 @@
Committer committer = new Committer();
- // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
- // with transaction rolled back
+ // Commit will occur, but response will never get back, connection is failed, and commit
+ // should be unblocked with transaction rolled back
committer.start();
@@ -1896,7 +1479,7 @@
// We now try and resend the messages since we get a transaction rolled back exception
// but the commit actually succeeded, duplicate detection should kick in and prevent dups
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session2.createMessage(true);
@@ -1926,29 +1509,21 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
session2.close();
- sf.close();
+ closeSessionFactory();
+ }
+ private void closeSessionFactory()
+ {
+ sf.close();
Assert.assertEquals(0, sf.numSessions());
-
Assert.assertEquals(0, sf.numConnections());
}
@@ -1964,11 +1539,9 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final int numMessages = 100;
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -2036,7 +1609,7 @@
// We now try and resend the messages since we get a transaction rolled back exception
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session2.createMessage(true);
@@ -2053,30 +1626,15 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testBackupServerNotRemoved() throws Exception
@@ -2126,11 +1684,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupLiveComesBack() throws Exception
@@ -2182,11 +1736,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
@@ -2254,11 +1804,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2326,11 +1872,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// Package protected ---------------------------------------------
13 years, 8 months
JBoss hornetq SVN: r10930 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-06 06:20:45 -0400 (Wed, 06 Jul 2011)
New Revision: 10930
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
Give a shorter name to the new method (it is private anyway)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 04:55:20 UTC (rev 10929)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 10:20:45 UTC (rev 10930)
@@ -143,8 +143,7 @@
{
NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
- server.getClusterManager().notifyNodeUp(msg.getNodeID(),
- getPairForNotification(msg.getConnector(), msg.isBackup()),
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), msg.isBackup()),
false, true);
}
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
@@ -162,14 +161,12 @@
e.printStackTrace();
throw new RuntimeException(e);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(),
- getPairForNotification(msg.getConnector(), true), true, true);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), true), true, true);
}
}
- private
- Pair<TransportConfiguration, TransportConfiguration>
- getPairForNotification(TransportConfiguration conn, boolean isBackup)
+ private Pair<TransportConfiguration, TransportConfiguration> getPair(TransportConfiguration conn,
+ boolean isBackup)
{
if (isBackup)
{
13 years, 8 months
JBoss hornetq SVN: r10929 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 00:55:20 -0400 (Wed, 06 Jul 2011)
New Revision: 10929
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-06 04:50:47 UTC (rev 10928)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-06 04:55:20 UTC (rev 10929)
@@ -147,31 +147,32 @@
}
}
});
+
+ if (flush)
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ latch.await(10, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ log.debug(e.getMessage(), e);
+ }
+ }
}
catch (RejectedExecutionException e)
{
// Ignore - this can happen if server/client is shutdown and another request comes in
}
- if (flush)
- {
- final CountDownLatch latch = new CountDownLatch(1);
- executor.execute(new Runnable(){
- public void run()
- {
- latch.countDown();
- }
- });
-
- try
- {
- latch.await(10, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- log.debug(e.getMessage(), e);
- }
- }
}
public String getRemoteAddress()
13 years, 8 months
JBoss hornetq SVN: r10928 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 00:50:47 -0400 (Wed, 06 Jul 2011)
New Revision: 10928
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 04:45:46 UTC (rev 10927)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-06 04:50:47 UTC (rev 10928)
@@ -368,7 +368,7 @@
{
// It has to use the same executor as the disconnect message is being sent through
- final HornetQException ex = new HornetQException(HornetQException.DISCONNECTED, "Channel disconnected");
+ final HornetQException ex = new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected");
closeExecutor.execute(new Runnable()
{
13 years, 8 months
JBoss hornetq SVN: r10927 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: remoting/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-06 00:45:46 -0400 (Wed, 06 Jul 2011)
New Revision: 10927
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
changes on my branch (fixing test)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-05 16:59:31 UTC (rev 10926)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-07-06 04:45:46 UTC (rev 10927)
@@ -12,8 +12,10 @@
*/
package org.hornetq.core.remoting.impl.invm;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -139,7 +141,7 @@
}
catch (Exception e)
{
- final String msg = "Failed to write to handler";
+ final String msg = "Failed to write to handler on connector " + this;
InVMConnection.log.error(msg, e);
throw new IllegalStateException(msg, e);
}
@@ -150,6 +152,26 @@
{
// Ignore - this can happen if server/client is shutdown and another request comes in
}
+
+ if (flush)
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ latch.await(10, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ log.debug(e.getMessage(), e);
+ }
+ }
}
public String getRemoteAddress()
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-05 16:59:31 UTC (rev 10926)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-07-06 04:45:46 UTC (rev 10927)
@@ -64,6 +64,8 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
@@ -369,6 +371,11 @@
ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+ if (isTrace)
+ {
+ log.trace("Connection created " + connection);
+ }
+
connections.put(connection.getID(), entry);
if (config.isBackup())
@@ -379,6 +386,12 @@
public void connectionDestroyed(final Object connectionID)
{
+
+ if (isTrace)
+ {
+ log.trace("Connection removed " + connectionID, new Exception ("trace"));
+ }
+
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
@@ -458,6 +471,13 @@
{
conn.connection.bufferReceived(connectionID, buffer);
}
+ else
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet");
+ }
+ }
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-05 16:59:31 UTC (rev 10926)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-06 04:45:46 UTC (rev 10927)
@@ -518,7 +518,7 @@
public final void connectionFailed(final HornetQException me, boolean failedOver)
{
- log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, new Exception (me.getMessage()));
try
{
@@ -777,6 +777,8 @@
{
try
{
+ log.debug("stopping bridge " + BridgeImpl.this);
+
if (session != null)
{
log.debug("Cleaning up session " + session);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-05 16:59:31 UTC (rev 10926)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-06 04:45:46 UTC (rev 10927)
@@ -594,7 +594,7 @@
if (log.isDebugEnabled())
{
- log.debug("PORRA creating record between " + this.connector + " and " + connector + bridge);
+ log.debug("creating record between " + this.connector + " and " + connector + bridge);
}
record.setBridge(bridge);
13 years, 8 months