Author: clebert.suconic(a)jboss.com
Date: 2009-11-21 00:33:45 -0500 (Sat, 21 Nov 2009)
New Revision: 8356
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Replication Ordering
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -39,12 +39,7 @@
/** To be called when there are no more operations pending */
void complete();
- /** Replication may need some extra controls to guarantee ordering
- * when nothing is persisted through the contexts
- * @return The context is empty
- */
- boolean isEmpty();
-
- void setEmpty(boolean empty);
+ /** Is this a special operation to sync replication. */
+ boolean isSync();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -153,4 +153,6 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+ void sync();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -504,6 +504,14 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
}
+ public void sync()
+ {
+ if (replicator != null)
+ {
+ replicator.sync();
+ }
+ }
+
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
@@ -1321,7 +1329,7 @@
return info;
}
-
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -138,20 +138,12 @@
{
tlContext.set(null);
}
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isEmpty()
+
+ public boolean isSync()
{
- return empty;
+ return false;
}
- public void setEmpty(final boolean sync)
- {
- this.empty = sync;
- }
-
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
*/
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
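+ * A SyncOperation: an OperationContext wrapper that delegates every call to the wrapped context but always reports isSync() as true.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>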
+ *
+ *
+ */
+public class SyncOperation implements OperationContext
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ OperationContext ctx;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SyncOperation (OperationContext ctx)
+ {
+ this.ctx = ctx;
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+ *
+ * @see org.hornetq.core.persistence.OperationContext#complete()
+ */
+ public void complete()
+ {
+ ctx.complete();
+ }
+
+ /**
+ *
+ * @see org.hornetq.core.asyncio.AIOCallback#done()
+ */
+ public void done()
+ {
+ ctx.done();
+ }
+
+ /**
+ * @param runnable
+ * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
+ */
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+ ctx.executeOnCompletion(runnable);
+ }
+
+ /**
+ * @return
+ * @see org.hornetq.core.persistence.OperationContext#hasReplication()
+ */
+ public boolean hasReplication()
+ {
+ return ctx.hasReplication();
+ }
+
+ /**
+ * @return
+ * @see org.hornetq.core.persistence.OperationContext#isSync()
+ */
+ public boolean isSync()
+ {
+ return true;
+ }
+
+ /**
+ *
+ * @see org.hornetq.core.journal.IOCompletion#lineUp()
+ */
+ public void lineUp()
+ {
+ ctx.lineUp();
+ }
+
+ /**
+ * @param errorCode
+ * @param errorMessage
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public void onError(int errorCode, String errorMessage)
+ {
+ ctx.onError(errorCode, errorMessage);
+ }
+
+ /**
+ *
+ * @see org.hornetq.core.persistence.OperationContext#replicationDone()
+ */
+ public void replicationDone()
+ {
+ ctx.replicationDone();
+ }
+
+ /**
+ *
+ * @see org.hornetq.core.persistence.OperationContext#replicationLineUp()
+ */
+ public void replicationLineUp()
+ {
+ ctx.replicationLineUp();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -917,6 +917,13 @@
}
}
}
+ else
+ {
+ if (storageManager.isReplicated())
+ {
+ storageManager.sync();
+ }
+ }
message.incrementRefCount(reference);
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -85,5 +85,7 @@
* @throws HornetQException
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+
+ void sync();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -29,6 +29,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.SyncOperation;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
@@ -423,6 +424,25 @@
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
+ */
+ public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
+ {
+ replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
+ }
+
+ public void sync()
+ {
+ sync(OperationContextImpl.getContext());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
private void sendReplicatePacket(final Packet packet)
{
boolean runItNow = false;
@@ -454,14 +474,6 @@
}
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
- */
- public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
- {
- replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
- }
-
private void replicated()
{
List<OperationContext> tokensToExecute = getTokens();
@@ -472,19 +484,12 @@
}
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
private void sync(OperationContext context)
{
boolean executeNow = false;
synchronized (replicationLock)
{
- context.lineUp();
- context.setEmpty(true);
+ context.replicationLineUp();
if (pendingTokens.isEmpty())
{
// this means the list is empty and we should process it now
@@ -494,7 +499,7 @@
{
// adding the sync to be executed in order
// as soon as the reponses are back from the backup
- this.pendingTokens.add(context);
+ this.pendingTokens.add(new SyncOperation(context));
}
}
if (executeNow)
@@ -534,16 +539,16 @@
retList.add(tokenPolled);
}
- while (tokenPolled.isEmpty());
+ while (tokenPolled.isSync());
// This is to avoid a situation where we won't have more replicated packets
// We need to make sure we process any pending sync packet up to the next non empty
packet
synchronized (replicationLock)
{
- while (!pendingTokens.isEmpty() && pendingTokens.peek().isEmpty())
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
{
tokenPolled = pendingTokens.poll();
- if (!tokenPolled.isEmpty())
+ if (!tokenPolled.isSync())
{
throw new IllegalStateException("Replicatoin context is not a
roundtrip token as expected");
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -374,11 +374,6 @@
managementService.stop();
- if (storageManager != null)
- {
- storageManager.stop();
- }
-
if (replicationEndpoint != null)
{
replicationEndpoint.stop();
@@ -410,7 +405,14 @@
}
threadPool.shutdown();
+
+ threadPool.awaitTermination(60, TimeUnit.SECONDS);
+ if (storageManager != null)
+ {
+ storageManager.stop();
+ }
+
scheduledPool = null;
if (pagingManager != null)
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -40,6 +40,7 @@
import org.hornetq.core.management.Notification;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
@@ -1460,7 +1461,7 @@
public void handleSend(final SessionSendMessage packet)
{
Packet response = null;
-
+
ServerMessage message = packet.getServerMessage();
try
Added:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java
(rev 0)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
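+ * An OrderTest: sends messages tagged with an increasing "id" property and asserts they are received in that same order.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>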
+ *
+ *
+ */
+public class OrderTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+ server.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 50; i ++)
+ {
+ testSimpleOrder();
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testSimpleOrder() throws Exception
+ {
+ ClientSessionFactory sf = createNettyFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+ session.createQueue("queue", "queue", true);
+
+ ClientProducer prod = session.createProducer("queue");
+
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session.createClientMessage(i == 0);
+ msg.setBody(session.createBuffer(new byte[1024]));
+ msg.putIntProperty("id", i);
+ prod.send(msg);
+ }
+
+ session.close();
+
+ boolean started = false;
+
+ for (int start = 0; start < 3; start++)
+ {
+
+
+ if (start == 20)
+ {
+ started = true;
+ server.stop();
+ server.start();
+ }
+
+ session = sf.createSession(true, true);
+
+ session.start();
+
+// fail(session);
+
+ ClientConsumer cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ cons.close();
+
+ cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ session.close();
+ }
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+
+ }
+
+
+ private void fail(ClientSession session) throws InterruptedException
+ {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ }
+ }
+
+ MyListener listener = new MyListener();
+ session.addFailureListener(listener);
+
+
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+
+ session.removeFailureListener(listener);
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -169,7 +169,6 @@
assertEquals(0, sf.numConnections());
}
-
/** It doesn't fail, but it restart both servers, live and backup, and the data
should be received after the restart,
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
@@ -1670,6 +1669,73 @@
assertEquals(0, sf.numConnections());
}
+ public void testSimpleSendAfterFailover() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ fail(session, latch);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+ setBody(i, message);
+
+ System.out.println("Durable = " + message.isDurable());
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ assertEquals(i, message.getIntProperty("counter").intValue());
+
+ message.acknowledge();
+ }
+
+ session.close();
+
+ assertEquals(0, sf.numSessions());
+
+ assertEquals(0, sf.numConnections());
+ }
+
public void testForceBlockingReturn() throws Exception
{
ClientSessionFactoryInternal sf = this.getSessionFactory();
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
03:07:33 UTC (rev 8355)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
05:33:45 UTC (rev 8356)
@@ -494,7 +494,7 @@
}
}
- public void disabledForNowtestOrderOnNonPersistency() throws Exception
+ public void testOrderOnNonPersistency() throws Exception
{
Configuration config = createDefaultConfig(false);
@@ -529,6 +529,10 @@
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
+ else
+ {
+ manager.sync();
+ }
OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
@@ -539,6 +543,7 @@
public void done()
{
+ executions.add(nAdd);
latch.countDown();
}
});