[hornetq-commits] JBoss hornetq SVN: r8060 - in branches/Replication_Clebert: src/main/org/hornetq/core/replication/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 6 22:30:02 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-06 22:30:02 -0400 (Tue, 06 Oct 2009)
New Revision: 8060

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
Log:
Fixes for replicating ID generation

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/BatchingIDGenerator.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -38,8 +38,6 @@
    private static final Logger log = Logger.getLogger(BatchingIDGenerator.class);
 
 
-   public static final byte ID_COUNTER_RECORD = 24;
-
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -127,7 +125,7 @@
    {
       try
       {
-         journalStorage.appendAddRecord(journalID, ID_COUNTER_RECORD, new IDCounterEncoding(id), true);
+         journalStorage.appendAddRecord(journalID, JournalStorageManager.ID_COUNTER_RECORD, new IDCounterEncoding(id), true);
       }
       catch (Exception e)
       {

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -94,6 +94,8 @@
 
    public static final byte PERSISTENT_ID_RECORD = 23;
 
+   public static final byte ID_COUNTER_RECORD = 24;
+
    // type + expiration + timestamp + priority
    public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
 
@@ -235,9 +237,16 @@
       {
          throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
       }
+      
+      if (config.isBackup())
+      {
+         this.idGenerator = null;
+      }
+      else
+      {
+         this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+      }
 
-      this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
-
       Journal localMessage = new JournalImpl(config.getJournalFileSize(),
                                        config.getJournalMinFiles(),
                                        config.getJournalCompactMinFiles(),
@@ -1087,7 +1096,7 @@
 
             persistentID = encoding.uuid;
          }
-         else if (rec == BatchingIDGenerator.ID_COUNTER_RECORD)
+         else if (rec == ID_COUNTER_RECORD)
          {
             idGenerator.loadState(record.id, buffer);
          }
@@ -1131,7 +1140,10 @@
       }
 
       // Must call close to make sure last id is persisted
-      idGenerator.close();
+      if (idGenerator != null)
+      {
+         idGenerator.close();
+      }
 
       bindingsJournal.stop();
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -40,16 +40,20 @@
 
    // Attributes ----------------------------------------------------
 
+   private static final boolean trace = false;
+
    private final ReplicationManager replicationManager;
 
    private final Journal replicatedJournal;
 
    private final byte journalID;
 
-   public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal, ReplicationManager replicationManager)
+   public ReplicatedJournalImpl(final byte journaID,
+                                final Journal replicatedJournal,
+                                final ReplicationManager replicationManager)
    {
       super();
-      this.journalID = journaID;
+      journalID = journaID;
       this.replicatedJournal = replicatedJournal;
       this.replicationManager = replicationManager;
    }
@@ -67,7 +71,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
     */
-   public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
    {
       this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
@@ -80,9 +84,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
-      System.out.println("Append record id = " + id + " recordType = " + recordType);
+      if (trace)
+      {
+         System.out.println("Append record id = " + id + " recordType = " + recordType);
+      }
       replicationManager.appendAddRecord(journalID, id, recordType, record);
       replicatedJournal.appendAddRecord(id, recordType, record, sync);
    }
@@ -95,7 +102,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
     */
-   public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
    {
       this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
    }
@@ -108,9 +115,15 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
     */
-   public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   public void appendAddRecordTransactional(final long txID,
+                                            final long id,
+                                            final byte recordType,
+                                            final EncodingSupport record) throws Exception
    {
-      System.out.println("Append record TXid = " + id + " recordType = " + recordType);
+      if (trace)
+      {
+         System.out.println("Append record TXid = " + id + " recordType = " + recordType);
+      }
       replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
       replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
    }
@@ -121,9 +134,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
     */
-   public void appendCommitRecord(long txID, boolean sync) throws Exception
+   public void appendCommitRecord(final long txID, final boolean sync) throws Exception
    {
-      System.out.println("AppendCommit " + txID);
+      if (trace)
+      {
+         System.out.println("AppendCommit " + txID);
+      }
       replicationManager.appendCommitRecord(journalID, txID);
       replicatedJournal.appendCommitRecord(txID, sync);
    }
@@ -134,12 +150,15 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
     */
