[hornetq-commits] JBoss hornetq SVN: r8533 - in trunk: tests/src/org/hornetq/tests/integration/cluster/reattach and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 11:42:03 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-03 11:42:03 -0500 (Thu, 03 Dec 2009)
New Revision: 8533

Modified:
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
no code changes.. just clean up

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-03 16:42:03 UTC (rev 8533)
@@ -20,8 +20,6 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -177,7 +175,7 @@
    {
       this.executorFactory = executorFactory;
 
-      this.executor = executorFactory.getExecutor();
+      executor = executorFactory.getExecutor();
 
       this.replicator = replicator;
 
@@ -210,11 +208,11 @@
 
       if (replicator != null)
       {
-         this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+         bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
       }
       else
       {
-         this.bindingsJournal = localBindings;
+         bindingsJournal = localBindings;
       }
 
       if (journalDir == null)
@@ -264,11 +262,11 @@
 
       if (config.isBackup())
       {
-         this.idGenerator = null;
+         idGenerator = null;
       }
       else
       {
-         this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+         idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
       }
 
       Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -283,11 +281,11 @@
 
       if (replicator != null)
       {
-         this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+         messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
       }
       else
       {
-         this.messageJournal = localMessage;
+         messageJournal = localMessage;
       }
 
       largeMessagesDirectory = config.getLargeMessagesDirectory();
@@ -338,14 +336,12 @@
       }
    }
 
-   // TODO: shouldn't those page methods be on the PageManager?
-
    /*
     *
     * (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
     */
-   public void pageClosed(SimpleString storeName, int pageNumber)
+   public void pageClosed(final SimpleString storeName, final int pageNumber)
    {
       if (isReplicated())
       {
@@ -356,7 +352,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
     */
-   public void pageDeleted(SimpleString storeName, int pageNumber)
+   public void pageDeleted(final SimpleString storeName, final int pageNumber)
    {
       if (isReplicated())
       {
@@ -367,7 +363,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.core.buffers.ChannelBuffer)
     */
-   public void pageWrite(PagedMessage message, int pageNumber)
+   public void pageWrite(final PagedMessage message, final int pageNumber)
    {
       if (isReplicated())
       {
@@ -375,8 +371,6 @@
       }
    }
 
-   // TODO: shouldn't those page methods be on the PageManager? ^^^^
-
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#getContext()
     */
@@ -385,7 +379,7 @@
       return OperationContextImpl.getContext(executorFactory);
    }
 
-   public void setContext(OperationContext context)
+   public void setContext(final OperationContext context)
    {
       OperationContextImpl.setContext(context);
    }
@@ -393,12 +387,12 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#newContext()
     */
-   public OperationContext newContext(Executor executor)
+   public OperationContext newContext(final Executor executor)
    {
       return new OperationContextImpl(executor);
    }
 
-   public void afterCompleteOperations(IOAsyncTask run)
+   public void afterCompleteOperations(final IOAsyncTask run)
    {
       getContext().executeOnCompletion(run);
    }
@@ -408,7 +402,7 @@
       return persistentID;
    }
 
-   public void setPersistentID(UUID id) throws Exception
+   public void setPersistentID(final UUID id) throws Exception
    {
       long recordID = generateUniqueID();
 
@@ -417,7 +411,7 @@
          bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
       }
 
-      this.persistentID = id;
+      persistentID = id;
    }
 
    public long generateUniqueID()
@@ -437,7 +431,7 @@
       return new LargeServerMessageImpl(this);
    }
 
-   public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+   public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
    {
       file.position(file.size());
 
@@ -445,11 +439,11 @@
 
       if (isReplicated())
       {
-         this.replicator.largeMessageWrite(messageId, bytes);
+         replicator.largeMessageWrite(messageId, bytes);
       }
    }
 
