[hornetq-commits] JBoss hornetq SVN: r12238 - branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 5 11:25:24 EST 2012


Author: jbertram
Date: 2012-03-05 11:25:22 -0500 (Mon, 05 Mar 2012)
New Revision: 12238

Added:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XMLDataWriter.java
Removed:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java
Modified:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
[HORNETQ-787] refactoring all XML output logic into separate class

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2012-03-05 11:19:23 UTC (rev 12237)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2012-03-05 16:25:22 UTC (rev 12238)
@@ -13,12 +13,52 @@
 
 package org.hornetq.core.persistence.impl.journal;
 
-import org.hornetq.api.core.*;
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.journal.*;
-import org.hornetq.core.journal.impl.*;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCriticalErrorListener;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.JournalReaderCallback;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.paging.PageTransactionInfo;
@@ -42,8 +82,11 @@
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.replication.impl.ReplicatedJournal;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.transaction.ResourceManager;
@@ -52,27 +95,20 @@
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.utils.*;
+import org.hornetq.utils.Base64;
+import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.XidCodecSupport;
 
-import javax.transaction.xa.Xid;
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamWriter;
-import java.io.File;
-import java.io.PrintStream;
-import java.lang.reflect.Proxy;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.LinkedList;
-import java.util.concurrent.*;
-
 /**
+ *
  * A JournalStorageManager
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ *
  */
 public class JournalStorageManager implements StorageManager
 {
@@ -139,9 +175,7 @@
 
    private volatile boolean started;
 
-   /**
-    * Used to create Operation Contexts
-    */
+   /** Used to create Operation Contexts */
    private final ExecutorFactory executorFactory;
 
    private final Executor executor;
@@ -223,7 +257,7 @@
 
       if (replicator != null)
       {
-         bindingsJournal = new ReplicatedJournal((byte) 0, localBindings, replicator);
+         bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
       }
       else
       {
@@ -286,7 +320,7 @@
 
       if (replicator != null)
       {
-         messageJournal = new ReplicatedJournal((byte) 1, localMessage, replicator);
+         messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
       }
       else
       {
@@ -449,7 +483,7 @@
          replicator.largeMessageBegin(id);
       }
 
-      LargeServerMessageImpl largeMessage = (LargeServerMessageImpl) createLargeMessage();
+      LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
 
       largeMessage.copyHeadersAndProperties(message);
 
@@ -489,9 +523,7 @@
             new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
    }
 
-   /**
-    * We don't need messageID now but we are likely to need it we ever decide to support a database
-    */
+   /** We don't need messageID now but we are likely to need it if we ever decide to support a database */
    public void confirmPendingLargeMessage(long recordID) throws Exception
    {
       messageJournal.appendDeleteRecord(recordID, true, getContext());
@@ -511,7 +543,7 @@
       {
          messageJournal.appendAddRecord(message.getMessageID(),
                JournalStorageManager.ADD_LARGE_MESSAGE,
-               new LargeMessageEncoding((LargeServerMessage) message),
+               new LargeMessageEncoding((LargeServerMessage)message),
                false,
                getContext(false));
       }
@@ -606,7 +638,7 @@
          messageJournal.appendAddRecordTransactional(txID,
                message.getMessageID(),
                JournalStorageManager.ADD_LARGE_MESSAGE,
-               new LargeMessageEncoding(((LargeServerMessage) message)));
+               new LargeMessageEncoding(((LargeServerMessage)message)));
       }
       else
       {
@@ -889,7 +921,7 @@
          // It will show log.info only with large journals (more than 1 million records)
          if (reccount > 0 && reccount % 1000000 == 0)
          {
-            long percent = (long) ((((double) reccount) / ((double) totalSize)) * 100f);
+            long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
 
             log.info(percent + "% loaded");
          }
@@ -1352,7 +1384,7 @@
 
    public void addQueueBinding(final long tx, final Binding binding) throws Exception
    {
-      Queue queue = (Queue) binding.getBindable();
+      Queue queue = (Queue)binding.getBindable();
 
       Filter filter = queue.getFilter();
 
@@ -1546,7 +1578,7 @@
       started = true;
    }
 
-   public void stop() throws Exception
+   public  void stop() throws Exception
    {
       stop(false);
    }
@@ -2138,9 +2170,7 @@
 
    }
 
