[hornetq-commits] JBoss hornetq SVN: r10441 - in trunk: src/main/org/hornetq/core/journal/impl and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Apr 4 00:04:10 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-04 00:04:09 -0400 (Mon, 04 Apr 2011)
New Revision: 10441

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/Journal.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/transaction/Transaction.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-664 - Async commit on Paging

Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -75,6 +75,15 @@
 
    void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
 
+   /**
+    * @param txID
+    * @param sync
+    * @param callback
+    * @param useLineUp if appendCommitRecord should call a storeLineUp. This is because the caller may have already taken into account
+    * @throws Exception
+    */
+   void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception;
+
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -106,6 +115,8 @@
     *  This is only useful if you're using the journal but not interested on the current data.
     *  Useful in situations where the journal is being replicated, copied... etc. */
    JournalLoadInformation loadInternalOnly() throws Exception;
+   
+   void lineUpContex(IOCompletion callback);
 
    JournalLoadInformation load(List<RecordInfo> committedRecords,
                                List<PreparedTransactionInfo> preparedTransactions,

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -1289,7 +1289,7 @@
    {
       SyncIOCompletion syncCompletion = getSyncCallback(sync);
 
-      appendCommitRecord(txID, sync, syncCompletion);
+      appendCommitRecord(txID, sync, syncCompletion, true);
 
       if (syncCompletion != null)
       {
@@ -1297,6 +1297,20 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+    */
+   public void lineUpContex(IOCompletion callback)
+   {
+      callback.storeLineUp();
+   }
+
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   {
+      appendCommitRecord(txID, sync, callback, true);
+   }
+
+   
    /**
     * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
     * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
@@ -1314,7 +1328,7 @@
     *
     */
 
-   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+   public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
    {
       if (state != JournalImpl.STATE_LOADED)
       {
@@ -1334,7 +1348,7 @@
 
          JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID, null);
 
-         if (callback != null)
+         if (callback != null && lineUpContext)
          {
             callback.storeLineUp();
          }

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -889,6 +889,11 @@
          {
             sync();
          }
+         
+         if (tx != null)
+         {
+            tx.setWaitBeforeCommit(true);
+         }
 
          return true;
       }

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -54,6 +54,8 @@
 
    /** Get the context associated with the thread for later reuse */
    OperationContext getContext();
+   
+   void lineUpContext();
 
    /** It just creates an OperationContext without associating it */
    OperationContext newContext(Executor executor);
@@ -146,6 +148,8 @@
 
    void commit(long txID) throws Exception;
 
+   void commit(long txID, boolean lineUpContext) throws Exception;
+
    void rollback(long txID) throws Exception;
 
    void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -707,9 +707,14 @@
 
    public void commit(final long txID) throws Exception
    {
-      messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional));
+      commit(txID, true);
    }
 
+   public void commit(final long txID, final boolean lineUpContext) throws Exception
+   {
+      messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
+   }
+
    public void rollback(final long txID) throws Exception
    {
       messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
@@ -3291,4 +3296,11 @@
       journal.stop();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+    */
+   public void lineUpContext()
+   {
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -353,9 +353,12 @@
    public String toString()
    {
       StringBuffer buffer = new StringBuffer();
-      for (TaskHolder hold : tasks)
+      if (tasks != null)
       {
-         buffer.append("Task = " + hold + "\n");
+         for (TaskHolder hold : tasks)
+         {
+            buffer.append("Task = " + hold + "\n");
+         }
       }
       
       return "OperationContextImpl [minimalStore=" + minimalStore +

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -553,4 +553,20 @@
       return 0;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+    */
+   public void commit(long txID, boolean lineUpContext) throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+    */
+   public void lineUpContext()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -44,7 +44,7 @@
 
    void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
 
-   void appendCommitRecord(byte journalID, long txID) throws Exception;
+   void appendCommitRecord(byte journalID, long txID, boolean lineUp) throws Exception;
 
    void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -172,7 +172,7 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID);
+      replicationManager.appendCommitRecord(journalID, txID, true);
       localJournal.appendCommitRecord(txID, sync);
    }
 
@@ -185,10 +185,25 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID);
+      replicationManager.appendCommitRecord(journalID, txID, true);
       localJournal.appendCommitRecord(txID, sync, callback);
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion, boolean)
+    */
+   public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+   {
+      if (ReplicatedJournal.trace)
+      {
+         ReplicatedJournal.trace("AppendCommit " + txID);
+      }
+      replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
+      localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
+      
+   }
+
+   
    /**
     * @param id
     * @param sync
@@ -544,6 +559,15 @@
       return localJournal.getUserVersion();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+    */