-   public LargeServerMessage createLargeMessage(long id, byte[] header)
+   public LargeServerMessage createLargeMessage(final long id, final byte[] header)
    {
       if (isReplicated())
       {
@@ -471,9 +465,9 @@
 
    public void storeMessage(final ServerMessage message) throws Exception
    {
-      // TODO - how can this be less than zero?
       if (message.getMessageID() <= 0)
       {
+         // Sanity check only... this shouldn't happen unless there is a bug
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
 
@@ -540,7 +534,7 @@
                                      getContext(syncNonTransactional));
    }
 
-   public void deleteDuplicateID(long recordID) throws Exception
+   public void deleteDuplicateID(final long recordID) throws Exception
    {
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
    }
@@ -595,7 +589,7 @@
       messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
    }
 
-   public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+   public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
       long id = generateUniqueID();
 
@@ -607,7 +601,7 @@
       return id;
    }
 
-   public void deleteHeuristicCompletion(long id) throws Exception
+   public void deleteHeuristicCompletion(final long id) throws Exception
    {
       messageJournal.appendDeleteRecord(id, true, getContext(true));
    }
@@ -668,7 +662,7 @@
       messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
    }
 
-   public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
+   public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
    {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
@@ -704,13 +698,15 @@
    {
       private final Map<Long, ServerMessage> messages;
 
-      public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
+      public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
       {
          super();
          this.messages = messages;
       }
 
-      public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+      public void failedTransaction(final long transactionID,
+                                    final List<RecordInfo> records,
+                                    final List<RecordInfo> recordsToDelete)
       {
          for (RecordInfo record : records)
          {
@@ -986,240 +982,8 @@
 
       return info;
    }
-
-   /**
-    * @param messages
-    * @param buff
-    * @return
-    * @throws Exception
-    */
-   private LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, HornetQBuffer buff) throws Exception
-   {
-      LargeServerMessage largeMessage = createLargeMessage();
-
-      LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
-      messageEncoding.decode(buff);
-
-      if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
-      {
-         long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
-         LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
-         if (originalMessage == null)
-         {
-            // this could happen if the message was deleted but the file still exists as the file still being used
-            originalMessage = createLargeMessage();
-            originalMessage.setDurable(true);
-            originalMessage.setMessageID(originalMessageID);
-            messages.put(originalMessageID, originalMessage);
-         }
-
-         originalMessage.incrementDelayDeletionCount();
-
-         largeMessage.setLinkedMessage(originalMessage);
-      }
-      return largeMessage;
-   }
-
-   private void loadPreparedTransactions(final PostOffice postOffice,
-                                         final PagingManager pagingManager,
-                                         final ResourceManager resourceManager,
-                                         final Map<Long, Queue> queues,
-                                         final List<PreparedTransactionInfo> preparedTransactions,
-                                         final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
-   {
-      // recover prepared transactions
-      for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
-      {
-         XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
-
-         Xid xid = encodingXid.xid;
-
-         Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
-
-         List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
-
-         Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-
-         // Use same method as load message journal to prune out acks, so they don't get added.
-         // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
-         // first get any sent messages for this tx and recreate
-         for (RecordInfo record : preparedTransaction.records)
-         {
-            byte[] data = record.data;
-
-            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
-            byte recordType = record.getUserRecordType();
-
-            switch (recordType)
-            {
-               case ADD_LARGE_MESSAGE:
-               {
-                  messages.put(record.id, parseLargeMessage(messages, buff));
-
-                  break;
-               }
-               case ADD_MESSAGE:
-               {
-                  ServerMessage message = new ServerMessageImpl(record.id, 50);
-
-                  message.decode(buff);
-
-                  messages.put(record.id, message);
-
-                  break;
-               }
-               case ADD_REF:
-               {
-
-                  long messageID = record.id;
-
-                  RefEncoding encoding = new RefEncoding();
-
-                  encoding.decode(buff);
-
-                  Queue queue = queues.get(encoding.queueID);
-
-                  if (queue == null)
-                  {
-                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
-                  }
-
-                  ServerMessage message = messages.get(messageID);
-
-                  if (message == null)
-                  {
-                     throw new IllegalStateException("Cannot find message with id " + messageID);
-                  }
-
-                  postOffice.reroute(message, queue, tx);
-
-                  break;
-               }
-               case ACKNOWLEDGE_REF:
-               {
-                  long messageID = record.id;
-
-                  RefEncoding encoding = new RefEncoding();
-
-                  encoding.decode(buff);
-
-                  Queue queue = queues.get(encoding.queueID);
-
-                  if (queue == null)
-                  {
-                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
-                  }
-
-                  // TODO - this involves a scan - we should find a quicker qay of doing it
-                  MessageReference removed = queue.removeReferenceWithID(messageID);
-
-                  referencesToAck.add(removed);
-
-                  if (removed == null)
-                  {
-                     throw new IllegalStateException("Failed to remove reference for " + messageID);
-                  }
-
-                  break;
-               }
-               case PAGE_TRANSACTION:
-               {
-                  PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
-
-                  pageTransactionInfo.decode(buff);
-
-                  pageTransactionInfo.markIncomplete();
-
-                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
-
-                  pagingManager.addTransaction(pageTransactionInfo);
-
-                  tx.addOperation(new FinishPageMessageOperation());
-
-                  break;
-               }
-               case SET_SCHEDULED_DELIVERY_TIME:
-               {
-                  // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
-                  // case the message will already have the header for the scheduled delivery time, so no need to do
-                  // anything.
-
-                  break;
-               }
-               case DUPLICATE_ID:
-               {
-                  // We need load the duplicate ids at prepare time too
-                  DuplicateIDEncoding encoding = new DuplicateIDEncoding();
-
-                  encoding.decode(buff);
-
-                  List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
-
-                  if (ids == null)
-                  {
-                     ids = new ArrayList<Pair<byte[], Long>>();
-
-                     duplicateIDMap.put(encoding.address, ids);
-                  }
-
-                  ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
-
-                  break;
-               }
-               default:
-               {
-                  log.warn("InternalError: Record type " + recordType +
-                           " not recognized. Maybe you're using journal files created on a different version");
-               }
-            }
-         }
-
-         for (RecordInfo record : preparedTransaction.recordsToDelete)
-         {
-            byte[] data = record.data;
-
-            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
-            long messageID = record.id;
-
-            DeleteEncoding encoding = new DeleteEncoding();
-
-            encoding.decode(buff);
-
-            Queue queue = queues.get(encoding.queueID);
-
-            if (queue == null)
-            {
-               throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
-            }
-
-            MessageReference removed = queue.removeReferenceWithID(messageID);
-
-            if (removed != null)
-            {
-               referencesToAck.add(removed);
-            }
-
-         }
-
-         for (MessageReference ack : referencesToAck)
-         {
-            ack.getQueue().reacknowledge(tx, ack);
-         }
-
-         tx.setState(Transaction.State.PREPARED);
-
-         resourceManager.putTransaction(xid, tx);
-      }
-   }
-
    // grouping handler operations
-   public void addGrouping(GroupBinding groupBinding) throws Exception
+   public void addGrouping(final GroupBinding groupBinding) throws Exception
    {
       GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
                                                                groupBinding.getGroupId(),
@@ -1227,7 +991,7 @@
       bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true);
    }
 
-   public void deleteGrouping(GroupBinding groupBinding) throws Exception
+   public void deleteGrouping(final GroupBinding groupBinding) throws Exception
    {
       bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
    }
@@ -1453,15 +1217,249 @@
          }
       }
    }