-   /**
-    * It's public as other classes may want to unparse data on tools
-    */
+   /** It's public because other classes, such as tools, may want to unparse the data */
    public static class XidEncoding implements EncodingSupport
    {
       public final Xid xid;
@@ -2756,7 +2786,7 @@
          // transaction until all the messages were added to the queue
          // or else we could deliver the messages out of order
 
-         PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 
          if (pageTransaction != null)
          {
@@ -2770,7 +2800,7 @@
 
       public void afterRollback(final Transaction tx)
       {
-         PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
 
          if (tx.getState() == State.PREPARED && pageTransaction != null)
          {
@@ -3034,6 +3064,11 @@
 
    public static Object newObjectEncoding(RecordInfo info)
    {
+      return newObjectEncoding(info, null);
+   }
+
+   public static Object newObjectEncoding(RecordInfo info, JournalStorageManager storageManager)
+   {
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(info.data);
       long id = info.id;
       int rec = info.getUserRecordType();
@@ -3050,7 +3085,7 @@
          case ADD_LARGE_MESSAGE:
          {
 
-            LargeServerMessage largeMessage = new LargeServerMessageImpl(null);
+            LargeServerMessage largeMessage = new LargeServerMessageImpl(storageManager);
 
             LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
 
@@ -3238,7 +3273,7 @@
             Object value = msg.getObjectProperty(prop);
             if (value instanceof byte[])
             {
-               buffer.append(prop + "=" + Arrays.toString((byte[]) value) + ",");
+               buffer.append(prop + "=" + Arrays.toString((byte[])value) + ",");
 
             }
             else
@@ -3433,7 +3468,7 @@
          }
          else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
          {
-            ReferenceDescribe ref = (ReferenceDescribe) o;
+            ReferenceDescribe ref = (ReferenceDescribe)o;
             Integer count = messageRefCounts.get(ref.refEncoding.queueID);
             if (count == null)
             {
@@ -3447,7 +3482,7 @@
          }
          else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
          {
-            AckDescribe ref = (AckDescribe) o;
+            AckDescribe ref = (AckDescribe)o;
             Integer count = messageRefCounts.get(ref.refEncoding.queueID);
             if (count == null)
             {
@@ -3477,7 +3512,7 @@
             }
             else if (info.getUserRecordType() == 32)
             {
-               ReferenceDescribe ref = (ReferenceDescribe) o;
+               ReferenceDescribe ref = (ReferenceDescribe)o;
                Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
                if (count == null)
                {
@@ -3525,272 +3560,9 @@
       journal.stop();
    }
 
-   /**
-    * @param bindingsDir
-    * @param messagesDir
-    * @throws Exception
-    */
-   public static void describeJournalAsXML(String bindingsDir, String messagesDir) throws Exception
-   {
-      // map of message refs hashed by the queue ID to which they belong and then hashed by their ID
-      Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
-
-      // map of all message records hashed by their ID (which will match the ID of the message refs)
-      HashMap<Long, Message> messages = new HashMap<Long, Message>();
-
-      processMessageRecords(messagesDir, messageRefs, messages);
-      HashMap<Long, PersistentQueueBindingEncoding> bindings = getBindings(bindingsDir);
-
-      printXML(bindings, messageRefs, messages);
-   }
-
-   private static void processMessageRecords(String messagesDir, Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages) throws Exception
-   {
-      ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
-
-      JournalImpl journal = openMessageJournal(messagesDir);
-
-      List<RecordInfo> records = new LinkedList<RecordInfo>();
-
-      journal.start();
-
-      journal.load(records, null, null, false);
-
-      for (RecordInfo info : records)
-      {
-         Object o = newObjectEncoding(info);
-         if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
-         {
-            messages.put(info.id, ((MessageDescribe) o).msg);
-         }
-         else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
-         {
-            ReferenceDescribe ref = (ReferenceDescribe) o;
-            HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id);
-            if (map == null)
-            {
-               HashMap<Long, ReferenceDescribe> newMap = new HashMap<Long, ReferenceDescribe>();
-               newMap.put(ref.refEncoding.queueID, ref);
-               messageRefs.put(info.id, newMap);
-            }
-            else
-            {
-               map.put(ref.refEncoding.queueID, ref);
-            }
-         }
-         else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
-         {
-            acks.add(info);
-         }
-      }
-
-      journal.stop();
-
-      removeAcked(messageRefs, messages, acks);
-   }
-
-   private static void removeAcked(Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages, ArrayList<RecordInfo> acks)
-   {
-      for (RecordInfo info : acks)
-      {
-         AckDescribe ack = (AckDescribe) newObjectEncoding(info);
-         HashMap<Long, ReferenceDescribe> referenceDescribeHashMap = messageRefs.get(info.id);
-         referenceDescribeHashMap.remove(ack.refEncoding.queueID);
-         if (referenceDescribeHashMap.size() == 0)
-         {
-            messages.remove(info.id);
-            messageRefs.remove(info.id);
-         }
-      }
-   }
-
-   private static void printXML(HashMap<Long, PersistentQueueBindingEncoding> queueBindings, Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs, HashMap<Long, Message> messages)
-   {
-      try
-      {
-         XMLOutputFactory factory = XMLOutputFactory.newInstance();
-         XMLStreamWriter writer = factory.createXMLStreamWriter(System.out);
-         PrettyPrintHandler handler = new PrettyPrintHandler(writer);
-         XMLStreamWriter prettyWriter = (XMLStreamWriter) Proxy.newProxyInstance(
-               XMLStreamWriter.class.getClassLoader(),
-               new Class[]{XMLStreamWriter.class},
-               handler);
-
-         prettyWriter.writeStartDocument("1.0");
-         prettyWriter.writeStartElement("hornetq-journal");
-         prettyWriter.writeStartElement("bindings");
-         for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
-         {
-            PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
-            prettyWriter.writeEmptyElement("binding");
-            prettyWriter.writeAttribute("address", bindingEncoding.getAddress().toString());
-            String filter = "";
-            if (bindingEncoding.getFilterString() != null)
-            {
-               filter = bindingEncoding.getFilterString().toString();
-            }
-            prettyWriter.writeAttribute("filter-string", filter);
-            prettyWriter.writeAttribute("queue-name", bindingEncoding.getQueueName().toString());
-            prettyWriter.writeAttribute("id", new Long(bindingEncoding.getId()).toString());
-         }
-
-         prettyWriter.writeEndElement(); // end "bindings"
-         prettyWriter.flush();
-         prettyWriter.writeStartElement("messages");
-
-         for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
-         {
-            Message message = messageMapEntry.getValue();
-            prettyWriter.writeStartElement("message");
-            prettyWriter.writeAttribute("id", Long.toString(message.getMessageID()));
-            prettyWriter.writeAttribute("priority", Byte.toString(message.getPriority()));
-            prettyWriter.writeAttribute("expiration", Long.toString(message.getExpiration()));
-            prettyWriter.writeAttribute("timestamp", Long.toString(message.getTimestamp()));
-            String type = "default";
-            if (message.getType() == Message.BYTES_TYPE)
-            {
-               type = "bytes";
-            }
-            else if (message.getType() == Message.MAP_TYPE)
-            {
-               type = "map";
-            }
-            else if (message.getType() == Message.OBJECT_TYPE)
-            {
-               type = "object";
-            }
-            else if (message.getType() == Message.STREAM_TYPE)
-            {
-               type = "stream";
-            }
-            else if (message.getType() == Message.TEXT_TYPE)
-            {
-               type = "text";
-            }
-            prettyWriter.writeAttribute("type", type);
-            prettyWriter.writeAttribute("user-id", message.getUserID().toString());
-            prettyWriter.writeStartElement("body");
-            prettyWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
-            prettyWriter.writeEndElement();
-            prettyWriter.writeStartElement("properties");
-            for (SimpleString key : message.getPropertyNames())
-            {
-               Object value = message.getObjectProperty(key);
-               prettyWriter.writeEmptyElement("property");
-               prettyWriter.writeAttribute("name", key.toString());
-               prettyWriter.writeAttribute("value", value.toString());
-               if (value instanceof Boolean)
-               {
-                  prettyWriter.writeAttribute("type", "boolean");
-               }
-               else if (value instanceof Byte)
-               {
-                  prettyWriter.writeAttribute("type", "byte");
-               }
-               else if (value instanceof Short)
-               {
-                  prettyWriter.writeAttribute("type", "short");
-               }
-               else if (value instanceof Integer)
-               {
-                  prettyWriter.writeAttribute("type", "integer");
-               }
-               else if (value instanceof Long)
-               {
-                  prettyWriter.writeAttribute("type", "long");
-               }
-               else if (value instanceof Float)
-               {
-                  prettyWriter.writeAttribute("type", "float");
-               }
-               else if (value instanceof Double)
-               {
-                  prettyWriter.writeAttribute("type", "double");
-               }
-               else if (value instanceof String)
-               {
-                  prettyWriter.writeAttribute("type", "string");
-               }
-               else if (value instanceof SimpleString)
-               {
-                  prettyWriter.writeAttribute("type", "simple-string");
-               }
-            }
-            prettyWriter.writeEndElement(); // end "properties"
-            prettyWriter.writeStartElement("queues");
-            HashMap<Long, ReferenceDescribe> refMap = messageRefs.get(messageMapEntry.getKey());
-            for (Map.Entry<Long, ReferenceDescribe> refMapEntry : refMap.entrySet())
-            {
-               prettyWriter.writeEmptyElement("queue");
-               prettyWriter.writeAttribute("name", queueBindings.get(refMapEntry.getValue().refEncoding.queueID).getQueueName().toString());
-            }
-
-            prettyWriter.writeEndElement(); // end "queues"
-            prettyWriter.writeEndElement(); // end "message"
-         }
-         prettyWriter.writeEndElement(); // end "messages"
-         prettyWriter.writeEndElement(); // end "hornetq-journal"
-         prettyWriter.flush();
-         prettyWriter.close();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   private static JournalImpl openMessageJournal(String messagesDir)
-   {
-      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(messagesDir, null);
-
-      // Will use only default values. The load function should adapt to anything different
-      ConfigurationImpl defaultValues = new ConfigurationImpl();
-
-      return new JournalImpl(defaultValues.getJournalFileSize(),
-            defaultValues.getJournalMinFiles(),
-            0,
-            0,
-            messagesFF,
-            "hornetq-data",
-            "hq",
-            1);
-   }
-
-   private static HashMap<Long, PersistentQueueBindingEncoding> getBindings(String bindingsDir) throws Exception
-   {
-      JournalImpl journal = openBindingsJournal(bindingsDir);
-
-      List<RecordInfo> records = new LinkedList<RecordInfo>();
-
-      journal.start();
-
-      journal.load(records, null, null, false);
-
-      HashMap<Long, PersistentQueueBindingEncoding> queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
-
-      for (RecordInfo info : records)
-      {
-         if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
-         {
-            PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) newObjectEncoding(info);
-            queueBindings.put(bindingEncoding.getId(), bindingEncoding);
-         }
-      }
-
-      journal.stop();
-      return queueBindings;
-   }
-
-   private static JournalImpl openBindingsJournal(String bindingsDir)
-   {
-      SequentialFileFactory fileFactory = new NIOSequentialFileFactory(bindingsDir, null);
-
-      return new JournalImpl(1024 * 1024, 2, -1, 0, fileFactory, "hornetq-bindings", "bindings", 1);
-   }
-
    private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
    {
-      TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation) tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+      TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
       if (txoper == null)
       {
          txoper = new TXLargeMessageConfirmationOperation();
@@ -3805,43 +3577,43 @@
       public List<Long> confirmedMessages = new LinkedList<Long>();
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
       public void beforePrepare(Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+       */
       public void afterPrepare(Transaction tx)
       {
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
       public void beforeCommit(Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+       */
       public void afterCommit(Transaction tx)
       {
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+       */
       public void beforeRollback(Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+       */
       public void afterRollback(Transaction tx)
       {
          for (Long msg : confirmedMessages)
@@ -3861,8 +3633,8 @@
       }
 
       /* (non-Javadoc)
-      * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
-      */
+       * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+       */
       public List<MessageReference> getRelatedMessageReferences()
       {
          return null;
@@ -3870,4 +3642,4 @@
 
    }
 
-}
+}
\ No newline at end of file

Deleted: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java	2012-03-05 11:19:23 UTC (rev 12237)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/PrintDataAsXML.java	2012-03-05 16:25:22 UTC (rev 12238)
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamWriter;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A PrintData
- *
- * @author clebertsuconic
- */
-public class PrintDataAsXML
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-
-   public static void main(String arg[])
-   {
-      if (arg.length != 2)
-      {
-         System.out.println("Use: java -cp hornetq-core.jar <bindings directory> <message directory>");
-         System.exit(-1);
-      }
-
-      try
-      {
-         JournalStorageManager.describeJournalAsXML(arg[0], arg[1]);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Added: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XMLDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XMLDataWriter.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XMLDataWriter.java	2012-03-05 16:25:22 UTC (rev 12238)
@@ -0,0 +1,659 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.Base64;
+import org.hornetq.utils.ExecutorFactory;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.ReferenceDescribe;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.MessageDescribe;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
+
+/**
+ * @author <a href="mailto:jbertram at redhat.com">Justin Bertram</a>
+ */
+public class XMLDataWriter
+{
+   public static final String XML_VERSION = "1.0";
+   public static final String DOCUMENT_PARENT = "hornetq-journal";
+   public static final String BINDINGS_PARENT = "bindings";
+   public static final String BINDINGS_CHILD = "binding";
+   public static final String BINDING_ADDRESS = "address";
+   public static final String BINDING_FILTER_STRING = "filter-string";
+   public static final String BINDING_QUEUE_NAME = "queue-name";
+   public static final String BINDING_ID = "id";
+   public static final String MESSAGES_PARENT = "messages";
+   public static final String MESSAGES_CHILD = "message";
+   public static final String MESSAGE_ID = "id";
+   public static final String MESSAGE_PRIORITY = "priority";
+   public static final String MESSAGE_EXPIRATION = "expiration";
+   public static final String MESSAGE_TIMESTAMP = "timestamp";
+   public static final String DEFAULT_TYPE_PRETTY = "default";
+   public static final String BYTES_TYPE_PRETTY = "bytes";
+   public static final String MAP_TYPE_PRETTY = "map";
+   public static final String OBJECT_TYPE_PRETTY = "object";
+   public static final String STREAM_TYPE_PRETTY = "stream";
+   public static final String TEXT_TYPE_PRETTY = "text";
+   public static final String MESSAGE_TYPE = "type";
+   public static final String MESSAGE_USER_ID = "user-id";
+   public static final String MESSAGE_BODY = "body";
+   public static final String PROPERTIES_PARENT = "properties";
+   public static final String PROPERTIES_CHILD = "property";
+   public static final String PROPERTY_NAME = "name";
+   public static final String PROPERTY_VALUE = "value";
+   public static final String PROPERTY_TYPE = "type";
+   public static final String QUEUES_PARENT = "queues";
+   public static final String QUEUES_CHILD = "queue";
+   public static final String QUEUE_NAME = "name";
+   public static final String PROPERTY_TYPE_BOOLEAN = "boolean";
+   public static final String PROPERTY_TYPE_BYTE = "byte";
+   public static final String PROPERTY_TYPE_SHORT = "short";
+   public static final String PROPERTY_TYPE_INTEGER = "integer";
+   public static final String PROPERTY_TYPE_LONG = "long";
+   public static final String PROPERTY_TYPE_FLOAT = "float";
+   public static final String PROPERTY_TYPE_DOUBLE = "double";
+   public static final String PROPERTY_TYPE_STRING = "string";
+   public static final String PROPERTY_TYPE_SIMPLE_STRING = "simple-string";
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private JournalStorageManager storageManager;
+
+   private Configuration config;
+
+   private XMLStreamWriter xmlWriter;
+
+   // message refs keyed first by the record ID of the message they refer to, then (inner map) by the ID of the queue to which they belong
+   private Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
+
+   // map of all message records hashed by their record ID (which will match the record ID of the message refs)
+   private HashMap<Long, Message> messages;
+
+   private Map<Long, Set<PagePosition>> cursorRecords;
+
+   private Set<Long> pgTXs;
+
+   HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a writer that reads the journals found in the given directories and writes the
+    * resulting XML to {@code System.out}.
+    *
+    * @param bindingsDir      directory set as the bindings directory of the configuration
+    * @param journalDir       directory set as the (message) journal directory of the configuration
+    * @param pagingDir        directory set as the paging directory of the configuration
+    * @param largeMessagesDir directory set as the large-messages directory of the configuration
+    * @throws IllegalStateException if the XML stream writer cannot be created
+    */
+   public XMLDataWriter(String bindingsDir, String journalDir, String pagingDir, String largeMessagesDir)
+   {
+      config = new ConfigurationImpl();
+      config.setBindingsDirectory(bindingsDir);
+      config.setJournalDirectory(journalDir);
+      config.setPagingDirectory(pagingDir);
+      config.setLargeMessagesDirectory(largeMessagesDir);
+      config.setJournalType(JournalType.NIO);
+
+      // every request for an executor is answered with the same single-threaded pool
+      final ExecutorService executor = Executors.newFixedThreadPool(1);
+      ExecutorFactory executorFactory = new ExecutorFactory()
+      {
+         public Executor getExecutor()
+         {
+            return executor;
+         }
+      };
+
+      storageManager = new JournalStorageManager(config, executorFactory);
+
+      messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
+
+      messages = new HashMap<Long, Message>();
+
+      cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+      pgTXs = new HashSet<Long>();
+
+      queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
+
+      try
+      {
+         XMLOutputFactory factory = XMLOutputFactory.newInstance();
+         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(System.out);
+         // wrap the raw writer in a dynamic proxy so that every element is indented on output
+         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
+         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(
+               XMLStreamWriter.class.getClassLoader(),
+               new Class[]{XMLStreamWriter.class},
+               handler);
+      }
+      catch (Exception e)
+      {
+         // without a writer nothing can be exported; fail here with the real cause instead of
+         // leaving xmlWriter null and hitting a NullPointerException when printing starts
+         throw new IllegalStateException("Unable to create the XML stream writer for System.out", e);
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Command-line entry point: loads the bindings and the message journal from the given
+    * directories and prints their content as XML.
+    *
+    * @param arg bindings directory, message (journal) directory, page directory and large-message
+    *            directory, in that order; the process exits with status -1 when fewer than four
+    *            arguments are supplied
+    */
+   public static void main(String arg[])
+   {
+      if (arg.length < 4)
+      {
+         // the command line needs the name of the class to run, not just the jar on the classpath
+         System.out.println("Use: java -cp hornetq-core.jar " + XMLDataWriter.class.getName() + " <bindings directory> <message directory> <page directory> <large-message directory>");
+         System.exit(-1);
+      }
+
+      try
+      {
+         XMLDataWriter xmlDataWriter = new XMLDataWriter(arg[0], arg[1], arg[2], arg[3]);
+         // bindings are loaded first: queue names are resolved from them while printing messages
+         xmlDataWriter.getBindings();
+         xmlDataWriter.processMessageJournal();
+         xmlDataWriter.printDataAsXML();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Loads every record of the message journal and sorts it into the in-memory structures used
+    * for printing: messages (normal and large), message references per queue, cursor
+    * acknowledgements of paged messages and page transaction IDs. Reference acknowledgements are
+    * collected while iterating and applied once the journal has been stopped.
+    *
+    * @throws Exception if the journal cannot be started, loaded or stopped, or a record cannot be decoded
+    */
+   private void processMessageJournal() throws Exception
+   {
+      ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
+
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      Journal messageJournal = storageManager.getMessageJournal();
+
+      messageJournal.start();
+
+      try
+      {
+         ((JournalImpl)messageJournal).load(records, null, null, false);
+
+         for (RecordInfo info : records)
+         {
+            Object o = JournalStorageManager.newObjectEncoding(info, storageManager);
+            if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE ||
+                info.getUserRecordType() == JournalStorageManager.ADD_LARGE_MESSAGE)
+            {
+               messages.put(info.id, ((MessageDescribe) o).msg);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+            {
+               ReferenceDescribe ref = (ReferenceDescribe) o;
+               HashMap<Long, ReferenceDescribe> refsByQueue = messageRefs.get(info.id);
+               if (refsByQueue == null)
+               {
+                  refsByQueue = new HashMap<Long, ReferenceDescribe>();
+                  messageRefs.put(info.id, refsByQueue);
+               }
+               refsByQueue.put(ref.refEncoding.queueID, ref);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+            {
+               // applied after the loop, once all references have been collected
+               acks.add(info);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+            {
+               CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+               encoding.decode(HornetQBuffers.wrappedBuffer(info.data));
+
+               Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+               if (set == null)
+               {
+                  set = new HashSet<PagePosition>();
+                  cursorRecords.put(encoding.queueID, set);
+               }
+
+               set.add(encoding.position);
+            }
+            else if (info.getUserRecordType() == JournalStorageManager.PAGE_TRANSACTION)
+            {
+               HornetQBuffer buff = HornetQBuffers.wrappedBuffer(info.data);
+
+               if (info.isUpdate)
+               {
+                  PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+                  pageUpdate.decode(buff);
+                  pgTXs.add(pageUpdate.pageTX);
+               }
+               else
+               {
+                  PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+                  pageTransactionInfo.decode(buff);
+
+                  pageTransactionInfo.setRecordID(info.id);
+                  pgTXs.add(pageTransactionInfo.getTransactionID());
+               }
+            }
+         }
+      }
+      finally
+      {
+         // stop the journal even when loading or decoding fails
+         messageJournal.stop();
+      }
+
+      removeAcked(acks);
+   }
+
+   /**
+    * Removes every acknowledged reference from {@link #messageRefs}. When the last reference of a
+    * message is removed, the message itself is dropped from {@link #messages} as well.
+    *
+    * @param acks the acknowledgement records collected while loading the message journal
+    */
+   private void removeAcked(ArrayList<RecordInfo> acks)
+   {
+      for (RecordInfo info : acks)
+      {
+         AckDescribe ack = (AckDescribe) JournalStorageManager.newObjectEncoding(info, null);
+         HashMap<Long, ReferenceDescribe> refsByQueue = messageRefs.get(info.id);
+         if (refsByQueue == null)
+         {
+            // no reference is (or is any longer) held for this message, so there is nothing to remove
+            continue;
+         }
+         refsByQueue.remove(ack.refEncoding.queueID);
+         if (refsByQueue.isEmpty())
+         {
+            messages.remove(info.id);
+            messageRefs.remove(info.id);
+         }
+      }
+   }
+
+   /**
+    * Loads the bindings journal and stores every queue binding record in {@link #queueBindings},
+    * keyed by the binding's ID. Records of any other type are ignored.
+    *
+    * @throws Exception if the journal cannot be started, loaded or stopped
+    */
+   private void getBindings() throws Exception
+   {
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      Journal bindingsJournal = storageManager.getBindingsJournal();
+
+      bindingsJournal.start();
+
+      try
+      {
+         ((JournalImpl)bindingsJournal).load(records, null, null, false);
+
+         for (RecordInfo info : records)
+         {
+            if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
+            {
+               PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) JournalStorageManager.newObjectEncoding(info, null);
+               queueBindings.put(bindingEncoding.getId(), bindingEncoding);
+            }
+         }
+      }
+      finally
+      {
+         // stop the journal even when loading or decoding fails
+         bindingsJournal.stop();
+      }
+   }
+
+   /**
+    * Writes the complete XML document to {@link #xmlWriter}: the document start, the
+    * {@link #DOCUMENT_PARENT} root element containing the bindings followed by the messages, and
+    * the document end. The writer is then flushed and closed.
+    * <p>
+    * Any exception raised while writing is caught here and only its stack trace is printed, so
+    * the caller gets no indication of a failed or partial export.
+    */
+   private void printDataAsXML()
+   {
+      try
+      {
+         xmlWriter.writeStartDocument(XML_VERSION);
+         xmlWriter.writeStartElement(DOCUMENT_PARENT);
+         printBindingsAsXML();
+         printAllMessagesAsXML();
+         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
+         xmlWriter.writeEndDocument();
+         xmlWriter.flush();
+         xmlWriter.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * Writes the {@link #BINDINGS_PARENT} element with one empty {@link #BINDINGS_CHILD} element
+    * per known queue binding, carrying the address, filter string (empty when the binding has
+    * none), queue name and ID as attributes.
+    *
+    * @throws XMLStreamException if the underlying writer fails
+    */
+   private void printBindingsAsXML() throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(BINDINGS_PARENT);
+      for (PersistentQueueBindingEncoding binding : queueBindings.values())
+      {
+         xmlWriter.writeEmptyElement(BINDINGS_CHILD);
+         xmlWriter.writeAttribute(BINDING_ADDRESS, binding.getAddress().toString());
+         String filterText = binding.getFilterString() == null ? "" : binding.getFilterString().toString();
+         xmlWriter.writeAttribute(BINDING_FILTER_STRING, filterText);
+         xmlWriter.writeAttribute(BINDING_QUEUE_NAME, binding.getQueueName().toString());
+         xmlWriter.writeAttribute(BINDING_ID, Long.toString(binding.getId()));
+      }
+      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
+   }
+
+   /**
+    * Writes the {@link #MESSAGES_PARENT} element: first every message loaded from the journal
+    * together with the names of the queues still referencing it, then the paged messages.
+    * Journal messages for which no reference map exists are skipped.
+    *
+    * @throws XMLStreamException if the underlying writer fails
+    */
+   private void printAllMessagesAsXML() throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(MESSAGES_PARENT);
+
+      for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
+      {
+         HashMap<Long, ReferenceDescribe> refs = messageRefs.get(messageMapEntry.getKey());
+         if (refs == null)
+         {
+            // a message without any reference belongs to no queue; resolving its queue names
+            // would otherwise fail with a NullPointerException
+            continue;
+         }
+         printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(refs));
+      }
+
+      printPagedMessagesAsXML();
+
+      xmlWriter.writeEndElement(); // end "messages"
+   }
+
+   /**
+    * Reads every page of every paging store found in the configured paging directory and prints
+    * the messages that are still outstanding. A paged message is printed when at least one of its
+    * queues has a known binding and no cursor acknowledgement for the message's page position,
+    * and the message is either non-transactional (transaction ID -1) or its page transaction was
+    * found in the journal.
+    * <p>
+    * Any exception is caught here and only its stack trace is printed.
+    */
+   private void printPagedMessagesAsXML()
+   {
+      final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+      final ExecutorService executor = Executors.newFixedThreadPool(10);
+      try
+      {
+         ExecutorFactory execfactory = new ExecutorFactory()
+         {
+            public Executor getExecutor()
+            {
+               return executor;
+            }
+         };
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, execfactory, false, null);
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+         addressSettingsRepository.setDefault(new AddressSettings());
+         StorageManager sm = new NullStorageManager();
+         PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
+
+         manager.start();
+
+         for (SimpleString store : manager.getStoreNames())
+         {
+            PagingStore pageStore = manager.getPageStore(store);
+
+            int pageId = (int) pageStore.getFirstPage();
+            for (int i = 0; i < pageStore.getNumberOfPages(); i++)
+            {
+               Page page = pageStore.createPage(pageId);
+               List<PagedMessage> pagedMessages;
+               page.open();
+               try
+               {
+                  pagedMessages = page.read(sm);
+               }
+               finally
+               {
+                  // do not leave the page open when reading it fails
+                  page.close();
+               }
+
+               int messageId = 0;
+
+               for (PagedMessage message : pagedMessages)
+               {
+                  message.initMessage(sm);
+
+                  // the position identifies the message within its page and is the same for all queues
+                  PagePosition posCheck = new PagePositionImpl(pageId, messageId);
+
+                  List<String> queueNames = new ArrayList<String>();
+                  for (long queueID : message.getQueueIDs())
+                  {
+                     Set<PagePosition> positions = cursorRecords.get(queueID);
+                     boolean acked = positions != null && positions.contains(posCheck);
+
+                     // a queue ID without a binding record cannot be resolved to a name; skip it
+                     // instead of failing the whole export with a NullPointerException
+                     PersistentQueueBindingEncoding binding = queueBindings.get(queueID);
+
+                     if (!acked && binding != null)
+                     {
+                        queueNames.add(binding.getQueueName().toString());
+                     }
+                  }
+
+                  if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID())))
+                  {
+                     printSingleMessageAsXML(message.getMessage(), queueNames);
+                  }
+
+                  messageId++;
+               }
+
+               pageId++;
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         // the pools were created only for this export; release their threads once it is over
+         executor.shutdown();
+         scheduled.shutdown();
+      }
+   }
+
+   /**
+    * Writes one {@link #MESSAGES_CHILD} element: the message attributes, its Base64-encoded body,
+    * its properties and the names of the queues holding it.
+    * <p>
+    * The body of a large message is read and encoded in chunks. A {@code HornetQException} raised
+    * while reading a large body is caught and only its stack trace is printed; the rest of the
+    * element is still written.
+    *
+    * @param message the message to print
+    * @param queues  names of the queues the message belongs to
+    * @throws XMLStreamException if the underlying writer fails
+    */
+   private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(MESSAGES_CHILD);
+      xmlWriter.writeAttribute(MESSAGE_ID, Long.toString(message.getMessageID()));
+      xmlWriter.writeAttribute(MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
+      xmlWriter.writeAttribute(MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
+      xmlWriter.writeAttribute(MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
+      xmlWriter.writeAttribute(MESSAGE_TYPE, prettyMessageType(message.getType()));
+      // omit the attribute rather than fail with a NullPointerException when no user ID is set
+      if (message.getUserID() != null)
+      {
+         xmlWriter.writeAttribute(MESSAGE_USER_ID, message.getUserID().toString());
+      }
+      xmlWriter.writeStartElement(MESSAGE_BODY);
+      if (message.isLargeMessage())
+      {
+         LargeServerMessage largeMessage = (LargeServerMessage) message;
+         final int CHUNK = 1000;
+
+         try
+         {
+            BodyEncoder encoder = largeMessage.getBodyEncoder();
+            encoder.open();
+            try
+            {
+               long bodySize = encoder.getLargeBodySize();
+               for (long i = 0; i < bodySize; i += CHUNK)
+               {
+                  // size the last buffer to what is left so no padding bytes end up in the output
+                  int size = (int) Math.min(CHUNK, bodySize - i);
+                  HornetQBuffer buffer = HornetQBuffers.fixedBuffer(size);
+                  encoder.encode(buffer, size);
+                  xmlWriter.writeCharacters(encode(buffer.toByteBuffer().array()));
+               }
+            }
+            finally
+            {
+               encoder.close();
+            }
+         }
+         catch (HornetQException e)
+         {
+            e.printStackTrace();
+         }
+      }
+      else
+      {
+         xmlWriter.writeCharacters(encode(message.getBodyBuffer().toByteBuffer().array()));
+      }
+      xmlWriter.writeEndElement(); // end "body"
+      xmlWriter.writeStartElement(PROPERTIES_PARENT);
+      for (SimpleString key : message.getPropertyNames())
+      {
+         Object value = message.getObjectProperty(key);
+         xmlWriter.writeEmptyElement(PROPERTIES_CHILD);
+         xmlWriter.writeAttribute(PROPERTY_NAME, key.toString());
+         // NOTE(review): a property is presumed able to carry a null value; it is written as an
+         // empty string with no type attribute - confirm this is what an importer should see
+         xmlWriter.writeAttribute(PROPERTY_VALUE, value == null ? "" : value.toString());
+         String propertyType = propertyTypeName(value);
+         if (propertyType != null)
+         {
+            xmlWriter.writeAttribute(PROPERTY_TYPE, propertyType);
+         }
+      }
+      xmlWriter.writeEndElement(); // end "properties"
+      xmlWriter.writeStartElement(QUEUES_PARENT);
+      for (String queueName : queues)
+      {
+         xmlWriter.writeEmptyElement(QUEUES_CHILD);
+         xmlWriter.writeAttribute(QUEUE_NAME, queueName);
+      }
+      xmlWriter.writeEndElement(); // end "queues"
+      xmlWriter.writeEndElement(); // end "message"
+   }
+
+   /**
+    * Maps a raw message type to the name written in the {@link #MESSAGE_TYPE} attribute; any type
+    * other than bytes, map, object, stream or text is reported as {@link #DEFAULT_TYPE_PRETTY}.
+    */
+   private static String prettyMessageType(byte rawType)
+   {
+      if (rawType == Message.BYTES_TYPE)
+      {
+         return BYTES_TYPE_PRETTY;
+      }
+      if (rawType == Message.MAP_TYPE)
+      {
+         return MAP_TYPE_PRETTY;
+      }
+      if (rawType == Message.OBJECT_TYPE)
+      {
+         return OBJECT_TYPE_PRETTY;
+      }
+      if (rawType == Message.STREAM_TYPE)
+      {
+         return STREAM_TYPE_PRETTY;
+      }
+      if (rawType == Message.TEXT_TYPE)
+      {
+         return TEXT_TYPE_PRETTY;
+      }
+      return DEFAULT_TYPE_PRETTY;
+   }
+
+   /**
+    * Returns the name written in the {@link #PROPERTY_TYPE} attribute for the given property
+    * value, or {@code null} when the value is of a type for which no attribute is written.
+    */
+   private static String propertyTypeName(Object value)
+   {
+      if (value instanceof Boolean)
+      {
+         return PROPERTY_TYPE_BOOLEAN;
+      }
+      if (value instanceof Byte)
+      {
+         return PROPERTY_TYPE_BYTE;
+      }
+      if (value instanceof Short)
+      {
+         return PROPERTY_TYPE_SHORT;
+      }
+      if (value instanceof Integer)
+      {
+         return PROPERTY_TYPE_INTEGER;
+      }
+      if (value instanceof Long)
+      {
+         return PROPERTY_TYPE_LONG;
+      }
+      if (value instanceof Float)
+      {
+         return PROPERTY_TYPE_FLOAT;
+      }
+      if (value instanceof Double)
+      {
+         return PROPERTY_TYPE_DOUBLE;
+      }
+      if (value instanceof String)
+      {
+         return PROPERTY_TYPE_STRING;
+      }
+      if (value instanceof SimpleString)
+      {
+         return PROPERTY_TYPE_SIMPLE_STRING;
+      }
+      return null;
+   }
+
+   private static String encode(final byte[] data)
+   {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   /**
+    * Resolves the queue names of the given message references through {@link #queueBindings}.
+    * References whose queue ID has no binding record are skipped.
+    *
+    * @param refMap the references of one message keyed by queue ID; may be {@code null}
+    * @return the names of the referenced queues; empty (never {@code null}) when there is no reference
+    */
+   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
+   {
+      List<String> queues = new ArrayList<String>();
+      if (refMap == null)
+      {
+         return queues;
+      }
+      for (ReferenceDescribe ref : refMap.values())
+      {
+         PersistentQueueBindingEncoding binding = queueBindings.get(ref.refEncoding.queueID);
+         if (binding != null)
+         {
+            queues.add(binding.getQueueName().toString());
+         }
+      }
+      return queues;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Invocation handler placed in front of an {@link XMLStreamWriter} that adds a line feed and
+    * indentation before elements and character data, then forwards the call to the wrapped
+    * writer. The indentation is three characters per nesting level.
+    */
+   class PrettyPrintHandler implements InvocationHandler
+   {
+      private XMLStreamWriter target;
+
+      // current nesting level: incremented after a start element, decremented before an end element
+      private int depth = 0;
+
+      private static final char INDENT_CHAR = ' ';
+
+      private static final String LINEFEED_CHAR = "\n";
+
+
+      public PrettyPrintHandler(XMLStreamWriter target)
+      {
+         this.target = target;
+      }
+
+      /**
+       * Writes the line feed and indentation required by the intercepted method, if any, and then
+       * invokes the method on the wrapped writer.
+       *
+       * @return whatever the wrapped writer returns ({@code null} for void methods)
+       * @throws Throwable the exception thrown by the wrapped writer, unwrapped from the
+       *                   reflective call so that callers see the original exception
+       */
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+      {
+         String m = method.getName();
+
+         if ("writeStartElement".equals(m))
+         {
+            target.writeCharacters(LINEFEED_CHAR);
+            target.writeCharacters(indent(depth, INDENT_CHAR));
+
+            depth++;
+         }
+         else if ("writeEndElement".equals(m))
+         {
+            depth--;
+
+            target.writeCharacters(LINEFEED_CHAR);
+            target.writeCharacters(indent(depth, INDENT_CHAR));
+         }
+         else if ("writeEmptyElement".equals(m) || "writeCharacters".equals(m))
+         {
+            target.writeCharacters(LINEFEED_CHAR);
+            target.writeCharacters(indent(depth, INDENT_CHAR));
+         }
+
+         try
+         {
+            // hand back the delegate's result so that non-void writer methods keep working
+            return method.invoke(target, args);
+         }
+         catch (java.lang.reflect.InvocationTargetException e)
+         {
+            // rethrow the writer's own exception; letting the reflective wrapper escape would
+            // surface on the proxy as an UndeclaredThrowableException
+            throw e.getCause();
+         }
+      }
+
+      /**
+       * Builds the indentation for the given nesting level.
+       *
+       * @param d nesting level
+       * @param s character used for the indentation
+       * @return a string made of three {@code s} characters per level
+       */
+      private String indent(int d, char s)
+      {
+         d *= 3; // level of indentation
+         char[] output = new char[d];
+         while (d-- > 0)
+         {
+            output[d] = s;
+         }
+         return new String(output);
+      }
+   }
+}
\ No newline at end of file



More information about the hornetq-commits mailing list