[hornetq-commits] JBoss hornetq SVN: r8356 - in branches/ClebertTemporary: src/main/org/hornetq/core/persistence/impl/journal and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 21 00:33:45 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-21 00:33:45 -0500 (Sat, 21 Nov 2009)
New Revision: 8356

Added:
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java
Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Replication Ordering

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -39,12 +39,7 @@
    /** To be called when there are no more operations pending */
    void complete();
    
-   /** Replication may need some extra controls to guarantee ordering
-    *  when nothing is persisted through the contexts 
-    * @return The context is empty
-    */
-   boolean isEmpty();
-   
-   void setEmpty(boolean empty);
+   /** @return true if this context is a special operation used only to sync replication. */
+   boolean isSync();
 
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -153,4 +153,6 @@
 
 
    void deleteGrouping(GroupBinding groupBinding) throws Exception;
+
+   void sync();
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -504,6 +504,14 @@
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
    }
 
+   public void sync()
+   {
+      if (replicator != null)
+      {
+         replicator.sync();
+      }
+   }
+
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
@@ -1321,7 +1329,7 @@
 
       return info;
    }
-
+   
    // Public -----------------------------------------------------------------------------------
 
    public Journal getMessageJournal()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -138,20 +138,12 @@
    {
       tlContext.set(null);
    }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
-    */
-   public boolean isEmpty()
+   
+   public boolean isSync()
    {
-      return empty;
+      return false;
    }
 
-   public void setEmpty(final boolean sync)
-   {
-      this.empty = sync;
-   }
-
    /* (non-Javadoc)
     * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
     */