+   public void lineUpContex(IOCompletion callback)
+   {
+      localJournal.lineUpContex(callback);
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -165,11 +165,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
-   public void appendCommitRecord(final byte journalID, final long txID) throws Exception
+   public void appendCommitRecord(final byte journalID, final long txID, final boolean lineUp) throws Exception
    {
       if (enabled)
       {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
 
@@ -440,10 +440,18 @@
 
    private void sendReplicatePacket(final Packet packet)
    {
+      sendReplicatePacket(packet, true);
+   }
+
+   private void sendReplicatePacket(final Packet packet, boolean lineUp)
+   {
       boolean runItNow = false;
 
       OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
-      repliToken.replicationLineUp();
+      if (lineUp)
+      {
+         repliToken.replicationLineUp();
+      }
 
       synchronized (replicationLock)
       {

Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -65,12 +65,10 @@
    
    boolean hasTimedOut(long currentTime, int defaultTimeout);
    
-   /** We don't want to look on operations at every send, so we keep the paging attribute and will only look at 
-    *  the PagingOperation case this attribute is true*/
-   boolean isPaging();
-   
-   void setPaging(boolean paging);
+   boolean isWaitBeforeCommit();
 
+   void setWaitBeforeCommit(boolean waitBeforeCommit);
+
    void putProperty(int index, Object property);
 
    Object getProperty(int index);

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
@@ -48,7 +49,10 @@
 
    private final long id;
    
-   private boolean paging = false;
+   /**
+    * if the appendCommit has to be done only after the current operations are completed
+    */
+   private boolean waitBeforeCommit = false;
 
    private volatile State state = State.ACTIVE;
 
@@ -259,8 +263,17 @@
 
          if (containsPersistent || xid != null && state == State.PREPARED)
          {
-            storageManager.commit(id);
 
+            if (waitBeforeCommit)
+            {
+               // we will wait all the pending operations to finish before we can add this
+               asyncAppendCommit();
+            }
+            else
+            {
+               storageManager.commit(id);
+            }
+
             state = State.COMMITTED;
          }
 
@@ -288,6 +301,39 @@
       }
    }
 
+   /**
+    * 
+    */
+   protected void asyncAppendCommit()
+   {
+      final OperationContext ctx = storageManager.getContext(); 
+      storageManager.afterCompleteOperations(new IOAsyncTask()
+      {
+         public void onError(int errorCode, String errorMessage)
+         {
+         }
+         
+         public void done()
+         {
+            OperationContext originalCtx = storageManager.getContext();
+            try
+            {
+               storageManager.setContext(ctx);
+               storageManager.commit(id, false);
+            }
+            catch (Exception e)
+            {
+               onError(HornetQException.IO_ERROR, e.getMessage());
+            }
+            finally
+            {
+               storageManager.setContext(originalCtx);
+            }
+         }
+      });
+      ctx.storeLineUp();
+   }
+
    public void rollback() throws Exception
    {
       synchronized (timeoutLock)
@@ -361,14 +407,14 @@
       this.state = state;
    }
    
-   public boolean isPaging()
+   public boolean isWaitBeforeCommit()
    {
-      return paging;
+      return waitBeforeCommit;
    }
 
-   public void setPaging(boolean paging)
+   public void setWaitBeforeCommit(boolean waitBeforeCommit)
    {
-      this.paging = paging;
+      this.waitBeforeCommit = waitBeforeCommit;
    }
 
    public Xid getXid()

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -913,6 +913,8 @@
       assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
 
       store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+      
+      conn.close();
 
       server.stop();
 

Added: trunk/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingSyncTest.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A PagingOrderTest.
+ * 
+ * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PagingSyncTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   private ServerLocator locator;
+
+   public PagingSyncTest(final String name)
+   {
+      super(name);
+   }
+
+   public PagingSyncTest()
+   {
+      super();
+   }
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingTest.class);
+
+   private static final int RECEIVE_TIMEOUT = 30000;
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      locator = createInVMNonHALocator();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      locator.close();
+
+      super.tearDown();
+   }
+   public void testOrder1() throws Throwable
+   {
+      boolean persistentMessages = true;
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 500;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setClientFailureCheckPeriod(1000);
+         locator.setConnectionTTL(2000);
+         locator.setReconnectAttempts(0);
+
+         locator.setBlockOnNonDurableSend(false);
+         locator.setBlockOnDurableSend(false);
+         locator.setBlockOnAcknowledge(false);
+         locator.setConsumerWindowSize(1024 * 1024);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         Queue queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+         }
+
+         session.commit();
+
+         session.close();
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -1059,5 +1059,23 @@
          return 0;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion, boolean)
+       */
+      public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
+       */
+      public void lineUpContex(IOCompletion callback)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -1671,6 +1671,24 @@
          return getContext();
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
+       */
+      public void commit(long txID, boolean lineUpContext) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
+       */
+      public void lineUpContext()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-04-03 09:50:09 UTC (rev 10440)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2011-04-04 04:04:09 UTC (rev 10441)
@@ -323,38 +323,38 @@
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#isPaging()
+       * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
        */
-      public boolean isPaging()
+      public boolean isContainsPersistent()
       {
          // TODO Auto-generated method stub
          return false;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+       * @see org.hornetq.core.transaction.Transaction#getAllOperations()
        */
-      public void setPaging(boolean paging)
+      public List<TransactionOperation> getAllOperations()
       {
-         // TODO Auto-generated method stub
-         
+         return null;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#isContainsPersistent()
+       * @see org.hornetq.core.transaction.Transaction#isWaitBeforeCommit()
        */
-      public boolean isContainsPersistent()
+      public boolean isWaitBeforeCommit()
       {
          // TODO Auto-generated method stub
          return false;
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.Transaction#getAllOperations()
+       * @see org.hornetq.core.transaction.Transaction#setWaitBeforeCommit(boolean)
        */
-      public List<TransactionOperation> getAllOperations()
+      public void setWaitBeforeCommit(boolean waitBeforeCommit)
       {
-         return null;
+         // TODO Auto-generated method stub
+         
       }
 
    }



More information about the hornetq-commits mailing list