[hornetq-commits] JBoss hornetq SVN: r9706 - trunk/src/main/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 21 05:52:36 EDT 2010


Author: timfox
Date: 2010-09-21 05:52:36 -0400 (Tue, 21 Sep 2010)
New Revision: 9706

Modified:
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://jira.jboss.org/browse/HORNETQ-506

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-21 09:31:13 UTC (rev 9705)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-09-21 09:52:36 UTC (rev 9706)
@@ -168,11 +168,10 @@
    private final String journalDir;
 
    private final String largeMessagesDirectory;
-   
-   
+
    // Persisted core configuration
    private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new ConcurrentHashMap<SimpleString, PersistedRoles>();
-   
+
    private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
 
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
@@ -585,7 +584,11 @@
 
    public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
    {
-      messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+      messageJournal.appendUpdateRecordTransactional(txID,
+                                                     pageTransaction.getRecordID(),
+                                                     JournalStorageManager.PAGE_TRANSACTION,
+                                                     new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+                                                                              depages));
    }
 
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -697,8 +700,7 @@
                                         getContext(syncNonTransactional));
 
    }
-   
-   
+
    public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
    {
       deleteAddressSetting(addressSetting.getAddressMatch());
@@ -707,14 +709,13 @@
       bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
       mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
    }
-   
+
    public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
    {
       ArrayList<PersistedAddressSetting> list = new ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.size());
       list.addAll(mapPersistedAddressSettings.values());
       return list;
    }
-   
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
@@ -746,9 +747,9 @@
       {
          bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
       }
-      
+
    }
-   
+
    public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
    {
       PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
@@ -758,8 +759,6 @@
       }
    }
 
-
-
    public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
                                                     final PagingManager pagingManager,
                                                     final ResourceManager resourceManager,
@@ -771,7 +770,7 @@
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-      
+
       JournalLoadInformation info = messageJournal.load(records,
                                                         preparedTransactions,
                                                         new LargeMessageTXFailureCallback(messages));
@@ -781,17 +780,17 @@
       Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
 
       final int totalSize = records.size();
-      
-      for (int reccount = 0 ; reccount < totalSize; reccount++)
+
+      for (int reccount = 0; reccount < totalSize; reccount++)
       {
          // It will show log.info only with large journals (more than 1 million records)
-         if (reccount> 0 && reccount % 1000000 == 0)
+         if (reccount > 0 && reccount % 1000000 == 0)
          {
             long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
-            
+
             log.info(percent + "% loaded");
          }
-         
+
          RecordInfo record = records.get(reccount);
          byte[] data = record.data;
 
@@ -885,15 +884,15 @@
 
                if (queueMessages == null)
                {
-                  log.warn("Cannot find queue "  + encoding.queueID + " to update delivery count");
+                  log.warn("Cannot find queue " + encoding.queueID + " to update delivery count");
                }
                else
                {
                   AddMessageRecord rec = queueMessages.get(messageID);
-   
+
                   if (rec == null)
                   {
-                     log.warn("Cannot find message "  + messageID + " to update delivery count");
+                     log.warn("Cannot find message " + messageID + " to update delivery count");
                   }
                   else
                   {
@@ -908,21 +907,21 @@
                if (record.isUpdate)
                {
                   PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-                  
+
                   pageUpdate.decode(buff);
-                  
+
                   PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
-                  
+
                   pageTX.update(pageUpdate.recods, null, null);
                }
                else
                {
                   PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-   
+
                   pageTransactionInfo.decode(buff);
-   
+
                   pageTransactionInfo.setRecordID(record.id);
-   
+
                   pagingManager.addTransaction(pageTransactionInfo);
                }
 
@@ -985,13 +984,13 @@
                throw new IllegalStateException("Invalid record type " + recordType);
             }
          }
-         
+
          // This will free up memory sooner. The record is not needed any more
          // and its byte array would consume memory during the load process even though it's not necessary any longer
          // what would delay processing time during load
          records.set(reccount, null);
       }
-      
+
       // Release the memory as soon as not needed any longer
       records.clear();
       records = null;
@@ -1003,7 +1002,14 @@
          Map<Long, AddMessageRecord> queueRecords = entry.getValue();
 
          Queue queue = queues.get(queueID);
-         
+
+         if (queue == null)
+         {
+            log.warn("Message for queue " + queueID + " which does not exist. This message will be ignored.");
+
+            continue;
+         }
+
          Collection<AddMessageRecord> valueRecords = queueRecords.values();
 
          for (AddMessageRecord record : valueRecords)
@@ -1037,7 +1043,7 @@
             msg.decrementDelayDeletionCount();
          }
       }
-      
+
       for (ServerMessage msg : messages.values())
       {
          if (msg.getRefCount() == 0)
@@ -1053,7 +1059,7 @@
             }
          }
       }
-      
+
       if (perfBlastPages != -1)
       {
          messageJournal.perfBlast(perfBlastPages);
@@ -1418,18 +1424,22 @@
 
                   if (queue == null)
                   {
-                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+                     log.warn("Message in prepared tx for queue " + encoding.queueID +
+                              " which does not exist. This message will be ignored.");
+
                   }
+                  else
+                  {
+                     ServerMessage message = messages.get(messageID);
 
-                  ServerMessage message = messages.get(messageID);
+                     if (message == null)
+                     {
+                        throw new IllegalStateException("Cannot find message with id " + messageID);
+                     }
 
-                  if (message == null)
-                  {
-                     throw new IllegalStateException("Cannot find message with id " + messageID);
+                     postOffice.reroute(message, queue, tx);
                   }
 
-                  postOffice.reroute(message, queue, tx);
-
                   break;
                }
                case ACKNOWLEDGE_REF:
@@ -1446,7 +1456,7 @@
                   {
                      throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
                   }
-                  
+
                   // TODO - this involves a scan - we should find a quicker way of doing it
                   MessageReference removed = queue.removeReferenceWithID(messageID);
 
@@ -2017,24 +2027,24 @@
          super(queueID);
       }
    }
-   
+
    private static class PageUpdateTXEncoding implements EncodingSupport
    {
-      
+
       public long pageTX;
-      
+
       public int recods;
-      
+
       public PageUpdateTXEncoding()
       {
       }
-      
+
       public PageUpdateTXEncoding(final long pageTX, final int records)
       {
          this.pageTX = pageTX;
          this.recods = records;
       }
-      
+
       public void decode(HornetQBuffer buffer)
       {
          this.pageTX = buffer.readLong();
@@ -2057,7 +2067,7 @@
       {
          return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
       }
-      
+
    }
 
    private static class ScheduledDeliveryEncoding extends QueueEncoding
@@ -2182,6 +2192,7 @@
       }
 
    }
+
    private static final class AddMessageRecord
    {
       public AddMessageRecord(final ServerMessage message)
@@ -2194,7 +2205,7 @@
       long scheduledDeliveryTime;
 
       int deliveryCount;
-      
+
       boolean referenced = false;
    }
 



More information about the hornetq-commits mailing list