Added: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	                        (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * An {@link OperationContext} wrapper: every call is forwarded to the wrapped context,
+ * except {@link #isSync()}, which always returns true to mark the token as a sync operation.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ */
+public class SyncOperation implements OperationContext
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   // The wrapped context that receives every delegated call.
+   OperationContext ctx;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   // Stores the given context as the delegate; no null check is performed.
+   public SyncOperation (OperationContext ctx)
+   {
+      this.ctx = ctx;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Forwards to the wrapped context's complete().
+    * @see org.hornetq.core.persistence.OperationContext#complete()
+    */
+   public void complete()
+   {
+      ctx.complete();
+   }
+
+   /**
+    * Forwards to the wrapped context's done().
+    * @see org.hornetq.core.asyncio.AIOCallback#done()
+    */
+   public void done()
+   {
+      ctx.done();
+   }
+
+   /**
+    * @param runnable the task handed unchanged to the wrapped context's executeOnCompletion
+    * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
+    */
+   public void executeOnCompletion(IOAsyncTask runnable)
+   {
+      ctx.executeOnCompletion(runnable);
+   }
+
+   /**
+    * @return whatever the wrapped context's hasReplication() returns
+    * @see org.hornetq.core.persistence.OperationContext#hasReplication()
+    */
+   public boolean hasReplication()
+   {
+      return ctx.hasReplication();
+   }
+
+   /**
+    * @return always true; this is the only method that does not delegate to the wrapped context
+    * @see org.hornetq.core.persistence.OperationContext#isSync()
+    */
+   public boolean isSync()
+   {
+      return true;
+   }
+
+   /**
+    * Forwards to the wrapped context's lineUp().
+    * @see org.hornetq.core.journal.IOCompletion#lineUp()
+    */
+   public void lineUp()
+   {
+      ctx.lineUp();
+   }
+
+   /**
+    * @param errorCode passed through unchanged to the wrapped context's onError
+    * @param errorMessage passed through unchanged to the wrapped context's onError
+    * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+    */
+   public void onError(int errorCode, String errorMessage)
+   {
+      ctx.onError(errorCode, errorMessage);
+   }
+
+   /**
+    * Forwards to the wrapped context's replicationDone().
+    * @see org.hornetq.core.persistence.OperationContext#replicationDone()
+    */
+   public void replicationDone()
+   {
+      ctx.replicationDone();
+   }
+
+   /**
+    * Forwards to the wrapped context's replicationLineUp().
+    * @see org.hornetq.core.persistence.OperationContext#replicationLineUp()
+    */
+   public void replicationLineUp()
+   {
+      ctx.replicationLineUp();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -917,6 +917,13 @@
                }
             }
          }
+         else
+         {
+            if (storageManager.isReplicated())
+            {
+               storageManager.sync();
+            }
+         }
 
          message.incrementRefCount(reference);
       }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -85,5 +85,7 @@
     * @throws HornetQException 
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+   
+   void sync();
 
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -29,6 +29,7 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.SyncOperation;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Packet;
@@ -423,6 +424,25 @@
 
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
+    */
+   public void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException
+   {
+      replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
+   }
+
+   public void sync()
+   {
+      sync(OperationContextImpl.getContext());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+   
    private void sendReplicatePacket(final Packet packet)
    {
       boolean runItNow = false;
@@ -454,14 +474,6 @@
       }
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
-    */
-   public void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException
-   {
-      replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
-   }
-
    private void replicated()
    {
       List<OperationContext> tokensToExecute = getTokens();
@@ -472,19 +484,12 @@
       }
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
    private void sync(OperationContext context)
    {
       boolean executeNow = false;
       synchronized (replicationLock)
       {
-         context.lineUp();
-         context.setEmpty(true);
+         context.replicationLineUp();
          if (pendingTokens.isEmpty())
          {
             // this means the list is empty and we should process it now
@@ -494,7 +499,7 @@
          {
             // adding the sync to be executed in order
             // as soon as the reponses are back from the backup
-            this.pendingTokens.add(context);
+            this.pendingTokens.add(new SyncOperation(context));
          }
       }
       if (executeNow)
@@ -534,16 +539,16 @@
          retList.add(tokenPolled);
 
       }
-      while (tokenPolled.isEmpty());
+      while (tokenPolled.isSync());
 
       // This is to avoid a situation where we won't have more replicated packets
       // We need to make sure we process any pending sync packet up to the next non empty packet
       synchronized (replicationLock)
       {
-         while (!pendingTokens.isEmpty() && pendingTokens.peek().isEmpty())
+         while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
          {
             tokenPolled = pendingTokens.poll();
-            if (!tokenPolled.isEmpty())
+            if (!tokenPolled.isSync())
             {
                throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
             }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -374,11 +374,6 @@
 
          managementService.stop();
 
-         if (storageManager != null)
-         {
-            storageManager.stop();
-         }
-
          if (replicationEndpoint != null)
          {
             replicationEndpoint.stop();
@@ -410,7 +405,14 @@
          }
 
          threadPool.shutdown();
+         
+         threadPool.awaitTermination(60, TimeUnit.SECONDS);
 
+         if (storageManager != null)
+         {
+            storageManager.stop();
+         }
+
          scheduledPool = null;
 
          if (pagingManager != null)

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -40,6 +40,7 @@
 import org.hornetq.core.management.Notification;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.postoffice.Bindings;
@@ -1460,7 +1461,7 @@
    public void handleSend(final SessionSendMessage packet)
    {
       Packet response = null;
-
+      
       ServerMessage message = packet.getServerMessage();
 
       try

Added: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java	                        (rev 0)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * Sends 100 messages carrying an increasing "id" property to a queue and asserts consumers receive them in that order.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      server = createServer(true, true);
+      server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+      server.start();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+      super.tearDown();
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   // Repeats testSimpleOrder 50 times, calling tearDown() and setUp() after each run to stop the server and start a new one.
+   public void testLoop() throws Exception
+   {
+      for (int i = 0 ; i < 50; i ++)
+      {
+         testSimpleOrder();
+         tearDown();
+         setUp();
+      }
+   }
+   // Sends 100 messages, then in three successive sessions has two consumers each assert the "id" values arrive as 0..99.
+   public void testSimpleOrder() throws Exception
+   {
+      ClientSessionFactory sf = createNettyFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+
+      
+      ClientSession session = sf.createSession(true, true, 0);
+      
+      try
+      {
+         session.createQueue("queue", "queue", true);
+
+         ClientProducer prod = session.createProducer("queue");
+
+         // Only the message with i == 0 is created with "true" (presumably the durable flag - TODO confirm); the rest pass false.
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage msg = session.createClientMessage(i == 0);
+            msg.setBody(session.createBuffer(new byte[1024]));
+            msg.putIntProperty("id", i);
+            prod.send(msg);
+         }
+
+         session.close();
+         
+         boolean started = false;
+
+         for (int start = 0; start < 3; start++)
+         {
+            // NOTE(review): start only takes the values 0..2 in this loop, so the "start == 20" branch below never runs,
+            // "started" stays false, and the server stop/start path is effectively disabled - confirm this is intended.
+            if (start == 20)
+            {
+               started = true;
+               server.stop();
+               server.start();
+            }
+            
+            session = sf.createSession(true, true);
+            
+            session.start();
+            
+//            fail(session);
+            
+            ClientConsumer cons = session.createConsumer("queue");
+
+            for (int i = 0; i < 100; i++)
+            {
+               if (!started || started && i % 2 == 0)
+               {
+                  ClientMessage msg = cons.receive(10000);
+                  assertEquals(i, msg.getIntProperty("id").intValue());
+               }
+            }
+
+            cons.close();
+            // No message was acknowledged above, which is presumably why a new consumer can receive the same ids again.
+            cons = session.createConsumer("queue");
+
+            for (int i = 0; i < 100; i++)
+            {
+               if (!started || started && i % 2 == 0)
+               {
+                  ClientMessage msg = cons.receive(10000);
+                  assertEquals(i, msg.getIntProperty("id").intValue());
+               }
+            }
+            
+            session.close();
+         }
+      // NOTE(review): the finally block below closes the factory before the session - confirm this order is intended.
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+
+   }
+   // Fails the session's connection with NOT_CONNECTED and asserts the failure listener fires within 1000 ms.
+   // NOTE(review): appears unused here - its only visible call, in testSimpleOrder, is commented out.
+   private void fail(ClientSession session) throws InterruptedException
+   {
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener implements SessionFailureListener
+      {
+         public void connectionFailed(HornetQException me)
+         {
+            latch.countDown();
+         }
+
+         public void beforeReconnect(HornetQException exception)
+         {
+         }
+      }
+
+      MyListener listener = new MyListener();
+      session.addFailureListener(listener);
+
+
+      
+      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+      // Wait to be informed of failure
+
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+      assertTrue(ok);
+      
+      session.removeFailureListener(listener);
+   }
+
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -169,7 +169,6 @@
       assertEquals(0, sf.numConnections());
    }
 
-   
    /** It doesn't fail, but it restart both servers, live and backup, and the data should be received after the restart,
     *  and the servers should be able to connect without any problems. */
    public void testRestartServers() throws Exception
@@ -1670,6 +1669,73 @@
       assertEquals(0, sf.numConnections());
    }
 
+   public void testSimpleSendAfterFailover() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+      // Outline: fail the session once, then send 100 messages and expect them back in order on the same session.
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+
+      ClientSession session = sf.createSession(true, true, 0);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+      // Counted down by the failure listener below when connectionFailed is invoked.
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(HornetQException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      session.addFailureListener(new MyListener());
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+      // fail(session, latch) is defined outside this hunk; presumably it fails the connection and waits on the latch - TODO confirm.
+      fail(session, latch);
+      // Messages alternate the flag passed to createClientMessage: true for even i, false for odd i.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+         setBody(i, message);
+         
+         System.out.println("Durable = " + message.isDurable());
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+      // Every message must arrive within 1000 ms, in send order, as checked through the "counter" property.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         assertEquals(i, message.getIntProperty("counter").intValue());
+
+         message.acknowledge();
+      }
+
+      session.close();
+
+      assertEquals(0, sf.numSessions());
+
+      assertEquals(0, sf.numConnections());
+   }
+
    public void testForceBlockingReturn() throws Exception
    {
       ClientSessionFactoryInternal sf = this.getSessionFactory();

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 03:07:33 UTC (rev 8355)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 05:33:45 UTC (rev 8356)
@@ -494,7 +494,7 @@
       }
    }
 
-   public void disabledForNowtestOrderOnNonPersistency() throws Exception
+   public void testOrderOnNonPersistency() throws Exception
    {
 
       Configuration config = createDefaultConfig(false);
@@ -529,6 +529,10 @@
             {
                replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
             }
+            else
+            {
+               manager.sync();
+            }
 
             OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
             {
@@ -539,6 +543,7 @@
 
                public void done()
                {
+                  executions.add(nAdd);
                   latch.countDown();
                }
             });



More information about the hornetq-commits mailing list