-   public void appendDeleteRecord(long id, boolean sync) throws Exception
+   public void appendDeleteRecord(final long id, final boolean sync) throws Exception
    {
-      System.out.println("AppendDelete " + id);
+      if (trace)
+      {
+         System.out.println("AppendDelete " + id);
+      }
       replicationManager.appendDeleteRecord(journalID, id);
       replicatedJournal.appendDeleteRecord(id, sync);
-    }
+   }
 
    /**
     * @param txID
@@ -148,7 +167,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
     */
-   public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
    {
       this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
    }
@@ -160,9 +179,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
     */
-   public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
    {
-      System.out.println("AppendDelete txID=" + txID + " id=" + id);
+      if (trace)
+      {
+         System.out.println("AppendDelete txID=" + txID + " id=" + id);
+      }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
       replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
    }
@@ -173,9 +195,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
     */
-   public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
    {
-      System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
+      if (trace)
+      {
+         System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
+      }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
       replicatedJournal.appendDeleteRecordTransactional(txID, id);
    }
@@ -187,7 +212,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
     */
-   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+   public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
    {
       this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
    }
@@ -199,9 +224,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
    {
-      System.out.println("AppendPrepare txID=" + txID);
+      if (trace)
+      {
+         System.out.println("AppendPrepare txID=" + txID);
+      }
       replicationManager.appendPrepareRecord(journalID, txID, transactionData);
       replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
    }
@@ -212,9 +240,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
     */
-   public void appendRollbackRecord(long txID, boolean sync) throws Exception
+   public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
    {
-      System.out.println("AppendRollback " + txID);
+      if (trace)
+      {
+         System.out.println("AppendRollback " + txID);
+      }
       replicationManager.appendRollbackRecord(journalID, txID);
       replicatedJournal.appendRollbackRecord(txID, sync);
    }
@@ -227,7 +258,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
     */
-   public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
    {
       this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
@@ -240,9 +271,12 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
-      System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+      if (trace)
+      {
+         System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+      }
       replicationManager.appendUpdateRecord(journalID, id, recordType, record);
       replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
    }
@@ -255,7 +289,10 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
     */
-   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   public void appendUpdateRecordTransactional(final long txID,
+                                               final long id,
+                                               final byte recordType,
+                                               final byte[] record) throws Exception
    {
       this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
    }
@@ -268,9 +305,15 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
     */
-   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   public void appendUpdateRecordTransactional(final long txID,
+                                               final long id,
+                                               final byte recordType,
+                                               final EncodingSupport record) throws Exception
    {
-      System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
+      if (trace)
+      {
+         System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
+      }
       replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
       replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
    }
@@ -283,9 +326,9 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
     */
-   public long load(List<RecordInfo> committedRecords,
-                    List<PreparedTransactionInfo> preparedTransactions,
-                    TransactionFailureCallback transactionFailure) throws Exception
+   public long load(final List<RecordInfo> committedRecords,
+                    final List<PreparedTransactionInfo> preparedTransactions,
+                    final TransactionFailureCallback transactionFailure) throws Exception
    {
       return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
    }
@@ -296,7 +339,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
     */
-   public long load(LoaderCallback reloadManager) throws Exception
+   public long load(final LoaderCallback reloadManager) throws Exception
    {
       return replicatedJournal.load(reloadManager);
    }
@@ -306,7 +349,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#perfBlast(int)
     */
-   public void perfBlast(int pages) throws Exception
+   public void perfBlast(final int pages) throws Exception
    {
       replicatedJournal.perfBlast(pages);
    }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -31,7 +31,6 @@
 import org.hornetq.core.server.HornetQServer;
 
 /**
- * A ReplicationPacketHandler
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
@@ -46,6 +45,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private static final boolean trace = false;
+
    private final HornetQServer server;
 
    private Channel channel;
@@ -55,13 +56,9 @@
    private Journal messagingJournal;
 
    private JournalStorageManager storage;
-   
-   private volatile boolean started;
 
-   // Static --------------------------------------------------------
-
    // Constructors --------------------------------------------------
-   public ReplicationEndpointImpl(HornetQServer server)
+   public ReplicationEndpointImpl(final HornetQServer server)
    {
       this.server = server;
    }
@@ -71,7 +68,7 @@
     * (non-Javadoc)
     * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
     */