+   
 
    /**
+    * @param messages
+    * @param buff
+    * @return
     * @throws Exception
     */
+   private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, final HornetQBuffer buff) throws Exception
+   {
+      LargeServerMessage largeMessage = createLargeMessage();
+
+      LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+      messageEncoding.decode(buff);
+
+      if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+      {
+         long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+         LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+
+         if (originalMessage == null)
+         {
+            // this could happen if the message was deleted but the file still exists as the file still being used
+            originalMessage = createLargeMessage();
+            originalMessage.setDurable(true);
+            originalMessage.setMessageID(originalMessageID);
+            messages.put(originalMessageID, originalMessage);
+         }
+
+         originalMessage.incrementDelayDeletionCount();
+
+         largeMessage.setLinkedMessage(originalMessage);
+      }
+      return largeMessage;
+   }
+
+   private void loadPreparedTransactions(final PostOffice postOffice,
+                                         final PagingManager pagingManager,
+                                         final ResourceManager resourceManager,
+                                         final Map<Long, Queue> queues,
+                                         final List<PreparedTransactionInfo> preparedTransactions,
+                                         final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+   {
+      // recover prepared transactions
+      for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
+      {
+         XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+
+         Xid xid = encodingXid.xid;
+
+         Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
+
+         List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
+
+         Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+         // Use same method as load message journal to prune out acks, so they don't get added.
+         // Then have reacknowledge(tx) methods on queue, which needs to add the page size
+
+         // first get any sent messages for this tx and recreate
+         for (RecordInfo record : preparedTransaction.records)
+         {
+            byte[] data = record.data;
+
+            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+            byte recordType = record.getUserRecordType();
+
+            switch (recordType)
+            {
+               case ADD_LARGE_MESSAGE:
+               {
+                  messages.put(record.id, parseLargeMessage(messages, buff));
+
+                  break;
+               }
+               case ADD_MESSAGE:
+               {
+                  ServerMessage message = new ServerMessageImpl(record.id, 50);
+
+                  message.decode(buff);
+
+                  messages.put(record.id, message);
+
+                  break;
+               }
+               case ADD_REF:
+               {
+
+                  long messageID = record.id;
+
+                  RefEncoding encoding = new RefEncoding();
+
+                  encoding.decode(buff);
+
+                  Queue queue = queues.get(encoding.queueID);
+
+                  if (queue == null)
+                  {
+                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+                  }
+
+                  ServerMessage message = messages.get(messageID);
+
+                  if (message == null)
+                  {
+                     throw new IllegalStateException("Cannot find message with id " + messageID);
+                  }
+
+                  postOffice.reroute(message, queue, tx);
+
+                  break;
+               }
+               case ACKNOWLEDGE_REF:
+               {
+                  long messageID = record.id;
+
+                  RefEncoding encoding = new RefEncoding();
+
+                  encoding.decode(buff);
+
+                  Queue queue = queues.get(encoding.queueID);
+
+                  if (queue == null)
+                  {
+                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+                  }
+
+                  // TODO - this involves a scan - we should find a quicker qay of doing it
+                  MessageReference removed = queue.removeReferenceWithID(messageID);
+
+                  referencesToAck.add(removed);
+
+                  if (removed == null)
+                  {
+                     throw new IllegalStateException("Failed to remove reference for " + messageID);
+                  }
+
+                  break;
+               }
+               case PAGE_TRANSACTION:
+               {
+                  PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
+
+                  pageTransactionInfo.decode(buff);
+
+                  pageTransactionInfo.markIncomplete();
+
+                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+
+                  pagingManager.addTransaction(pageTransactionInfo);
+
+                  tx.addOperation(new FinishPageMessageOperation());
+
+                  break;
+               }
+               case SET_SCHEDULED_DELIVERY_TIME:
+               {
+                  // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
+                  // case the message will already have the header for the scheduled delivery time, so no need to do
+                  // anything.
+
+                  break;
+               }
+               case DUPLICATE_ID:
+               {
+                  // We need load the duplicate ids at prepare time too
+                  DuplicateIDEncoding encoding = new DuplicateIDEncoding();
+
+                  encoding.decode(buff);
+
+                  List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
+
+                  if (ids == null)
+                  {
+                     ids = new ArrayList<Pair<byte[], Long>>();
+
+                     duplicateIDMap.put(encoding.address, ids);
+                  }
+
+                  ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
+
+                  break;
+               }
+               default:
+               {
+                  log.warn("InternalError: Record type " + recordType +
+                           " not recognized. Maybe you're using journal files created on a different version");
+               }
+            }
+         }
+
+         for (RecordInfo record : preparedTransaction.recordsToDelete)
+         {
+            byte[] data = record.data;
+
+            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+            long messageID = record.id;
+
+            DeleteEncoding encoding = new DeleteEncoding();
+
+            encoding.decode(buff);
+
+            Queue queue = queues.get(encoding.queueID);
+
+            if (queue == null)
+            {
+               throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+            }
+
+            MessageReference removed = queue.removeReferenceWithID(messageID);
+
+            if (removed != null)
+            {
+               referencesToAck.add(removed);
+            }
+
+         }
+
+         for (MessageReference ack : referencesToAck)
+         {
+            ack.getQueue().reacknowledge(tx, ack);
+         }
+
+         tx.setState(Transaction.State.PREPARED);
+
+         resourceManager.putTransaction(xid, tx);
+      }
+   }
+
+   
+
+   /**
+    * @throws Exception
+    */
    private void cleanupIncompleteFiles() throws Exception
    {
       if (largeMessagesFactory != null)
       {
-         List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
+         List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
          for (String tmpFile : tmpFiles)
          {
             SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
@@ -1505,7 +1503,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
        */
-      public void executeOnCompletion(IOAsyncTask runnable)
+      public void executeOnCompletion(final IOAsyncTask runnable)
       {
       }
 
@@ -1540,7 +1538,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
        */
-      public void onError(int errorCode, String errorMessage)
+      public void onError(final int errorCode, final String errorMessage)
       {
       }
 
@@ -1618,7 +1616,7 @@
 
       SimpleString clusterName;
 
-      public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+      public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
       {
          this.id = id;
          this.groupId = groupId;
@@ -1634,13 +1632,13 @@
          return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
       }
 
-      public void encode(HornetQBuffer buffer)
+      public void encode(final HornetQBuffer buffer)
       {
          buffer.writeSimpleString(groupId);
          buffer.writeSimpleString(clusterName);
       }
 
-      public void decode(HornetQBuffer buffer)
+      public void decode(final HornetQBuffer buffer)
       {
          groupId = buffer.readSimpleString();
          clusterName = buffer.readSimpleString();
@@ -1651,7 +1649,7 @@
          return id;
       }
 
-      public void setId(long id)
+      public void setId(final long id)
       {
          this.id = id;
       }
@@ -1780,7 +1778,7 @@
    {
       private final LargeServerMessage message;
 
-      public LargeMessageEncoding(LargeServerMessage message)
+      public LargeMessageEncoding(final LargeServerMessage message)
       {
          this.message = message;
       }
@@ -1908,7 +1906,7 @@
    {
       long scheduledDeliveryTime;
 
-      private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
+      private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID)
       {
          super(queueID);
          this.scheduledDeliveryTime = scheduledDeliveryTime;
@@ -1918,18 +1916,21 @@
       {
       }
 
+      @Override
       public int getEncodeSize()
       {
          return super.getEncodeSize() + 8;
       }
 
-      public void encode(HornetQBuffer buffer)
+      @Override
+      public void encode(final HornetQBuffer buffer)
       {
          super.encode(buffer);
          buffer.writeLong(scheduledDeliveryTime);
       }
 
-      public void decode(HornetQBuffer buffer)
+      @Override
+      public void decode(final HornetQBuffer buffer)
       {
          super.decode(buffer);
          scheduledDeliveryTime = buffer.readLong();
@@ -2022,14 +2023,6 @@
       {
       }
 
-      /* (non-Javadoc)
-       * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
-       */
-      public Collection<Queue> getDistinctQueues()
-      {
-         return Collections.emptySet();
-      }
-
    }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java	2009-12-03 16:42:03 UTC (rev 8533)
@@ -186,6 +186,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -256,12 +257,12 @@
       {
          volatile boolean failed;
 
-         public void connectionFailed(HornetQException me)
+         public void connectionFailed(final HornetQException me)
          {
             failed = true;
          }
 
-         public void beforeReconnect(HornetQException exception)
+         public void beforeReconnect(final HornetQException exception)
          {
          }
       }
@@ -299,6 +300,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -382,7 +384,7 @@
          producer.send(message);
       }
 
-      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      session.createConsumer(ADDRESS);
 
       InVMConnector.failOnCreateConnection = true;
 
@@ -392,6 +394,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -435,7 +438,7 @@
 
       Timer timer = new Timer();
       ClientSession session = null;
-      
+
       try
       {
 
@@ -446,21 +449,16 @@
          final int reconnectAttempts = -1;
 
          final ClientSessionFactoryInternal sf = createFactory(false);
-         
 
-
          sf.setRetryInterval(retryInterval);
          sf.setRetryIntervalMultiplier(retryMultiplier);
          sf.setReconnectAttempts(reconnectAttempts);
          sf.setConfirmationWindowSize(1024 * 1024);
-         
-         
+
          session = sf.createSession();
 
          final RemotingConnection connFailure = ((ClientSessionInternal)session).getConnection();
-         
 
-
          int numberOfThreads = 100;
          final int numberOfSessionsToCreate = 10;
 
@@ -471,24 +469,25 @@
          {
             Throwable failure;
 
+            @Override
             public void run()
             {
                try
                {
                   alignLatch.countDown();
                   startFlag.await();
-                  for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+                  for (int i = 0; i < numberOfSessionsToCreate; i++)
                   {
                      Thread.yield();
                      ClientSession session = sf.createSession(false, true, true);
-   
+
                      session.close();
                   }
                }
                catch (Throwable e)
                {
                   e.printStackTrace();
-                  this.failure = e;
+                  failure = e;
                }
             }
          }
@@ -500,8 +499,6 @@
             threads[i].start();
          }
 
-         // Sleep 3 times retryInterval, so it should at least have 3 retries
-
          alignLatch.await();
 
          timer.schedule(new TimerTask()
@@ -518,9 +515,9 @@
                   log.warn("Error on the timer " + e);
                }
             }
-            
+
          }, 10, 10);
-         
+
          startFlag.countDown();
 
          Throwable failure = null;
@@ -546,7 +543,7 @@
       finally
       {
          timer.cancel();
-         
+
          if (session != null)
          {
             session.close();
@@ -580,6 +577,7 @@
       {
          Throwable failure;
 
+         @Override
          public void run()
          {
             try
@@ -593,7 +591,7 @@
             catch (Throwable e)
             {
                e.printStackTrace();
-               this.failure = e;
+               failure = e;
             }
          }
       }
@@ -609,6 +607,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -678,6 +677,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -836,6 +836,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             try
@@ -874,7 +875,7 @@
 
       long end = System.currentTimeMillis();
 
-      assertTrue((end - start) >= retryInterval);
+      assertTrue(end - start >= retryInterval);
 
       session.close();
 
@@ -952,7 +953,7 @@
 
       double wait = retryInterval + retryMultiplier * retryInterval + retryMultiplier * retryMultiplier * retryInterval;
 
-      assertTrue((end - start) >= wait);
+      assertTrue(end - start >= wait);
 
       session.close();
 
@@ -1031,9 +1032,9 @@
 
       double wait = retryInterval + retryMultiplier * 2 * retryInterval + retryMultiplier;
 
-      assertTrue((end - start) >= wait);
+      assertTrue(end - start >= wait);
 
-      assertTrue((end - start) < wait + 500);
+      assertTrue(end - start < wait + 500);
 
       session.close();
 



More information about the hornetq-commits mailing list