JBoss hornetq SVN: r11186 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 12:56:03 -0400 (Thu, 11 Aug 2011)
New Revision: 11186
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
Dealing with a deadlock that happened in the testsuite
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -400,7 +400,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.iterator();
try
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -167,7 +167,7 @@
void resetAllIterators();
- boolean blockOnExecutorFuture();
+ boolean flushExecutor();
void close() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -383,7 +383,7 @@
{
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries
- if (blockOnExecutorFuture())
+ if (flushExecutor())
{
// Go into direct delivery mode
directDeliver = true;
@@ -443,7 +443,20 @@
checkQueueSizeFuture.cancel(false);
}
- cancelRedistributor();
+ getExecutor().execute(new Runnable(){
+ public void run()
+ {
+ try
+ {
+ cancelRedistributor();
+ }
+ catch (Exception e)
+ {
+ // nothing that could be done anyway.. just logging
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
}
public Executor getExecutor()
@@ -464,14 +477,14 @@
{
deliverAsync();
- blockOnExecutorFuture();
+ flushExecutor();
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
Future future = new Future();
- executor.execute(future);
+ getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -1137,28 +1150,43 @@
}
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- LinkedListIterator<MessageReference> iter = iterator();
-
- try
- {
- while (iter.hasNext())
+ getExecutor().execute(new Runnable(){
+ public void run()
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ synchronized (QueueImpl.this)
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
+ {
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ try
+ {
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Error expiring reference " + ref, e);
+ }
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
}
}
- }
- finally
- {
- iter.close();
- }
+ });
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -134,7 +134,7 @@
prod.send(msg);
}
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
//Consumer is not started so should go queued
assertFalse(queue.isDirectDeliver());
@@ -157,7 +157,7 @@
prod.send(msg);
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
assertTrue(queue.isDirectDeliver());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -82,7 +82,7 @@
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
return true;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-08-10 17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-08-11 16:56:03 UTC (rev 11186)
@@ -617,7 +617,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
*/
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
// TODO Auto-generated method stub
return false;
13 years, 5 months
JBoss hornetq SVN: r11185 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: remoting/server/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-10 13:30:15 -0400 (Wed, 10 Aug 2011)
New Revision: 11185
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks on my latest changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-10 17:30:15 UTC (rev 11185)
@@ -49,8 +49,6 @@
* */
private volatile Object owner;
- private volatile Executor executor;
-
public Topology(final Object owner)
{
this.owner = owner;
@@ -66,11 +64,6 @@
*/
private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
- public void setExecutor(Executor executor)
- {
- this.executor = executor;
- }
-
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
if (log.isDebugEnabled())
@@ -95,7 +88,7 @@
}
}
- public boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
+ public boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
{
boolean replaced = false;
@@ -157,8 +150,7 @@
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this +
- " Add member nodeId=" +
+ Topology.log.debug(this + " Add member nodeId=" +
nodeId +
" member = " +
member +
@@ -169,35 +161,28 @@
}
}
-
+
if (replaced)
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
-
// Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
- execute(new Runnable(){
- public void run()
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
{
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
- {
- Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
- }
+ Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
+ }
- try
- {
- listener.nodeUP(nodeId, member.getConnector(), last);
- }
- catch (Throwable e)
- {
- log.warn (e.getMessage(), e);
- }
- }
+ try
+ {
+ listener.nodeUP(nodeId, member.getConnector(), last);
}
- });
-
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
}
return replaced;
@@ -208,7 +193,7 @@
*/
private ArrayList<ClusterTopologyListener> copyListeners()
{
- ArrayList <ClusterTopologyListener> listenersCopy;
+ ArrayList<ClusterTopologyListener> listenersCopy;
synchronized (topologyListeners)
{
listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners);
@@ -219,12 +204,11 @@
public boolean removeMember(final String nodeId)
{
TopologyMember member;
-
+
synchronized (this)
{
member = topology.remove(nodeId);
}
-
if (Topology.log.isDebugEnabled())
{
@@ -241,20 +225,14 @@
{
final ArrayList<ClusterTopologyListener> copy = copyListeners();
- // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
- execute(new Runnable(){
- public void run()
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
{
- for (ClusterTopologyListener listener : copy)
- {
- if (Topology.log.isTraceEnabled())
- {
- Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
- }
- listener.nodeDown(nodeId);
- }
+ Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
}
- });
+ listener.nodeDown(nodeId);
+ }
}
return member != null;
}
@@ -268,7 +246,7 @@
{
// To make sure it was updated
addMember(nodeID, member, false);
-
+
ArrayList<ClusterTopologyListener> copy = copyListeners();
// Now force sending it
@@ -394,18 +372,6 @@
}
return null;
}
-
- private void execute(Runnable runnable)
- {
- if (executor != null)
- {
- executor.execute(runnable);
- }
- else
- {
- runnable.run();
- }
- }
/* (non-Javadoc)
* @see java.lang.Object#toString()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-08-10 17:30:15 UTC (rev 11185)
@@ -162,7 +162,7 @@
// This needs to be a different thread pool to the main thread pool especially for OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept small for better performance
- ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
+ ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() + "-" + System.identityHashCode(this),
false,
tccl);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-10 17:30:15 UTC (rev 11185)
@@ -95,7 +95,7 @@
private final long connectionTTL;
private final long retryInterval;
-
+
private final long callTimeout;
private final double retryIntervalMultiplier;
@@ -107,7 +107,7 @@
private final boolean useDuplicateDetection;
private final boolean routeWhenNoConsumers;
-
+
private final int confirmationWindowSize;
private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
@@ -195,7 +195,7 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
-
+
this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -221,7 +221,7 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
-
+
this.callTimeout = callTimeout;
this.clusterManagerTopology = clusterManagerTopology;
@@ -293,7 +293,7 @@
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
-
+
this.callTimeout = callTimeout;
this.useDuplicateDetection = useDuplicateDetection;
@@ -353,7 +353,7 @@
{
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug(this + "::stopping ClusterConnection");
@@ -382,32 +382,32 @@
{
}
}
+ }
- if (managementService != null)
- {
- TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
- NotificationType.CLUSTER_CONNECTION_STOPPED,
- props);
- managementService.sendNotification(notification);
- }
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
+ NotificationType.CLUSTER_CONNECTION_STOPPED,
+ props);
+ managementService.sendNotification(notification);
+ }
- executor.execute(new Runnable()
+ executor.execute(new Runnable()
+ {
+ public void run()
{
- public void run()
+ if (serverLocator != null)
{
- if (serverLocator != null)
- {
- serverLocator.close();
- serverLocator = null;
- }
-
+ serverLocator.close();
+ serverLocator = null;
}
- });
- started = false;
- }
+ }
+ });
+
+ started = false;
}
public boolean isStarted()
@@ -636,8 +636,7 @@
{
if (isTrace)
{
- log.trace(this +
- " ignored nodeUp record for " +
+ log.trace(this + " ignored nodeUp record for " +
connectorPair +
" on nodeID=" +
nodeID +
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-08-10 17:30:15 UTC (rev 11185)
@@ -378,8 +378,6 @@
backup = false;
String nodeID = server.getNodeID().toString();
-
- topology.setExecutor(executor);
TopologyMember member = topology.getMember(nodeID);
//swap backup as live and send it to everybody
13 years, 5 months
JBoss hornetq SVN: r11184 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-10 12:57:38 -0400 (Wed, 10 Aug 2011)
New Revision: 11184
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
another possible deadlock from the testsuite after my last changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 16:10:06 UTC (rev 11183)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-10 16:57:38 UTC (rev 11184)
@@ -118,7 +118,7 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- Executor executor = server.getThreadPool();
+ Executor executor = server.getExecutorFactory().getExecutor();
public void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
13 years, 5 months
JBoss hornetq SVN: r11183 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-08-10 12:10:06 -0400 (Wed, 10 Aug 2011)
New Revision: 11183
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-10 15:39:03 UTC (rev 11182)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-08-10 16:10:06 UTC (rev 11183)
@@ -67,11 +67,18 @@
createLiveConfig(nodeManager2, 3, 0);
createBackupConfig(nodeManager2, 3, 4, true, new int[] {3, 5}, 0, 1, 2);
createBackupConfig(nodeManager2, 3, 5, true, new int[] {3, 4}, 0, 1, 2);
+
+ Thread.sleep(500);
servers.get(0).start();
+ Thread.sleep(500);
servers.get(3).start();
+ Thread.sleep(500);
servers.get(1).start();
+ Thread.sleep(500);
servers.get(2).start();
+ Thread.sleep(500);
servers.get(4).start();
+ Thread.sleep(500);
servers.get(5).start();
ServerLocator locator = getServerLocator(0);
13 years, 5 months
JBoss hornetq SVN: r11182 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 11:39:03 -0400 (Wed, 10 Aug 2011)
New Revision: 11182
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Fix delay cancelation, fixes FailoverTest.testFailoverOnInitialConnection()
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-10 15:05:24 UTC (rev 11181)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-10 15:39:03 UTC (rev 11182)
@@ -82,6 +82,7 @@
private Packet onHold;
private Channel channel;
public volatile boolean deliver;
+ private boolean receivedUpToDate;
private boolean mustHold = true;
public void addSubHandler(ReplicationEndpoint handler)
@@ -91,6 +92,10 @@
public synchronized void deliver()
{
+ deliver = true;
+ if (!receivedUpToDate)
+ return;
+
if (onHold == null)
{
throw new NullPointerException("Don't have the 'sync is done' packet to deliver");
@@ -131,8 +136,9 @@
if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
{
ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
- if (syncMsg.isUpToDate())
+ if (syncMsg.isUpToDate() && !deliver)
{
+ receivedUpToDate = true;
assert onHold == null;
onHold = packet;
PacketImpl response = new ReplicationResponseMessage();
13 years, 5 months
JBoss hornetq SVN: r11181 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 11:05:24 -0400 (Wed, 10 Aug 2011)
New Revision: 11181
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
HORNETQ-720 Run "FailoverTest" with delayed sync & implement more FileWrapperJournal calls.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -351,7 +351,7 @@
* @param replicationManager
* @throws HornetQException
*/
- public void setReplicator(ReplicationManager replicationManager) throws Exception
+ public void startReplication(ReplicationManager replicationManager) throws Exception
{
if (!started)
{
@@ -375,7 +375,6 @@
try
{
- // XXX HORNETQ-720 WRITE LOCK the StorageManager.
storageManagerLock.writeLock().lock();
try
{
@@ -398,7 +397,6 @@
}
finally
{
- // XXX HORNETQ-720 UNLOCK StorageManager...
storageManagerLock.writeLock().unlock();
}
sendJournalFile(messageFiles, JournalContent.MESSAGES);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -2029,7 +2029,7 @@
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
- journalStorageManager.setReplicator(replicationManager);
+ journalStorageManager.startReplication(replicationManager);
}
/**
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -13,6 +13,8 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
@@ -128,7 +130,8 @@
public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+ writeRecord(addRecord, false, null);
}
@Override
@@ -144,21 +147,24 @@
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+ writeRecord(updateRecordTX, false, null);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
+ writeRecord(commitRecord, sync, callback);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
+ writeRecord(prepareRecord, sync, callback);
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -1027,7 +1027,6 @@
private void setJournalState(JournalState newState)
{
- // log.info(this + " state=" + newState);
state = newState;
}
@@ -1750,7 +1749,6 @@
private synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions,
final boolean replicationSync) throws Exception
{
- System.out.println("LOAD! " + state + " " + replicationSync);
if (state == JournalState.STOPPED || state == JournalState.LOADED)
{
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -2,10 +2,8 @@
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.locks.Lock;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientProducer;
@@ -15,16 +13,7 @@
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.ChannelHandler;
-import org.hornetq.core.protocol.core.CommandConfirmationHandler;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
-import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -35,7 +24,7 @@
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
- private ReplicationChannelHandler handler;
+ private BackupSyncDelay syncDelay;
private static final int N_MSGS = 100;
@Override
@@ -48,8 +37,7 @@
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
- handler = new ReplicationChannelHandler();
- liveServer.addInterceptor(new BackupSyncDelay(handler));
+ syncDelay = new BackupSyncDelay(backupServer, liveServer);
}
public void testNodeID() throws Exception
@@ -100,9 +88,7 @@
private void finishSyncAndFailover() throws Exception
{
- handler.deliver = true;
- // must send one more message to have the "SYNC is DONE" msg delivered.
- sendMessages(session, producer, 1);
+ syncDelay.deliverUpToDateMsg();
waitForBackup(sessionFactory, 10, true);
assertFalse("should not be initialized", backupServer.getServer().isInitialised());
crash(session);
@@ -127,7 +113,7 @@
private void startBackupCrashLive() throws Exception
{
assertFalse("backup is started?", backupServer.isStarted());
- handler.setHold(false);
+ liveServer.removeInterceptor(syncDelay);
backupServer.start();
waitForBackup(sessionFactory, 5);
crash(session);
@@ -218,260 +204,5 @@
return TransportConfigurationUtils.getInVMConnector(live);
}
- private class BackupSyncDelay implements Interceptor
- {
- private final ReplicationChannelHandler handler;
-
- public BackupSyncDelay(ReplicationChannelHandler handler)
- {
- this.handler = handler;
- }
-
- @Override
- public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
- {
- if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
- {
- try
- {
- ReplicationEndpoint repEnd = backupServer.getServer().getReplicationEndpoint();
- handler.addSubHandler(repEnd);
- Channel repChannel = repEnd.getChannel();
- repChannel.setHandler(handler);
- handler.setChannel(repChannel);
- liveServer.removeInterceptor(this);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
- return true;
- }
-
- }
-
- private static class ReplicationChannelHandler implements ChannelHandler
- {
-
- private ReplicationEndpoint handler;
- private Packet onHold;
- private Channel channel;
- public volatile boolean deliver;
- private boolean mustHold = true;
-
- public void addSubHandler(ReplicationEndpoint handler)
- {
- this.handler = handler;
- }
-
- public void setChannel(Channel channel)
- {
- this.channel = channel;
- }
-
- public void setHold(boolean hold)
- {
- mustHold = hold;
- }
-
- @Override
- public void handlePacket(Packet packet)
- {
-
- if (onHold != null && deliver)
- {
- // Use wrapper to avoid sending a response
- ChannelWrapper wrapper = new ChannelWrapper(channel);
- handler.setChannel(wrapper);
- try
- {
- handler.handlePacket(onHold);
- }
- finally
- {
- handler.setChannel(channel);
- onHold = null;
- }
- }
-
- if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
- {
- ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
- if (syncMsg.isUpToDate())
- {
- assert onHold == null;
- onHold = packet;
- PacketImpl response = new ReplicationResponseMessage();
- channel.send(response);
- return;
- }
- }
-
- handler.handlePacket(packet);
- }
-
- }
-
- private static class ChannelWrapper implements Channel
- {
-
- private final Channel channel;
-
- /**
- * @param connection
- * @param id
- * @param confWindowSize
- */
- public ChannelWrapper(Channel channel)
- {
- this.channel = channel;
- }
-
- @Override
- public String toString()
- {
- return "ChannelWrapper(" + channel + ")";
- }
-
- @Override
- public long getID()
- {
- return channel.getID();
- }
-
- @Override
- public void send(Packet packet)
- {
- // no-op
- // channel.send(packet);
- }
-
- @Override
- public void sendBatched(Packet packet)
- {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void sendAndFlush(Packet packet)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Packet sendBlocking(Packet packet) throws HornetQException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setHandler(ChannelHandler handler)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void transferConnection(CoreRemotingConnection newConnection)
- {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void replayCommands(int lastConfirmedCommandID)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getLastConfirmedCommandID()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void lock()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void unlock()
- {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void returnBlocking()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Lock getLock()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CoreRemotingConnection getConnection()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void confirm(Packet packet)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
- {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void flushConfirmations()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void handlePacket(Packet packet)
- {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public void clearCommands()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getConfirmationWindowSize()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setTransferring(boolean transferring)
- {
- throw new UnsupportedOperationException();
- }
-
- }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-10 13:54:41 UTC (rev 11180)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -137,7 +137,7 @@
{
if (backupServer != null)
{
- // some tests fail the live before the backup is in sync
+ // some tests crash the liveServer before the backupServer is sync'ed
waitForBackup(sf, 3);
}
super.crash(sessions);
@@ -273,6 +273,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
+ assertNotNull("Just crashed? " + (i == 6) + " " + i, message);
message.acknowledge();
@@ -627,7 +628,7 @@
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting message " + i, message);
assertMessageBody(i, message);
@@ -1165,7 +1166,7 @@
if (isDurable(i))
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting durable msg " + i, message);
assertMessageBody(i, message);
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
message.acknowledge();
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedWithDelayFailoverTest.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -0,0 +1,26 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+
+public class ReplicatedWithDelayFailoverTest extends ReplicatedFailoverTest
+{
+
+ private BackupSyncDelay syncDelay;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ syncDelay = new BackupSyncDelay(backupServer, liveServer);
+ backupServer.start();
+ }
+
+ @Override
+ protected void crash(ClientSession... sessions) throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ super.crash(sessions);
+ }
+}
Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-08-10 15:05:24 UTC (rev 11181)
@@ -0,0 +1,309 @@
+/**
+ *
+ */
+package org.hornetq.tests.integration.cluster.util;
+
+import java.util.concurrent.locks.Lock;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CommandConfirmationHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * An interceptor to keep a replicated backup server from reaching "up-to-date" status.
+ * <p>
+ * One problem is that we can't add an interceptor to the backup before starting it. So we add the
+ * interceptor to the 'live' which will place a different {@link ChannelHandler} in the backup
+ * during the initialization of replication.
+ * <p>
+ * We need to hijack the replication channel handler, because we need to
+ * <ol>
+ * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC} packet that signals being
+ * up-to-date
+ * <li>not send an answer to it, when we deliver the packet later.
+ * </ol>
+ */
+public class BackupSyncDelay implements Interceptor
+{
+
+ private final ReplicationChannelHandler handler = new ReplicationChannelHandler();
+ private final TestableServer backup;
+ private final TestableServer live;
+
+ public void deliverUpToDateMsg()
+ {
+ handler.deliver();
+ }
+
+ public BackupSyncDelay(TestableServer backup, TestableServer live)
+ {
+ assert backup.getServer().getConfiguration().isBackup();
+ assert !live.getServer().getConfiguration().isBackup();
+ this.backup = backup;
+ this.live = live;
+ live.addInterceptor(this);
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ {
+ try
+ {
+ ReplicationEndpoint repEnd = backup.getServer().getReplicationEndpoint();
+ handler.addSubHandler(repEnd);
+ Channel repChannel = repEnd.getChannel();
+ repChannel.setHandler(handler);
+ handler.setChannel(repChannel);
+ live.removeInterceptor(this);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ public static class ReplicationChannelHandler implements ChannelHandler
+ {
+
+ private ReplicationEndpoint handler;
+ private Packet onHold;
+ private Channel channel;
+ public volatile boolean deliver;
+ private boolean mustHold = true;
+
+ public void addSubHandler(ReplicationEndpoint handler)
+ {
+ this.handler = handler;
+ }
+
+ public synchronized void deliver()
+ {
+ if (onHold == null)
+ {
+ throw new NullPointerException("Don't have the 'sync is done' packet to deliver");
+ }
+ // Use wrapper to avoid sending a response
+ ChannelWrapper wrapper = new ChannelWrapper(channel);
+ handler.setChannel(wrapper);
+ try
+ {
+ handler.handlePacket(onHold);
+ }
+ finally
+ {
+ handler.setChannel(channel);
+ onHold = null;
+ }
+ }
+
+ public void setChannel(Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ public void setHold(boolean hold)
+ {
+ mustHold = hold;
+ }
+
+ @Override
+ public synchronized void handlePacket(Packet packet)
+ {
+
+ if (onHold != null && deliver)
+ {
+ deliver();
+ }
+
+ if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
+ {
+ ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
+ if (syncMsg.isUpToDate())
+ {
+ assert onHold == null;
+ onHold = packet;
+ PacketImpl response = new ReplicationResponseMessage();
+ channel.send(response);
+ return;
+ }
+ }
+
+ handler.handlePacket(packet);
+ }
+
+ }
+
+ public static class ChannelWrapper implements Channel
+ {
+
+ private final Channel channel;
+
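+ /**
+ * Creates a wrapper that reports the ID of {@code channel} but silently drops any packet
+ * passed to {@link #send(Packet)}.
+ * @param channel the channel being wrapped
+ */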
+ public ChannelWrapper(Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ChannelWrapper(" + channel + ")";
+ }
+
+ @Override
+ public long getID()
+ {
+ return channel.getID();
+ }
+
+ @Override
+ public void send(Packet packet)
+ {
+ // no-op
+ // channel.send(packet);
+ }
+
+ @Override
+ public void sendBatched(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void sendAndFlush(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Packet sendBlocking(Packet packet) throws HornetQException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setHandler(ChannelHandler handler)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void transferConnection(CoreRemotingConnection newConnection)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void replayCommands(int lastConfirmedCommandID)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLastConfirmedCommandID()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void lock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unlock()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void returnBlocking()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Lock getLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CoreRemotingConnection getConnection()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void confirm(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void flushConfirmations()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handlePacket(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void clearCommands()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getConfirmationWindowSize()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setTransferring(boolean transferring)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+}
\ No newline at end of file
13 years, 5 months
JBoss hornetq SVN: r11180 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 09:54:41 -0400 (Wed, 10 Aug 2011)
New Revision: 11180
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
HORNETQ-720 Fixes tests that would crash 'live' before 'backup' was up-to-date.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-10 11:43:38 UTC (rev 11179)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-08-10 13:54:41 UTC (rev 11180)
@@ -132,6 +132,17 @@
return sf.createSession(xa, autoCommitSends, autoCommitAcks);
}
+ @Override
+ protected void crash(ClientSession... sessions) throws Exception
+ {
+ if (backupServer != null)
+ {
+ // some tests fail the live before the backup is in sync
+ waitForBackup(sf, 3);
+ }
+ super.crash(sessions);
+ }
+
// https://jira.jboss.org/browse/HORNETQ-522
public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
{
13 years, 5 months
JBoss hornetq SVN: r11179 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 07:43:38 -0400 (Wed, 10 Aug 2011)
New Revision: 11179
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Add support for appendDelete & more tests
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -13,6 +13,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
@@ -111,9 +112,10 @@
}
@Override
- public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+ writeRecord(deleteRecord, sync, callback);
}
@Override
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -24,7 +24,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -49,6 +48,8 @@
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ handler = new ReplicationChannelHandler();
+ liveServer.addInterceptor(new BackupSyncDelay(handler));
}
public void testNodeID() throws Exception
@@ -62,8 +63,6 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
- handler = new ReplicationChannelHandler();
- liveServer.addInterceptor(new BackupSyncDelay(handler));
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < 5; i++)
@@ -71,28 +70,90 @@
messageJournal.forceMoveNextFile();
sendMessages(session, producer, N_MSGS);
}
+
backupServer.start();
+
waitForBackup(sessionFactory, 10, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ Set<Long> liveIds = getFileIds(messageJournal);
+
+ finishSyncAndFailover();
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> backupIds = getFileIds(backupMsgJournal);
+ assertEquals("File IDs must match!", liveIds, backupIds);
+ }
+
+ public void testReplicationDuringSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ backupServer.start();
+ waitForBackup(sessionFactory, 10, false);
+
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ receiveMsgs(0, N_MSGS);
+ finishSyncAndFailover();
+ }
+
+ private void finishSyncAndFailover() throws Exception
+ {
handler.deliver = true;
+ // must send one more message to have the "SYNC is DONE" msg delivered.
sendMessages(session, producer, 1);
-
waitForBackup(sessionFactory, 10, true);
-
- Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized", backupServer.getServer().isInitialised());
crash(session);
- waitForServerInitialization(backupServer.getServer(), 5);
+ waitForServerInitialization(backupServer, 5);
+ }
- JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("File IDs must match!", liveIds, backupIds);
+ public void testMessageSyncSimple() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupCrashLive();
+ receiveMsgs(0, N_MSGS);
}
- private static void waitForServerInitialization(HornetQServer server, int seconds)
+ public void testMessageSync() throws Exception
{
+ createProducerSendSomeMessages();
+ receiveMsgs(0, N_MSGS / 2);
+ startBackupCrashLive();
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void startBackupCrashLive() throws Exception
+ {
+ assertFalse("backup is started?", backupServer.isStarted());
+ handler.setHold(false);
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ consumer.close();
+ session.commit();
+ }
+
+ private static void waitForServerInitialization(TestableServer server, int seconds)
+ {
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
while (!server.isInitialised())
@@ -111,7 +172,6 @@
}
}
}
-
private Set<Long> getFileIds(JournalImpl journal)
{
Set<Long> results = new HashSet<Long>();
@@ -128,40 +188,6 @@
return (JournalImpl)sm.getMessageJournal();
}
- public void testMessageSync() throws Exception
- {
- createProducerSendSomeMessages();
-
- receiveMsgs(0, N_MSGS / 2);
- assertFalse("backup is not started!", backupServer.isStarted());
-
- // BLOCK ON journals
- backupServer.start();
-
- waitForBackup(sessionFactory, 5);
- crash(session);
-
- // consume N/2 from 'new' live (the old backup)
- receiveMsgs(N_MSGS / 2, N_MSGS);
- }
-
- private void createProducerSendSomeMessages() throws HornetQException, Exception
- {
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- sendMessages(session, producer, N_MSGS);
- session.start();
- }
-
- private void receiveMsgs(int start, int end) throws HornetQException
- {
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
- session.commit();
- }
-
@Override
protected void tearDown() throws Exception
{
@@ -233,6 +259,7 @@
private Packet onHold;
private Channel channel;
public volatile boolean deliver;
+ private boolean mustHold = true;
public void addSubHandler(ReplicationEndpoint handler)
{
@@ -244,6 +271,11 @@
this.channel = channel;
}
+ public void setHold(boolean hold)
+ {
+ mustHold = hold;
+ }
+
@Override
public void handlePacket(Packet packet)
{
@@ -264,7 +296,7 @@
}
}
- if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+ if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
{
ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -513,7 +513,7 @@
}
/**
- * Send messages with pre-specified body.
+ * Send durable messages with pre-specified body.
* @param session
* @param producer
* @param numMessages
@@ -531,13 +531,13 @@
}
- protected final
- void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount) throws HornetQException
+ protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start, int msgCount)
+ throws HornetQException
{
for (int i = start; i < msgCount; i++)
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("Expecting a message " + i, message);
assertMessageBody(i, message);
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
message.acknowledge();
13 years, 5 months
JBoss hornetq SVN: r11178 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 06:30:16 -0400 (Wed, 10 Aug 2011)
New Revision: 11178
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
Log:
HORNETQ-720 Current file must be open at the correct position.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 10:29:24 UTC (rev 11177)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-10 10:30:16 UTC (rev 11178)
@@ -403,6 +403,7 @@
}
// files should be already in place.
filesReservedForSync.remove(jc);
+ getJournal(jc.typeByte).stop();
registerJournal(jc.typeByte, journal);
journal.loadInternalOnly();
// XXX HORNETQ-720 must reload journals
@@ -456,7 +457,6 @@
JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
- current.getFile().open(1, false);
registerJournal(packet.getJournalContentType().typeByte,
new FileWrapperJournal(current, storage.hasCallbackSupport()));
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-10 10:29:24 UTC (rev 11177)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-10 10:30:16 UTC (rev 11178)
@@ -443,9 +443,13 @@
return nextFile;
}
- public JournalFile createRemoteBackupSyncFile(long fileID, boolean init) throws Exception
+ /**
+ * Creates files for journal synchronization of a replicated backup.
+ * @param isCurrent a current file is initialized and kept open.
+ */
+ public JournalFile createRemoteBackupSyncFile(long fileID, boolean isCurrent) throws Exception
{
- return createFile(false, false, init, false, fileID);
+ return createFile(isCurrent, false, isCurrent, false, fileID);
}
// Package protected ---------------------------------------------
13 years, 5 months
JBoss hornetq SVN: r11177 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-10 06:29:24 -0400 (Wed, 10 Aug 2011)
New Revision: 11177
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Fix system to delay "up-to-date" msg.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:28:45 UTC (rev 11176)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:29:24 UTC (rev 11177)
@@ -2,6 +2,7 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -16,9 +17,12 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CommandConfirmationHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -209,6 +213,8 @@
handler.addSubHandler(repEnd);
Channel repChannel = repEnd.getChannel();
repChannel.setHandler(handler);
+ handler.setChannel(repChannel);
+ liveServer.removeInterceptor(this);
}
catch (Exception e)
{
@@ -223,33 +229,217 @@
private static class ReplicationChannelHandler implements ChannelHandler
{
- private ChannelHandler handler;
+ private ReplicationEndpoint handler;
private Packet onHold;
+ private Channel channel;
public volatile boolean deliver;
- public void addSubHandler(ChannelHandler handler)
+ public void addSubHandler(ReplicationEndpoint handler)
{
this.handler = handler;
}
+ public void setChannel(Channel channel)
+ {
+ this.channel = channel;
+ }
+
@Override
public void handlePacket(Packet packet)
{
+
if (onHold != null && deliver)
{
- handler.handlePacket(onHold);
+ // Use wrapper to avoid sending a response
+ ChannelWrapper wrapper = new ChannelWrapper(channel);
+ handler.setChannel(wrapper);
+ try
+ {
+ handler.handlePacket(onHold);
+ }
+ finally
+ {
+ handler.setChannel(channel);
+ onHold = null;
+ }
}
+
if (packet.getType() == PacketImpl.REPLICATION_SYNC)
{
ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
{
+ assert onHold == null;
onHold = packet;
+ PacketImpl response = new ReplicationResponseMessage();
+ channel.send(response);
return;
}
}
+
handler.handlePacket(packet);
}
}
+
+ private static class ChannelWrapper implements Channel
+ {
+
+ private final Channel channel;
+
+ /**
+ * Wraps the given channel: only getID() and toString() consult it, send() is a
+ * no-op, and every other method throws UnsupportedOperationException.
+ * @param channel the channel to wrap
+ */
+ public ChannelWrapper(Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ChannelWrapper(" + channel + ")";
+ }
+
+ @Override
+ public long getID()
+ {
+ return channel.getID();
+ }
+
+ @Override
+ public void send(Packet packet)
+ {
+ // no-op: the packet is deliberately discarded instead of being forwarded
+ // channel.send(packet);
+ }
+
+ @Override
+ public void sendBatched(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void sendAndFlush(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Packet sendBlocking(Packet packet) throws HornetQException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setHandler(ChannelHandler handler)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void transferConnection(CoreRemotingConnection newConnection)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void replayCommands(int lastConfirmedCommandID)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLastConfirmedCommandID()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void lock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unlock()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void returnBlocking()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Lock getLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CoreRemotingConnection getConnection()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void confirm(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void flushConfirmations()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handlePacket(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void clearCommands()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getConfirmationWindowSize()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setTransferring(boolean transferring)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ }
}
13 years, 5 months