-   public void handlePacket(Packet packet)
+   public void handlePacket(final Packet packet)
    {
       try
       {
@@ -127,13 +124,11 @@
       storage = new JournalStorageManager(config, null);
       storage.start();
 
-      this.bindingsJournal = storage.getBindingsJournal();
-      this.messagingJournal = storage.getMessageJournal();
+      bindingsJournal = storage.getBindingsJournal();
+      messagingJournal = storage.getMessageJournal();
 
       // We only need to load internal structures on the backup...
       storage.loadInternalOnly();
-
-      started = true;
    }
 
    /* (non-Javadoc)
@@ -141,7 +136,6 @@
     */
    public void stop() throws Exception
    {
-      started = false;
       channel.close();
       storage.stop();
    }
@@ -157,7 +151,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
     */
-   public void setChannel(Channel channel)
+   public void setChannel(final Channel channel)
    {
       this.channel = channel;
    }
@@ -171,13 +165,12 @@
    /**
     * @param packet
     */
-   private void handleCommitRollback(Packet packet) throws Exception
+   private void handleCommitRollback(final Packet packet) throws Exception
    {
       ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
 
       Journal journalToUse = getJournal(commitMessage.getJournalID());
-      
-      
+
       if (commitMessage.isRollback())
       {
          journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
@@ -191,7 +184,7 @@
    /**
     * @param packet
     */
-   private void handlePrepare(Packet packet) throws Exception
+   private void handlePrepare(final Packet packet) throws Exception
    {
       ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
 
@@ -203,7 +196,7 @@
    /**
     * @param packet
     */
-   private void handleAppendDeleteTX(Packet packet) throws Exception
+   private void handleAppendDeleteTX(final Packet packet) throws Exception
    {
       ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
 
@@ -217,7 +210,7 @@
    /**
     * @param packet
     */
-   private void handleAppendDelete(Packet packet) throws Exception
+   private void handleAppendDelete(final Packet packet) throws Exception
    {
       ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
 
@@ -229,7 +222,7 @@
    /**
     * @param packet
     */
-   private void handleAppendAddTXRecord(Packet packet) throws Exception
+   private void handleAppendAddTXRecord(final Packet packet) throws Exception
    {
       ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
 
@@ -255,7 +248,7 @@
     * @param packet
     * @throws Exception
     */
-   private void handleAppendAddRecord(Packet packet) throws Exception
+   private void handleAppendAddRecord(final Packet packet) throws Exception
    {
       ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
 
@@ -263,7 +256,10 @@
 
       if (addMessage.isUpdate())
       {
-         System.out.println("Endpoint appendUpdate id = "  + addMessage.getId());
+         if (trace)
+         {
+            System.out.println("Endpoint appendUpdate id = " + addMessage.getId());
+         }
          journalToUse.appendUpdateRecord(addMessage.getId(),
                                          addMessage.getRecordType(),
                                          addMessage.getRecordData(),
@@ -271,7 +267,10 @@
       }
       else
       {
-         System.out.println("Endpoint append id = "  + addMessage.getId());
+         if (trace)
+         {
+            System.out.println("Endpoint append id = " + addMessage.getId());
+         }
          journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
       }
    }
@@ -280,7 +279,7 @@
     * @param journalID
     * @return
     */
-   private Journal getJournal(byte journalID)
+   private Journal getJournal(final byte journalID)
    {
       Journal journalToUse;
       if (journalID == (byte)0)

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -126,7 +126,6 @@
    
             sf.setBlockOnNonPersistentSend(true);
             sf.setBlockOnPersistentSend(true);
-            sf.setBlockOnAcknowledge(true);
    
             ClientSession createSession = sf.createSession(true, true);
    
@@ -246,8 +245,6 @@
             {
                break;
             }
-            
-            System.out.println("Thread " + Thread.currentThread().getName() + " received " + message.getMessageID());
 
             // There may be some missing or duplicate messages - but the order should be correct
 
@@ -257,7 +254,6 @@
 
             lastCount = count;
 
-            System.out.println("Client ACK: " + message.getMessageID());
             message.acknowledge();
          }
 

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java	2009-10-06 22:44:02 UTC (rev 8059)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java	2009-10-07 02:30:02 UTC (rev 8060)
@@ -23,6 +23,7 @@
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.persistence.impl.journal.BatchingIDGenerator;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.tests.util.UnitTestCase;
 
@@ -142,7 +143,7 @@
 
       for (RecordInfo record : records)
       {
-         if (record.userRecordType == BatchingIDGenerator.ID_COUNTER_RECORD)
+         if (record.userRecordType == JournalStorageManager.ID_COUNTER_RECORD)
          {
             HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(record.data);
             batch.loadState(record.id, buffer);



More information about the hornetq-commits mailing list