[hornetq-commits] JBoss hornetq SVN: r12288 - in branches/Branch_2_2_EAP_HORNETQ-787: tests/src/org/hornetq/tests/integration/persistence and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 12 15:34:47 EDT 2012


Author: jbertram
Date: 2012-03-12 15:34:46 -0400 (Mon, 12 Mar 2012)
New Revision: 12288

Added:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java
Removed:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
Modified:
   branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
   branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
Log:
[HORNETQ-787] added transactional functionality to send every message atomically, added comments

Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataConstants.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -14,7 +14,8 @@
 package org.hornetq.core.persistence.impl.journal;
 
 /**
- * The constants shared
+ * The constants shared by <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code> and
+ * <code>org.hornetq.core.persistence.impl.journal.XmlDataExporter</code>.
  *
  * @author Justin Bertram
  */

Copied: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java (from rev 12286, branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java)
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataExporter.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -0,0 +1,744 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.Base64;
+import org.hornetq.utils.ExecutorFactory;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
+
+/**
+ * Read the journal, page, and large-message data from a stopped instance of HornetQ and save it in an XML format to
+ * a file.  It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity.  Output can be
+ * read by <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code>.
+ *
+ * @author Justin Bertram
+ */
+public class XmlDataExporter
+{
+   // Constants -----------------------------------------------------
+
+   public static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
+
+   // Attributes ----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(XmlDataExporter.class);
+
+   private JournalStorageManager storageManager;
+
+   private Configuration config;
+
+   private XMLStreamWriter xmlWriter;
+
+   // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
+   private Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
+
+   // map of all message records hashed by their record ID (which will match the record ID of the message refs)
+   private HashMap<Long, Message> messages;
+
+   private Map<Long, Set<PagePosition>> cursorRecords;
+
+   private Set<Long> pgTXs;
+
+   HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
+
+   long messagesPrinted = 0L;
+
+   long bindingsPrinted = 0L;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public XmlDataExporter(OutputStream out, String bindingsDir, String journalDir, String pagingDir, String largeMessagesDir)
+   {
+      config = new ConfigurationImpl();
+      config.setBindingsDirectory(bindingsDir);
+      config.setJournalDirectory(journalDir);
+      config.setPagingDirectory(pagingDir);
+      config.setLargeMessagesDirectory(largeMessagesDir);
+      config.setJournalType(JournalType.NIO);
+      final ExecutorService executor = Executors.newFixedThreadPool(1);
+      ExecutorFactory executorFactory = new ExecutorFactory()
+      {
+         public Executor getExecutor()
+         {
+            return executor;
+         }
+      };
+
+      storageManager = new JournalStorageManager(config, executorFactory);
+
+      messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
+
+      messages = new HashMap<Long, Message>();
+
+      cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+      pgTXs = new HashSet<Long>();
+
+      queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
+
+      try
+      {
+         XMLOutputFactory factory = XMLOutputFactory.newInstance();
+         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out);
+         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
+         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(
+               XMLStreamWriter.class.getClassLoader(),
+               new Class[]{XMLStreamWriter.class},
+               handler);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   // Public --------------------------------------------------------
+
+   public static void main(String arg[])
+   {
+      if (arg.length < 4)
+      {
+         System.out.println("Use: java -cp hornetq-core.jar <bindings directory> <message directory> <page directory> <large-message directory>");
+         System.exit(-1);
+      }
+
+      try
+      {
+         XmlDataExporter xmlDataExporter = new XmlDataExporter(System.out, arg[0], arg[1], arg[2], arg[3]);
+         xmlDataExporter.writeXMLData();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   public void writeXMLData() throws Exception
+   {
+      long start = System.currentTimeMillis();
+      getBindings();
+      processMessageJournal();
+      printDataAsXML();
+      log.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
+      log.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Read through the message journal and stuff all the events/data we care about into local data structures.  We'll
+    * use this data later to print all the right information.
+    *
+    * @throws Exception will be thrown if anything goes wrong reading the journal
+    */
+   private void processMessageJournal() throws Exception
+   {
+      ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
+
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      Journal messageJournal = storageManager.getMessageJournal();
+
+      log.debug("Reading journal from " + config.getJournalDirectory());
+
+      messageJournal.start();
+
+      ((JournalImpl) messageJournal).load(records, null, null, false);
+
+      for (RecordInfo info : records)
+      {
+         byte[] data = info.data;
+
+         HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+         Object o = JournalStorageManager.newObjectEncoding(info, storageManager);
+         if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
+         {
+            messages.put(info.id, ((MessageDescribe) o).msg);
+         }
+         else if (info.getUserRecordType() == JournalStorageManager.ADD_LARGE_MESSAGE)
+         {
+            messages.put(info.id, ((MessageDescribe) o).msg);
+         }
+         else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+         {
+            ReferenceDescribe ref = (ReferenceDescribe) o;
+            HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id);
+            if (map == null)
+            {
+               HashMap<Long, ReferenceDescribe> newMap = new HashMap<Long, ReferenceDescribe>();
+               newMap.put(ref.refEncoding.queueID, ref);
+               messageRefs.put(info.id, newMap);
+            }
+            else
+            {
+               map.put(ref.refEncoding.queueID, ref);
+            }
+         }
+         else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+         {
+            acks.add(info);
+         }
+         else if (info.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+         {
+            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+            encoding.decode(buff);
+
+            Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+            if (set == null)
+            {
+               set = new HashSet<PagePosition>();
+               cursorRecords.put(encoding.queueID, set);
+            }
+
+            set.add(encoding.position);
+         }
+         else if (info.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
+         {
+            if (info.isUpdate)
+            {
+               PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+               pageUpdate.decode(buff);
+               pgTXs.add(pageUpdate.pageTX);
+            }
+            else
+            {
+               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+               pageTransactionInfo.decode(buff);
+
+               pageTransactionInfo.setRecordID(info.id);
+               pgTXs.add(pageTransactionInfo.getTransactionID());
+            }
+         }
+      }
+
+      messageJournal.stop();
+
+      removeAcked(acks);
+   }
+
+   /**
+    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
+    *
+    * @param acks the list of ack records we got from the journal
+    */
+   private void removeAcked(ArrayList<RecordInfo> acks)
+   {
+      for (RecordInfo info : acks)
+      {
+         AckDescribe ack = (AckDescribe) JournalStorageManager.newObjectEncoding(info, null);
+         HashMap<Long, ReferenceDescribe> referenceDescribeHashMap = messageRefs.get(info.id);
+         referenceDescribeHashMap.remove(ack.refEncoding.queueID);
+         if (referenceDescribeHashMap.size() == 0)
+         {
+            messages.remove(info.id);
+            messageRefs.remove(info.id);
+         }
+      }
+   }
+
+   /**
+    * Open the bindings journal and extract all bindings data.
+    *
+    * @throws Exception will be thrown if anything goes wrong reading the bindings journal
+    */
+   private void getBindings() throws Exception
+   {
+      List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+      Journal bindingsJournal = storageManager.getBindingsJournal();
+
+      bindingsJournal.start();
+
+      log.debug("Reading bindings journal from " + config.getBindingsDirectory());
+
+      ((JournalImpl) bindingsJournal).load(records, null, null, false);
+
+      for (RecordInfo info : records)
+      {
+         if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
+         {
+            PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) JournalStorageManager.newObjectEncoding(info, null);
+            queueBindings.put(bindingEncoding.getId(), bindingEncoding);
+         }
+      }
+
+      bindingsJournal.stop();
+   }
+
+   private void printDataAsXML()
+   {
+      try
+      {
+         xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
+         xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
+         printBindingsAsXML();
+         printAllMessagesAsXML();
+         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
+         xmlWriter.writeEndDocument();
+         xmlWriter.flush();
+         xmlWriter.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private void printBindingsAsXML() throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
+      for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
+      {
+         PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
+         xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
+         xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
+         String filter = "";
+         if (bindingEncoding.getFilterString() != null)
+         {
+            filter = bindingEncoding.getFilterString().toString();
+         }
+         xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
+         xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
+         xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
+         bindingsPrinted++;
+      }
+      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
+   }
+
+   private void printAllMessagesAsXML() throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
+
+      // Order here is important.  We must process the messages from the journal before we process those from the page
+      // files in order to get the messages in the right order.
+      for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
+      {
+         printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
+      }
+
+      printPagedMessagesAsXML();
+
+      xmlWriter.writeEndElement(); // end "messages"
+   }
+
+   /**
+    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
+    * from the journal).
+    */
+   private void printPagedMessagesAsXML()
+   {
+      try
+      {
+         ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+         final ExecutorService executor = Executors.newFixedThreadPool(10);
+         ExecutorFactory executorFactory = new ExecutorFactory()
+         {
+            public Executor getExecutor()
+            {
+               return executor;
+            }
+         };
+         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, executorFactory, false, null);
+         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+         addressSettingsRepository.setDefault(new AddressSettings());
+         StorageManager sm = new NullStorageManager();
+         PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
+
+         manager.start();
+
+         SimpleString stores[] = manager.getStoreNames();
+
+         for (SimpleString store : stores)
+         {
+            PagingStore pageStore = manager.getPageStore(store);
+            String folder = null;
+
+            if (pageStore != null)
+            {
+               folder = pageStore.getFolder();
+            }
+            log.debug("Reading page store " + store + " folder = " + folder);
+
+            int pageId = (int) pageStore.getFirstPage();
+            for (int i = 0; i < pageStore.getNumberOfPages(); i++)
+            {
+               log.debug("Reading page " + pageId);
+               Page page = pageStore.createPage(pageId);
+               page.open();
+               List<PagedMessage> messages = page.read(sm);
+               page.close();
+
+               int messageId = 0;
+
+               for (PagedMessage message : messages)
+               {
+                  message.initMessage(sm);
+                  long queueIDs[] = message.getQueueIDs();
+                  List<String> queueNames = new ArrayList<String>();
+                  for (long queueID : queueIDs)
+                  {
+                     PagePosition posCheck = new PagePositionImpl(pageId, messageId);
+
+                     boolean acked = false;
+
+                     Set<PagePosition> positions = cursorRecords.get(queueID);
+                     if (positions != null)
+                     {
+                        acked = positions.contains(posCheck);
+                     }
+
+                     if (!acked)
+                     {
+                        queueNames.add(queueBindings.get(queueID).getQueueName().toString());
+                     }
+                  }
+
+                  if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID())))
+                  {
+                     printSingleMessageAsXML(message.getMessage(), queueNames);
+                  }
+
+                  messageId++;
+               }
+
+               pageId++;
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
+      printMessageAttributes(message);
+      printMessageProperties(message);
+      printMessageQueues(queues);
+      printMessageBody(message);
+      xmlWriter.writeEndElement(); // end MESSAGES_CHILD
+      messagesPrinted++;
+   }
+
+   private void printMessageBody(ServerMessage message) throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
+
+      if (message.isLargeMessage())
+      {
+         printLargeMessageBody((LargeServerMessage) message);
+      }
+      else
+      {
+         xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
+      }
+      xmlWriter.writeEndElement(); // end MESSAGE_BODY
+   }
+
+   private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException
+   {
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
+      BodyEncoder encoder = null;
+
+      try
+      {
+         encoder = message.getBodyEncoder();
+         encoder.open();
+         long totalBytesWritten = 0;
+         Long bufferSize;
+         long bodySize = encoder.getLargeBodySize();
+         for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE)
+         {
+            Long remainder = bodySize - totalBytesWritten;
+            if (remainder >= LARGE_MESSAGE_CHUNK_SIZE)
+            {
+               bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
+            }
+            else
+            {
+               bufferSize = remainder;
+            }
+            HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
+            encoder.encode(buffer, bufferSize.intValue());
+            xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
+            totalBytesWritten += bufferSize;
+         }
+         encoder.close();
+      }
+      catch (HornetQException e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (encoder != null)
+         {
+            try
+            {
+               encoder.close();
+            }
+            catch (HornetQException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   private void printMessageQueues(List<String> queues) throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
+      for (String queueName : queues)
+      {
+         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
+         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
+      }
+      xmlWriter.writeEndElement(); // end QUEUES_PARENT
+   }
+
+   private void printMessageProperties(ServerMessage message) throws XMLStreamException
+   {
+      xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
+      for (SimpleString key : message.getPropertyNames())
+      {
+         Object value = message.getObjectProperty(key);
+         xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
+         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
+         if (value instanceof byte[])
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
+         }
+         else
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
+         }
+
+         if (value instanceof Boolean)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
+         }
+         else if (value instanceof Byte)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
+         }
+         else if (value instanceof Short)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SHORT);
+         }
+         else if (value instanceof Integer)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_INTEGER);
+         }
+         else if (value instanceof Long)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_LONG);
+         }
+         else if (value instanceof Float)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_FLOAT);
+         }
+         else if (value instanceof Double)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_DOUBLE);
+         }
+         else if (value instanceof String)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_STRING);
+         }
+         else if (value instanceof SimpleString)
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
+         }
+         else if (value instanceof byte[])
+         {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
+         }
+      }
+      xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
+   }
+
+   private void printMessageAttributes(ServerMessage message) throws XMLStreamException
+   {
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
+      byte rawType = message.getType();
+      String prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY;
+      if (rawType == Message.BYTES_TYPE)
+      {
+         prettyType = XmlDataConstants.BYTES_TYPE_PRETTY;
+      }
+      else if (rawType == Message.MAP_TYPE)
+      {
+         prettyType = XmlDataConstants.MAP_TYPE_PRETTY;
+      }
+      else if (rawType == Message.OBJECT_TYPE)
+      {
+         prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY;
+      }
+      else if (rawType == Message.STREAM_TYPE)
+      {
+         prettyType = XmlDataConstants.STREAM_TYPE_PRETTY;
+      }
+      else if (rawType == Message.TEXT_TYPE)
+      {
+         prettyType = XmlDataConstants.TEXT_TYPE_PRETTY;
+      }
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
+      if (message.getUserID() != null)
+      {
+         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
+      }
+   }
+
+   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
+   {
+      List<String> queues = new ArrayList<String>();
+      for (ReferenceDescribe ref : refMap.values())
+      {
+         queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
+      }
+      return queues;
+   }
+
+   private static String encode(final byte[] data)
+   {
+      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
+    */
+   class PrettyPrintHandler implements InvocationHandler
+   {
+      private XMLStreamWriter target;
+
+      private int depth = 0;
+
+      private final char INDENT_CHAR = ' ';
+
+      private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+
+      public PrettyPrintHandler(XMLStreamWriter target)
+      {
+         this.target = target;
+      }
+
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+      {
+         String m = method.getName();
+
+         if ("writeStartElement".equals(m))
+         {
+            target.writeCharacters(LINE_SEPARATOR);
+            target.writeCharacters(indent(depth));
+
+            depth++;
+         }
+         else if ("writeEndElement".equals(m))
+         {
+            depth--;
+
+            target.writeCharacters(LINE_SEPARATOR);
+            target.writeCharacters(indent(depth));
+         }
+         else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
+         {
+            target.writeCharacters(LINE_SEPARATOR);
+            target.writeCharacters(indent(depth));
+         }
+
+         method.invoke(target, args);
+
+         return null;
+      }
+
+      private String indent(int depth)
+      {
+         depth *= 3; // level of indentation
+         char[] output = new char[depth];
+         while (depth-- > 0)
+         {
+            output[depth] = INDENT_CHAR;
+         }
+         return new String(output);
+      }
+   }
+}
\ No newline at end of file

Copied: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java (from rev 12286, branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java)
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java	                        (rev 0)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataImporter.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -0,0 +1,529 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientRequestor;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.ManagementHelper;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.utils.Base64;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataExporter</code>, create a core session, and
+ * send the messages to a running instance of HornetQ.  It uses the StAX <code>javax.xml.stream.XMLStreamReader</code>
+ * for speed and simplicity.
+ *
+ * @author Justin Bertram
+ */
+public class XmlDataImporter
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(XmlDataImporter.class);
+
+   XMLStreamReader reader;
+
+   ClientSession session;
+
+   // this session is really only needed if the "session" variable does not auto-commit sends
+   ClientSession managementSession;
+
+   boolean localSession = false;
+
+   Map<String, String> addressMap = new HashMap<String, String>();
+
+   String tempFileName = "";
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * This is the normal constructor for programmatic access to the <code>org.hornetq.core.persistence.impl.journal.XmlDataImporter</code>
+    * if the session passed in uses auto-commit for sends.  If the session needs to be transactional then use the
+    * constructor which takes 2 sessions.
+    *
+    * @param inputStream the stream from which to read the XML for import
+    * @param session used for sending messages, must use auto-commit for sends
+    */
+   public XmlDataImporter(InputStream inputStream, ClientSession session)
+   {
+      this(inputStream, session, null);
+   }
+
+   /**
+    * This is the constructor to use if you wish to import all messages transactionally.  Pass in a session which doesn't
+    * use auto-commit for sends, and one that does (for management operations necessary during import).
+    *
+    * @param inputStream the stream from which to read the XML for import
+    * @param session used for sending messages, doesn't need to auto-commit sends
+    * @param managementSession used for management queries, must use auto-commit for sends
+    */
+   public XmlDataImporter(InputStream inputStream, ClientSession session, ClientSession managementSession)
+   {
+      try
+      {
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+         this.session = session;
+         if (managementSession != null)
+         {
+            this.managementSession = managementSession;
+         }
+         else
+         {
+            this.managementSession = session;
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   public XmlDataImporter(InputStream inputStream, String host, String port, boolean transactional)
+   {
+      try
+      {
+         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+         HashMap<String, Object> connectionParams = new HashMap<String, Object>();
+         connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
+         connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
+         ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+         ClientSessionFactory sf = serverLocator.createSessionFactory();
+         session = sf.createSession(false, !transactional, true);
+         if (transactional) {
+            managementSession = sf.createSession(false, true, true);
+         }
+         localSession = true;
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   public XmlDataImporter(String inputFile, String host, String port, boolean transactional) throws FileNotFoundException
+   {
+      this(new FileInputStream(inputFile), host, port, transactional);
+   }
+
+   // Public --------------------------------------------------------
+
+   public static void main(String arg[])
+   {
+      if (arg.length < 3)
+      {
+         System.out.println("Use: java -cp hornetq-core.jar " + XmlDataImporter.class + " <inputFile> <host> <port> [<transactional>]");
+         System.exit(-1);
+      }
+
+      try
+      {
+         XmlDataImporter xmlDataImporter = new XmlDataImporter(arg[0], arg[1], arg[2], (arg.length > 3 && Boolean.parseBoolean(arg[3])));
+         xmlDataImporter.processXml();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   public void processXml() throws Exception
+   {
+      try
+      {
+         while (reader.hasNext())
+         {
+            log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
+            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
+            {
+               if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
+               {
+                  bindQueue();
+               }
+               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessage();
+               }
+            }
+            reader.next();
+         }
+
+         if (!session.isAutoCommitSends())
+         {
+            session.commit();
+         }
+      }
+      finally
+      {
+         // if the session was created in our constructor then close it (otherwise the caller will close it)
+         if (localSession)
+         {
+            session.close();
+         }
+      }
+   }
+
+   private void processMessage() throws Exception
+   {
+      Byte type = 0;
+      Byte priority = 0;
+      Long expiration = 0L;
+      Long timestamp = 0L;
+      org.hornetq.utils.UUID userId = null;
+      ArrayList<String> queues = new ArrayList<String>();
+
+      // get message's attributes
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
+         {
+            type = getMessageType(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
+         {
+            priority = Byte.parseByte(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_EXPIRATION.equals(attributeName))
+         {
+            expiration = Long.parseLong(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_TIMESTAMP.equals(attributeName))
+         {
+            timestamp = Long.parseLong(reader.getAttributeValue(i));
+         }
+         else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
+         {
+            userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
+         }
+      }
+
+      Message message = session.createMessage(type, true, expiration, timestamp, priority);
+      message.setUserID(userId);
+
+      boolean endLoop = false;
+
+      // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
+      while (reader.hasNext())
+      {
+         switch (reader.getEventType())
+         {
+            case XMLStreamConstants.START_ELEMENT:
+               if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
+               {
+                  processMessageBody(message);
+               }
+               else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessageProperties(message);
+               }
+               else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
+               {
+                  processMessageQueues(queues);
+               }
+               break;
+            case XMLStreamConstants.END_ELEMENT:
+               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
+               {
+                  endLoop = true;
+               }
+               break;
+         }
+         if (endLoop)
+         {
+            break;
+         }
+         reader.next();
+      }
+
+      sendMessage(queues, message);
+   }
+
+   private Byte getMessageType(String value)
+   {
+      Byte type = Message.DEFAULT_TYPE;
+      if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
+      {
+         type = Message.DEFAULT_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
+      {
+         type = Message.MAP_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
+      {
+         type = Message.OBJECT_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
+      {
+         type = Message.STREAM_TYPE;
+      }
+      else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
+      {
+         type = Message.TEXT_TYPE;
+      }
+      return type;
+   }
+
+   private void sendMessage(ArrayList<String> queues, Message message) throws Exception
+   {
+      StringBuilder logMessage = new StringBuilder();
+      String destination = addressMap.get(queues.get(0));
+
+      logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to queues: ");
+      ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
+
+      for (String queue : queues)
+      {
+         // Get the ID of the queues involved so the message can be routed properly.  This is done because we cannot
+         // send directly to a queue, we have to send to an address instead but not all the queues related to the
+         // address may need the message
+         ClientRequestor requestor = new ClientRequestor(managementSession, "jms.queue.hornetq.management");
+         ClientMessage managementMessage = managementSession.createMessage(false);
+         ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
+         managementSession.start();
+         ClientMessage reply = requestor.request(managementMessage);
+         long queueID = (Integer) ManagementHelper.getResult(reply);
+         logMessage.append(queue).append(", ");
+         buffer.putLong(queueID);
+      }
+
+      logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
+      log.debug(logMessage);
+
+      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
+      ClientProducer producer = session.createProducer(destination);
+      producer.send(message);
+      producer.close();
+
+      if (tempFileName.length() > 0)
+      {
+         File tempFile = new File(tempFileName);
+         if (!tempFile.delete())
+         {
+            log.warn("Could not delete: " + tempFileName);
+         }
+      }
+   }
+
+   private void processMessageQueues(ArrayList<String> queues)
+   {
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
+         {
+            queues.add(reader.getAttributeValue(i));
+         }
+      }
+   }
+
+   private void processMessageProperties(Message message)
+   {
+      String key = "";
+      String value = "";
+      String propertyType = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
+         {
+            key = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
+         {
+            value = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
+         {
+            propertyType = reader.getAttributeValue(i);
+         }
+      }
+
+      if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
+      {
+         message.putShortProperty(key, Short.parseShort(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
+      {
+         message.putBooleanProperty(key, Boolean.parseBoolean(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
+      {
+         message.putByteProperty(key, Byte.parseByte(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
+      {
+         message.putBytesProperty(key, decode(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
+      {
+         message.putDoubleProperty(key, Double.parseDouble(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
+      {
+         message.putFloatProperty(key, Float.parseFloat(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
+      {
+         message.putIntProperty(key, Integer.parseInt(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
+      {
+         message.putLongProperty(key, Long.parseLong(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
+      {
+         message.putStringProperty(new SimpleString(key), new SimpleString(value));
+      }
+      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
+      {
+         message.putStringProperty(key, value);
+      }
+   }
+
+   private void processMessageBody(Message message) throws XMLStreamException, IOException
+   {
+      boolean isLarge = false;
+
+      if (reader.getAttributeCount() > 0)
+      {
+         isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
+      }
+      reader.next();
+      if (isLarge)
+      {
+         tempFileName = UUID.randomUUID().toString() + ".tmp";
+         log.debug("Creating temp file " + tempFileName + " for large message.");
+         OutputStream out = new FileOutputStream(tempFileName);
+         while (reader.hasNext())
+         {
+            if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
+            {
+               break;
+            }
+            else
+            {
+               String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+               String trimmedCharacters = characters.trim();
+               if (trimmedCharacters.length() > 0)  // this will skip "indentation" characters
+               {
+                  byte[] data = decode(trimmedCharacters);
+                  out.write(data);
+               }
+            }
+            reader.next();
+         }
+         out.close();
+         FileInputStream fileInputStream = new FileInputStream(tempFileName);
+         BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+         ((ClientMessage) message).setBodyInputStream(bufferedInput);
+      }
+      else
+      {
+         reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
+         String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
+         message.getBodyBuffer().writeBytes(decode(characters.trim()));
+      }
+   }
+
+   private void bindQueue() throws Exception
+   {
+      String queueName = "";
+      String address = "";
+      String filter = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++)
+      {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.BINDING_ADDRESS.equals(attributeName))
+         {
+            address = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.BINDING_QUEUE_NAME.equals(attributeName))
+         {
+            queueName = reader.getAttributeValue(i);
+         }
+         else if (XmlDataConstants.BINDING_FILTER_STRING.equals(attributeName))
+         {
+            filter = reader.getAttributeValue(i);
+         }
+      }
+
+      ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
+
+      if (!queueQuery.isExists())
+      {
+         session.createQueue(address, queueName, filter, true);
+         log.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
+      }
+      else
+      {
+         log.debug("Binding " + queueName + " already exists so won't re-bind.");
+      }
+
+      addressMap.put(queueName, address);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private static byte[] decode(String data)
+   {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   // Inner classes -------------------------------------------------
+
+}
\ No newline at end of file

Deleted: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataReader.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -1,489 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientRequestor;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.core.remoting.impl.netty.TransportConstants;
-import org.hornetq.utils.Base64;
-
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * Read XML output from <code>org.hornetq.core.persistence.impl.journal.XmlDataWriter</code>, create a core session, and
- * send the messages to a running instance of HornetQ.  It uses the StAX <code>javax.xml.stream.XMLStreamReader</code> 
- * for speed and simplicity.
- *
- * @author Justin Bertram
- */
-public class XmlDataReader
-{
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(XmlDataReader.class);
-
-   XMLStreamReader reader;
-
-   ClientSession session;
-
-   boolean localSession = false;
-
-   Map<String, String> addressMap = new HashMap<String, String>();
-
-   String tempFileName = "";
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public XmlDataReader(InputStream inputStream, ClientSession session)
-   {
-      try
-      {
-         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
-         this.session = session;
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   public XmlDataReader(InputStream inputStream, String host, String port)
-   {
-      try
-      {
-         reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
-         HashMap<String, Object> connectionParams = new HashMap<String, Object>();
-         connectionParams.put(TransportConstants.HOST_PROP_NAME, host);
-         connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
-         ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
-         ClientSessionFactory sf = serverLocator.createSessionFactory();
-         session = sf.createSession(false, true, true);
-         localSession = true;
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   public XmlDataReader(String inputFile, String host, String port) throws FileNotFoundException
-   {
-      this(new FileInputStream(inputFile), host, port);
-   }
-
-   // Public --------------------------------------------------------
-
-   public static void main(String arg[])
-   {
-      if (arg.length < 3)
-      {
-         System.out.println("Use: java -cp hornetq-core.jar " + XmlDataReader.class + " <inputFile> <host> <port>");
-         System.exit(-1);
-      }
-
-      try
-      {
-         XmlDataReader xmlDataReader = new XmlDataReader(arg[0], arg[1], arg[2]);
-         xmlDataReader.processXml();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   public void processXml() throws Exception
-   {
-      try
-      {
-         while (reader.hasNext())
-         {
-            log.debug("EVENT:[" + reader.getLocation().getLineNumber() + "][" + reader.getLocation().getColumnNumber() + "] ");
-            if (reader.getEventType() == XMLStreamConstants.START_ELEMENT)
-            {
-               if (XmlDataConstants.BINDINGS_CHILD.equals(reader.getLocalName()))
-               {
-                  bindQueue();
-               }
-               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
-               {
-                  processMessage();
-               }
-            }
-            reader.next();
-         }
-      }
-      finally
-      {
-         // if the session was created in our constructor then close it
-         if (localSession)
-         {
-            session.close();
-         }
-      }
-   }
-
-   private void processMessage() throws Exception
-   {
-      Byte type = 0;
-      Byte priority = 0;
-      Long expiration = 0L;
-      Long timestamp = 0L;
-      org.hornetq.utils.UUID userId = null;
-      ArrayList<String> queues = new ArrayList<String>();
-
-      // get message's attributes
-      for (int i = 0; i < reader.getAttributeCount(); i++)
-      {
-         String attributeName = reader.getAttributeLocalName(i);
-         if (XmlDataConstants.MESSAGE_TYPE.equals(attributeName))
-         {
-            type = getMessageType(reader.getAttributeValue(i));
-         }
-         else if (XmlDataConstants.MESSAGE_PRIORITY.equals(attributeName))
-         {
-            priority = Byte.parseByte(reader.getAttributeValue(i));
-         }
-         else if (XmlDataConstants.MESSAGE_EXPIRATION.equals(attributeName))
-         {
-            expiration = Long.parseLong(reader.getAttributeValue(i));
-         }
-         else if (XmlDataConstants.MESSAGE_TIMESTAMP.equals(attributeName))
-         {
-            timestamp = Long.parseLong(reader.getAttributeValue(i));
-         }
-         else if (XmlDataConstants.MESSAGE_USER_ID.equals(attributeName))
-         {
-            userId = org.hornetq.utils.UUIDGenerator.getInstance().generateUUID();
-         }
-      }
-
-      Message message = session.createMessage(type, true, expiration, timestamp, priority);
-      message.setUserID(userId);
-
-      boolean endLoop = false;
-
-      // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
-      while (reader.hasNext())
-      {
-         switch (reader.getEventType())
-         {
-            case XMLStreamConstants.START_ELEMENT:
-               if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName()))
-               {
-                  processMessageBody(message);
-               }
-               else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName()))
-               {
-                  processMessageProperties(message);
-               }
-               else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName()))
-               {
-                  processMessageQueues(queues);
-               }
-               break;
-            case XMLStreamConstants.END_ELEMENT:
-               if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName()))
-               {
-                  endLoop = true;
-               }
-               break;
-         }
-         if (endLoop)
-         {
-            break;
-         }
-         reader.next();
-      }
-
-      sendMessage(queues, message);
-   }
-
-   private Byte getMessageType(String value)
-   {
-      Byte type = Message.DEFAULT_TYPE;
-      if (value.equals(XmlDataConstants.DEFAULT_TYPE_PRETTY))
-      {
-         type = Message.DEFAULT_TYPE;
-      }
-      else if (value.equals(XmlDataConstants.BYTES_TYPE_PRETTY))
-      {
-         type = Message.BYTES_TYPE;
-      }
-      else if (value.equals(XmlDataConstants.MAP_TYPE_PRETTY))
-      {
-         type = Message.MAP_TYPE;
-      }
-      else if (value.equals(XmlDataConstants.OBJECT_TYPE_PRETTY))
-      {
-         type = Message.OBJECT_TYPE;
-      }
-      else if (value.equals(XmlDataConstants.STREAM_TYPE_PRETTY))
-      {
-         type = Message.STREAM_TYPE;
-      }
-      else if (value.equals(XmlDataConstants.TEXT_TYPE_PRETTY))
-      {
-         type = Message.TEXT_TYPE;
-      }
-      return type;
-   }
-
-   private void sendMessage(ArrayList<String> queues, Message message) throws Exception
-   {
-      StringBuilder logMessage = new StringBuilder();
-      String destination = addressMap.get(queues.get(0));
-
-      logMessage.append("Sending ").append(message).append(" to address: ").append(destination).append("; routed to: ");
-      ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
-
-      for (String queue : queues)
-      {
-         // Get the ID of the queues involved so the message can be routed properly.  This is done because we cannot
-         // send directly to a queue, we have to send to an address instead but not all the queues related to the
-         // address may need the message
-         ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
-         ClientMessage managementMessage = session.createMessage(false);
-         ManagementHelper.putAttribute(managementMessage, "core.queue." + queue, "ID");
-         session.start();
-         ClientMessage reply = requestor.request(managementMessage);
-         long queueID = (Integer) ManagementHelper.getResult(reply);
-         logMessage.append(queue).append(", ");
-         buffer.putLong(queueID);
-      }
-
-      logMessage.delete(logMessage.length() - 2, logMessage.length()); // take off the trailing comma
-      log.debug(logMessage);
-
-      message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
-      ClientProducer producer = session.createProducer(destination);
-      producer.send(message);
-      producer.close();
-
-      if (tempFileName.length() > 0)
-      {
-         File tempFile = new File(tempFileName);
-         if (!tempFile.delete())
-         {
-            log.warn("Could not delete: " + tempFileName);
-         }
-      }
-   }
-
-   private void processMessageQueues(ArrayList<String> queues)
-   {
-      for (int i = 0; i < reader.getAttributeCount(); i++)
-      {
-         if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i)))
-         {
-            queues.add(reader.getAttributeValue(i));
-         }
-      }
-   }
-
-   private void processMessageProperties(Message message)
-   {
-      String key = "";
-      String value = "";
-      String propertyType = "";
-
-      for (int i = 0; i < reader.getAttributeCount(); i++)
-      {
-         String attributeName = reader.getAttributeLocalName(i);
-         if (XmlDataConstants.PROPERTY_NAME.equals(attributeName))
-         {
-            key = reader.getAttributeValue(i);
-         }
-         else if (XmlDataConstants.PROPERTY_VALUE.equals(attributeName))
-         {
-            value = reader.getAttributeValue(i);
-         }
-         else if (XmlDataConstants.PROPERTY_TYPE.equals(attributeName))
-         {
-            propertyType = reader.getAttributeValue(i);
-         }
-      }
-
-      if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SHORT))
-      {
-         message.putShortProperty(key, Short.parseShort(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BOOLEAN))
-      {
-         message.putBooleanProperty(key, Boolean.parseBoolean(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTE))
-      {
-         message.putByteProperty(key, Byte.parseByte(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_BYTES))
-      {
-         message.putBytesProperty(key, decode(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_DOUBLE))
-      {
-         message.putDoubleProperty(key, Double.parseDouble(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_FLOAT))
-      {
-         message.putFloatProperty(key, Float.parseFloat(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_INTEGER))
-      {
-         message.putIntProperty(key, Integer.parseInt(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_LONG))
-      {
-         message.putLongProperty(key, Long.parseLong(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING))
-      {
-         message.putStringProperty(new SimpleString(key), new SimpleString(value));
-      }
-      else if (propertyType.equals(XmlDataConstants.PROPERTY_TYPE_STRING))
-      {
-         message.putStringProperty(key, value);
-      }
-   }
-
-   private void processMessageBody(Message message) throws XMLStreamException, IOException
-   {
-      boolean isLarge = false;
-
-      if (reader.getAttributeCount() > 0)
-      {
-         isLarge = Boolean.parseBoolean(reader.getAttributeValue(0));
-      }
-      reader.next();
-      if (isLarge)
-      {
-         tempFileName = UUID.randomUUID().toString() + ".tmp";
-         log.debug("Creating temp file " + tempFileName + " for large message.");
-         OutputStream out = new FileOutputStream(tempFileName);
-         while (reader.hasNext())
-         {
-            if (reader.getEventType() == XMLStreamConstants.END_ELEMENT)
-            {
-               break;
-            }
-            else
-            {
-               String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
-               String trimmedCharacters = characters.trim();
-               if (trimmedCharacters.length() > 0)  // this will skip "indentation" characters
-               {
-                  byte[] data = decode(trimmedCharacters);
-                  out.write(data);
-               }
-            }
-            reader.next();
-         }
-         out.close();
-         FileInputStream fileInputStream = new FileInputStream(tempFileName);
-         BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
-         ((ClientMessage) message).setBodyInputStream(bufferedInput);
-      }
-      else
-      {
-         reader.next(); // step past the "indentation" characters to get to the CDATA with the message body
-         String characters = new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength());
-         message.getBodyBuffer().writeBytes(decode(characters.trim()));
-      }
-   }
-
-   private void bindQueue() throws Exception
-   {
-      String queueName = "";
-      String address = "";
-      String filter = "";
-
-      for (int i = 0; i < reader.getAttributeCount(); i++)
-      {
-         String attributeName = reader.getAttributeLocalName(i);
-         if (XmlDataConstants.BINDING_ADDRESS.equals(attributeName))
-         {
-            address = reader.getAttributeValue(i);
-         }
-         else if (XmlDataConstants.BINDING_QUEUE_NAME.equals(attributeName))
-         {
-            queueName = reader.getAttributeValue(i);
-         }
-         else if (XmlDataConstants.BINDING_FILTER_STRING.equals(attributeName))
-         {
-            filter = reader.getAttributeValue(i);
-         }
-      }
-
-      ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));
-
-      if (!queueQuery.isExists())
-      {
-         session.createQueue(address, queueName, filter, true);
-         log.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
-      }
-      else
-      {
-         log.debug("Binding " + queueName + " already exists so won't re-bind.");
-      }
-
-      addressMap.put(queueName, address);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private static byte[] decode(String data)
-   {
-      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
-   }
-
-   // Inner classes -------------------------------------------------
-
-}
\ No newline at end of file

Deleted: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/XmlDataWriter.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -1,744 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.persistence.impl.journal;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.paging.Page;
-import org.hornetq.core.paging.PagedMessage;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
-import org.hornetq.utils.Base64;
-import org.hornetq.utils.ExecutorFactory;
-
-import javax.xml.stream.XMLOutputFactory;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamWriter;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.hornetq.core.persistence.impl.journal.JournalStorageManager.*;
-
-/**
- * Read the journal, page, and large-message data from a stopped instance of HornetQ and save it in an XML format to
- * a file.  It uses the StAX <code>javax.xml.stream.XMLStreamWriter</code> for speed and simplicity.  Output can be
- * read by <code>org.hornetq.core.persistence.impl.journal.XmlDataReader</code>.
- *
- * @author Justin Bertram
- */
-public class XmlDataWriter
-{
-   // Constants -----------------------------------------------------
-
-   public static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
-
-   // Attributes ----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(XmlDataWriter.class);
-
-   private JournalStorageManager storageManager;
-
-   private Configuration config;
-
-   private XMLStreamWriter xmlWriter;
-
-   // an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
-   private Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs;
-
-   // map of all message records hashed by their record ID (which will match the record ID of the message refs)
-   private HashMap<Long, Message> messages;
-
-   private Map<Long, Set<PagePosition>> cursorRecords;
-
-   private Set<Long> pgTXs;
-
-   HashMap<Long, PersistentQueueBindingEncoding> queueBindings;
-
-   long messagesPrinted = 0L;
-
-   long bindingsPrinted = 0L;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public XmlDataWriter(OutputStream out, String bindingsDir, String journalDir, String pagingDir, String largeMessagesDir)
-   {
-      config = new ConfigurationImpl();
-      config.setBindingsDirectory(bindingsDir);
-      config.setJournalDirectory(journalDir);
-      config.setPagingDirectory(pagingDir);
-      config.setLargeMessagesDirectory(largeMessagesDir);
-      config.setJournalType(JournalType.NIO);
-      final ExecutorService executor = Executors.newFixedThreadPool(1);
-      ExecutorFactory executorFactory = new ExecutorFactory()
-      {
-         public Executor getExecutor()
-         {
-            return executor;
-         }
-      };
-
-      storageManager = new JournalStorageManager(config, executorFactory);
-
-      messageRefs = new HashMap<Long, HashMap<Long, ReferenceDescribe>>();
-
-      messages = new HashMap<Long, Message>();
-
-      cursorRecords = new HashMap<Long, Set<PagePosition>>();
-
-      pgTXs = new HashSet<Long>();
-
-      queueBindings = new HashMap<Long, PersistentQueueBindingEncoding>();
-
-      try
-      {
-         XMLOutputFactory factory = XMLOutputFactory.newInstance();
-         XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out);
-         PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
-         xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(
-               XMLStreamWriter.class.getClassLoader(),
-               new Class[]{XMLStreamWriter.class},
-               handler);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   // Public --------------------------------------------------------
-
-   public static void main(String arg[])
-   {
-      if (arg.length < 4)
-      {
-         System.out.println("Use: java -cp hornetq-core.jar <bindings directory> <message directory> <page directory> <large-message directory>");
-         System.exit(-1);
-      }
-
-      try
-      {
-         XmlDataWriter xmlDataWriter = new XmlDataWriter(System.out, arg[0], arg[1], arg[2], arg[3]);
-         xmlDataWriter.writeXMLData();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   public void writeXMLData() throws Exception
-   {
-      long start = System.currentTimeMillis();
-      getBindings();
-      processMessageJournal();
-      printDataAsXML();
-      log.debug("\n\nProcessing took: " + (System.currentTimeMillis() - start) + "ms");
-      log.debug("Output " + messagesPrinted + " messages and " + bindingsPrinted + " bindings.");
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   /**
-    * Read through the message journal and stuff all the events/data we care about into local data structures.  We'll
-    * use this data later to print all the right information.
-    *
-    * @throws Exception will be thrown if anything goes wrong reading the journal
-    */
-   private void processMessageJournal() throws Exception
-   {
-      ArrayList<RecordInfo> acks = new ArrayList<RecordInfo>();
-
-      List<RecordInfo> records = new LinkedList<RecordInfo>();
-
-      Journal messageJournal = storageManager.getMessageJournal();
-
-      log.debug("Reading journal from " + config.getJournalDirectory());
-
-      messageJournal.start();
-
-      ((JournalImpl) messageJournal).load(records, null, null, false);
-
-      for (RecordInfo info : records)
-      {
-         byte[] data = info.data;
-
-         HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
-         Object o = JournalStorageManager.newObjectEncoding(info, storageManager);
-         if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
-         {
-            messages.put(info.id, ((MessageDescribe) o).msg);
-         }
-         else if (info.getUserRecordType() == JournalStorageManager.ADD_LARGE_MESSAGE)
-         {
-            messages.put(info.id, ((MessageDescribe) o).msg);
-         }
-         else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
-         {
-            ReferenceDescribe ref = (ReferenceDescribe) o;
-            HashMap<Long, ReferenceDescribe> map = messageRefs.get(info.id);
-            if (map == null)
-            {
-               HashMap<Long, ReferenceDescribe> newMap = new HashMap<Long, ReferenceDescribe>();
-               newMap.put(ref.refEncoding.queueID, ref);
-               messageRefs.put(info.id, newMap);
-            }
-            else
-            {
-               map.put(ref.refEncoding.queueID, ref);
-            }
-         }
-         else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
-         {
-            acks.add(info);
-         }
-         else if (info.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
-         {
-            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
-            encoding.decode(buff);
-
-            Set<PagePosition> set = cursorRecords.get(encoding.queueID);
-
-            if (set == null)
-            {
-               set = new HashSet<PagePosition>();
-               cursorRecords.put(encoding.queueID, set);
-            }
-
-            set.add(encoding.position);
-         }
-         else if (info.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
-         {
-            if (info.isUpdate)
-            {
-               PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-
-               pageUpdate.decode(buff);
-               pgTXs.add(pageUpdate.pageTX);
-            }
-            else
-            {
-               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-
-               pageTransactionInfo.decode(buff);
-
-               pageTransactionInfo.setRecordID(info.id);
-               pgTXs.add(pageTransactionInfo.getTransactionID());
-            }
-         }
-      }
-
-      messageJournal.stop();
-
-      removeAcked(acks);
-   }
-
-   /**
-    * Go back through the messages and message refs we found in the journal and remove the ones that have been acked.
-    *
-    * @param acks the list of ack records we got from the journal
-    */
-   private void removeAcked(ArrayList<RecordInfo> acks)
-   {
-      for (RecordInfo info : acks)
-      {
-         AckDescribe ack = (AckDescribe) JournalStorageManager.newObjectEncoding(info, null);
-         HashMap<Long, ReferenceDescribe> referenceDescribeHashMap = messageRefs.get(info.id);
-         referenceDescribeHashMap.remove(ack.refEncoding.queueID);
-         if (referenceDescribeHashMap.size() == 0)
-         {
-            messages.remove(info.id);
-            messageRefs.remove(info.id);
-         }
-      }
-   }
-
-   /**
-    * Open the bindings journal and extract all bindings data.
-    *
-    * @throws Exception will be thrown if anything goes wrong reading the bindings journal
-    */
-   private void getBindings() throws Exception
-   {
-      List<RecordInfo> records = new LinkedList<RecordInfo>();
-
-      Journal bindingsJournal = storageManager.getBindingsJournal();
-
-      bindingsJournal.start();
-
-      log.debug("Reading bindings journal from " + config.getBindingsDirectory());
-
-      ((JournalImpl) bindingsJournal).load(records, null, null, false);
-
-      for (RecordInfo info : records)
-      {
-         if (info.getUserRecordType() == JournalStorageManager.QUEUE_BINDING_RECORD)
-         {
-            PersistentQueueBindingEncoding bindingEncoding = (PersistentQueueBindingEncoding) JournalStorageManager.newObjectEncoding(info, null);
-            queueBindings.put(bindingEncoding.getId(), bindingEncoding);
-         }
-      }
-
-      bindingsJournal.stop();
-   }
-
-   private void printDataAsXML()
-   {
-      try
-      {
-         xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
-         xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
-         printBindingsAsXML();
-         printAllMessagesAsXML();
-         xmlWriter.writeEndElement(); // end DOCUMENT_PARENT
-         xmlWriter.writeEndDocument();
-         xmlWriter.flush();
-         xmlWriter.close();
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   private void printBindingsAsXML() throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
-      for (Map.Entry<Long, PersistentQueueBindingEncoding> queueBindingEncodingEntry : queueBindings.entrySet())
-      {
-         PersistentQueueBindingEncoding bindingEncoding = queueBindings.get(queueBindingEncodingEntry.getKey());
-         xmlWriter.writeEmptyElement(XmlDataConstants.BINDINGS_CHILD);
-         xmlWriter.writeAttribute(XmlDataConstants.BINDING_ADDRESS, bindingEncoding.getAddress().toString());
-         String filter = "";
-         if (bindingEncoding.getFilterString() != null)
-         {
-            filter = bindingEncoding.getFilterString().toString();
-         }
-         xmlWriter.writeAttribute(XmlDataConstants.BINDING_FILTER_STRING, filter);
-         xmlWriter.writeAttribute(XmlDataConstants.BINDING_QUEUE_NAME, bindingEncoding.getQueueName().toString());
-         xmlWriter.writeAttribute(XmlDataConstants.BINDING_ID, Long.toString(bindingEncoding.getId()));
-         bindingsPrinted++;
-      }
-      xmlWriter.writeEndElement(); // end BINDINGS_PARENT
-   }
-
-   private void printAllMessagesAsXML() throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT);
-
-      // Order here is important.  We must process the messages from the journal before we process those from the page
-      // files in order to get the messages in the right order.
-      for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet())
-      {
-         printSingleMessageAsXML((ServerMessage) messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey())));
-      }
-
-      printPagedMessagesAsXML();
-
-      xmlWriter.writeEndElement(); // end "messages"
-   }
-
-   /**
-    * Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
-    * from the journal).
-    */
-   private void printPagedMessagesAsXML()
-   {
-      try
-      {
-         ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
-         final ExecutorService executor = Executors.newFixedThreadPool(10);
-         ExecutorFactory executorFactory = new ExecutorFactory()
-         {
-            public Executor getExecutor()
-            {
-               return executor;
-            }
-         };
-         PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(config.getPagingDirectory(), 1000l, scheduled, executorFactory, false, null);
-         HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
-         addressSettingsRepository.setDefault(new AddressSettings());
-         StorageManager sm = new NullStorageManager();
-         PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
-
-         manager.start();
-
-         SimpleString stores[] = manager.getStoreNames();
-
-         for (SimpleString store : stores)
-         {
-            PagingStore pageStore = manager.getPageStore(store);
-            String folder = null;
-
-            if (pageStore != null)
-            {
-               folder = pageStore.getFolder();
-            }
-            log.debug("Reading page store " + store + " folder = " + folder);
-
-            int pageId = (int) pageStore.getFirstPage();
-            for (int i = 0; i < pageStore.getNumberOfPages(); i++)
-            {
-               log.debug("Reading page " + pageId);
-               Page page = pageStore.createPage(pageId);
-               page.open();
-               List<PagedMessage> messages = page.read(sm);
-               page.close();
-
-               int messageId = 0;
-
-               for (PagedMessage message : messages)
-               {
-                  message.initMessage(sm);
-                  long queueIDs[] = message.getQueueIDs();
-                  List<String> queueNames = new ArrayList<String>();
-                  for (long queueID : queueIDs)
-                  {
-                     PagePosition posCheck = new PagePositionImpl(pageId, messageId);
-
-                     boolean acked = false;
-
-                     Set<PagePosition> positions = cursorRecords.get(queueID);
-                     if (positions != null)
-                     {
-                        acked = positions.contains(posCheck);
-                     }
-
-                     if (!acked)
-                     {
-                        queueNames.add(queueBindings.get(queueID).getQueueName().toString());
-                     }
-                  }
-
-                  if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID())))
-                  {
-                     printSingleMessageAsXML(message.getMessage(), queueNames);
-                  }
-
-                  messageId++;
-               }
-
-               pageId++;
-            }
-         }
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-   }
-
-   private void printSingleMessageAsXML(ServerMessage message, List<String> queues) throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
-      printMessageAttributes(message);
-      printMessageProperties(message);
-      printMessageQueues(queues);
-      printMessageBody(message);
-      xmlWriter.writeEndElement(); // end MESSAGES_CHILD
-      messagesPrinted++;
-   }
-
-   private void printMessageBody(ServerMessage message) throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
-
-      if (message.isLargeMessage())
-      {
-         printLargeMessageBody((LargeServerMessage) message);
-      }
-      else
-      {
-         xmlWriter.writeCData(encode(message.getBodyBuffer().toByteBuffer().array()));
-      }
-      xmlWriter.writeEndElement(); // end MESSAGE_BODY
-   }
-
-   private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException
-   {
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
-      BodyEncoder encoder = null;
-
-      try
-      {
-         encoder = message.getBodyEncoder();
-         encoder.open();
-         long totalBytesWritten = 0;
-         Long bufferSize;
-         long bodySize = encoder.getLargeBodySize();
-         for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE)
-         {
-            Long remainder = bodySize - totalBytesWritten;
-            if (remainder >= LARGE_MESSAGE_CHUNK_SIZE)
-            {
-               bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
-            }
-            else
-            {
-               bufferSize = remainder;
-            }
-            HornetQBuffer buffer = HornetQBuffers.fixedBuffer(bufferSize.intValue());
-            encoder.encode(buffer, bufferSize.intValue());
-            xmlWriter.writeCData(encode(buffer.toByteBuffer().array()));
-            totalBytesWritten += bufferSize;
-         }
-         encoder.close();
-      }
-      catch (HornetQException e)
-      {
-         e.printStackTrace();
-      }
-      finally
-      {
-         if (encoder != null)
-         {
-            try
-            {
-               encoder.close();
-            }
-            catch (HornetQException e)
-            {
-               e.printStackTrace();
-            }
-         }
-      }
-   }
-
-   private void printMessageQueues(List<String> queues) throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
-      for (String queueName : queues)
-      {
-         xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
-         xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
-      }
-      xmlWriter.writeEndElement(); // end QUEUES_PARENT
-   }
-
-   private void printMessageProperties(ServerMessage message) throws XMLStreamException
-   {
-      xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
-      for (SimpleString key : message.getPropertyNames())
-      {
-         Object value = message.getObjectProperty(key);
-         xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
-         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
-         if (value instanceof byte[])
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, encode((byte[]) value));
-         }
-         else
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, value.toString());
-         }
-
-         if (value instanceof Boolean)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BOOLEAN);
-         }
-         else if (value instanceof Byte)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTE);
-         }
-         else if (value instanceof Short)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SHORT);
-         }
-         else if (value instanceof Integer)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_INTEGER);
-         }
-         else if (value instanceof Long)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_LONG);
-         }
-         else if (value instanceof Float)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_FLOAT);
-         }
-         else if (value instanceof Double)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_DOUBLE);
-         }
-         else if (value instanceof String)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_STRING);
-         }
-         else if (value instanceof SimpleString)
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING);
-         }
-         else if (value instanceof byte[])
-         {
-            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, XmlDataConstants.PROPERTY_TYPE_BYTES);
-         }
-      }
-      xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
-   }
-
-   private void printMessageAttributes(ServerMessage message) throws XMLStreamException
-   {
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
-      byte rawType = message.getType();
-      String prettyType = XmlDataConstants.DEFAULT_TYPE_PRETTY;
-      if (rawType == Message.BYTES_TYPE)
-      {
-         prettyType = XmlDataConstants.BYTES_TYPE_PRETTY;
-      }
-      else if (rawType == Message.MAP_TYPE)
-      {
-         prettyType = XmlDataConstants.MAP_TYPE_PRETTY;
-      }
-      else if (rawType == Message.OBJECT_TYPE)
-      {
-         prettyType = XmlDataConstants.OBJECT_TYPE_PRETTY;
-      }
-      else if (rawType == Message.STREAM_TYPE)
-      {
-         prettyType = XmlDataConstants.STREAM_TYPE_PRETTY;
-      }
-      else if (rawType == Message.TEXT_TYPE)
-      {
-         prettyType = XmlDataConstants.TEXT_TYPE_PRETTY;
-      }
-      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
-      if (message.getUserID() != null)
-      {
-         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
-      }
-   }
-
-   private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap)
-   {
-      List<String> queues = new ArrayList<String>();
-      for (ReferenceDescribe ref : refMap.values())
-      {
-         queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
-      }
-      return queues;
-   }
-
-   private static String encode(final byte[] data)
-   {
-      return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
-   }
-
-   // Inner classes -------------------------------------------------
-
-   /**
-    * Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
-    */
-   class PrettyPrintHandler implements InvocationHandler
-   {
-      private XMLStreamWriter target;
-
-      private int depth = 0;
-
-      private final char INDENT_CHAR = ' ';
-
-      private final String LINE_SEPARATOR = System.getProperty("line.separator");
-
-
-      public PrettyPrintHandler(XMLStreamWriter target)
-      {
-         this.target = target;
-      }
-
-      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
-      {
-         String m = method.getName();
-
-         if ("writeStartElement".equals(m))
-         {
-            target.writeCharacters(LINE_SEPARATOR);
-            target.writeCharacters(indent(depth));
-
-            depth++;
-         }
-         else if ("writeEndElement".equals(m))
-         {
-            depth--;
-
-            target.writeCharacters(LINE_SEPARATOR);
-            target.writeCharacters(indent(depth));
-         }
-         else if ("writeEmptyElement".equals(m) || "writeCData".equals(m))
-         {
-            target.writeCharacters(LINE_SEPARATOR);
-            target.writeCharacters(indent(depth));
-         }
-
-         method.invoke(target, args);
-
-         return null;
-      }
-
-      private String indent(int depth)
-      {
-         depth *= 3; // level of indentation
-         char[] output = new char[depth];
-         while (depth-- > 0)
-         {
-            output[depth] = INDENT_CHAR;
-         }
-         return new String(output);
-      }
-   }
-}
\ No newline at end of file

Modified: branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	2012-03-12 15:10:18 UTC (rev 12287)
+++ branches/Branch_2_2_EAP_HORNETQ-787/tests/src/org/hornetq/tests/integration/persistence/XmlImportExportTest.java	2012-03-12 19:34:46 UTC (rev 12288)
@@ -25,11 +25,10 @@
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
-import org.hornetq.core.persistence.impl.journal.XmlDataReader;
-import org.hornetq.core.persistence.impl.journal.XmlDataWriter;
+import org.hornetq.core.persistence.impl.journal.XmlDataExporter;
+import org.hornetq.core.persistence.impl.journal.XmlDataImporter;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.paging.PagingSendTest;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.UUIDGenerator;
@@ -110,8 +109,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -121,8 +120,8 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
       ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
       session.start();
 
@@ -189,8 +188,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -200,8 +199,8 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
       ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
       session.start();
 
@@ -250,8 +249,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -261,8 +260,8 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
       ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
       session.start();
 
@@ -293,8 +292,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -304,8 +303,8 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
 
       ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString("queueName1"));
 
@@ -331,77 +330,72 @@
       ClientSessionFactory factory = locator.createSessionFactory();
       ClientSession session = factory.createSession(false, false);
 
-      try
-      {
-         LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+      LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
 
-         fileMessage.setMessageID(1005);
-         fileMessage.setDurable(true);
+      fileMessage.setMessageID(1005);
+      fileMessage.setDurable(true);
 
-         for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
-         {
-            fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
-         }
+      for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+      {
+         fileMessage.addBytes(new byte[]{UnitTestCase.getSamplebyte(i)});
+      }
 
-         fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+      fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-//         fileMessage.releaseResources();
+      fileMessage.releaseResources();
 
-         session.createQueue("A", "A");
+      session.createQueue("A", "A");
 
-         ClientProducer prod = session.createProducer("A");
+      ClientProducer prod = session.createProducer("A");
 
-         prod.send(fileMessage);
+      prod.send(fileMessage);
 
-         fileMessage.deleteFile();
+      fileMessage.deleteFile();
 
-         session.commit();
+      session.commit();
 
-         session.close();
-         locator.close();
-         server.stop();
+      session.close();
+      locator.close();
+      server.stop();
 
-         ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-         XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-         xmlDataWriter.writeXMLData();
-         System.out.print(new String(xmlOutputStream.toByteArray()));
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
 
-         clearData();
-         server.start();
-         locator = createFactory(false);
-         factory = locator.createSessionFactory();
-         session = factory.createSession(false, true, true);
+      clearData();
+      server.start();
+      locator = createFactory(false);
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, true, true);
 
-         ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-         XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-         xmlDataReader.processXml();
-         session.close();
-         session = factory.createSession(false, false);
-         session.start();
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
+      session.close();
+      session = factory.createSession(false, false);
+      session.start();
 
-         ClientConsumer cons = session.createConsumer("A");
+      ClientConsumer cons = session.createConsumer("A");
 
-         ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
+      ClientMessage msg = cons.receive(CONSUMER_TIMEOUT);
 
-         Assert.assertNotNull(msg);
+      Assert.assertNotNull(msg);
 
-         Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
+      Assert.assertEquals(2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, msg.getBodySize());
 
-         for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
-         {
-            Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
-         }
-
-         msg.acknowledge();
-         session.commit();
-      }
-      finally
+      for (int i = 0; i < 2 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
       {
-         session.close();
-         factory.close();
-         locator.close();
-         server.stop();
+         Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
       }
+
+      msg.acknowledge();
+      session.commit();
+
+      session.close();
+      factory.close();
+      locator.close();
+      server.stop();
    }
 
    public void testPartialQueue() throws Exception
@@ -432,8 +426,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -443,8 +437,8 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
       consumer = session.createConsumer("myQueue1");
       session.start();
       msg = consumer.receive(CONSUMER_TIMEOUT);
@@ -462,6 +456,9 @@
 
    public void testPaging() throws Exception
    {
+      final String MY_ADDRESS = "myAddress";
+      final String MY_QUEUE = "myQueue";
+
       HornetQServer server = createServer(true);
 
       AddressSettings defaultSetting = new AddressSettings();
@@ -472,8 +469,7 @@
 
       ServerLocator locator = createInVMNonHALocator();
       // Making it synchronous, just because we want to stop sending messages as soon as the page-store becomes in
-      // page mode
-      // and we could only guarantee that by setting it to synchronous
+      // page mode and we could only guarantee that by setting it to synchronous
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
@@ -481,13 +477,11 @@
       ClientSessionFactory factory = locator.createSessionFactory();
       ClientSession session = factory.createSession(false, true, true);
 
-      session.createQueue("myAddress", "myQueue1", true);
+      session.createQueue(MY_ADDRESS, MY_QUEUE, true);
 
-      ClientProducer producer = session.createProducer("myAddress");
+      ClientProducer producer = session.createProducer(MY_ADDRESS);
 
-      ClientMessage message = null;
-
-      message = session.createMessage(true);
+      ClientMessage message = session.createMessage(true);
       message.getBodyBuffer().writeBytes(new byte[1024]);
 
       for (int i = 0; i < 200; i++)
@@ -500,8 +494,8 @@
       server.stop();
 
       ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
-      XmlDataWriter xmlDataWriter = new XmlDataWriter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
-      xmlDataWriter.writeXMLData();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
       System.out.print(new String(xmlOutputStream.toByteArray()));
 
       clearData();
@@ -511,10 +505,10 @@
       session = factory.createSession(false, true, true);
 
       ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
-      XmlDataReader xmlDataReader = new XmlDataReader(xmlInputStream, session);
-      xmlDataReader.processXml();
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session);
+      xmlDataImporter.processXml();
 
-      ClientConsumer consumer = session.createConsumer("myQueue1");
+      ClientConsumer consumer = session.createConsumer(MY_QUEUE);
 
       session.start();
 
@@ -530,6 +524,53 @@
       server.stop();
    }
 
+   public void testTransactional() throws Exception
+   {
+      final String QUEUE_NAME = "A1";
+      HornetQServer server = createServer(true);
+      server.start();
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = locator.createSessionFactory();
+      ClientSession session = factory.createSession(false, true, true);
+
+      session.createQueue(QUEUE_NAME, QUEUE_NAME);
+
+      ClientProducer producer = session.createProducer(QUEUE_NAME);
+
+
+      ClientMessage msg = session.createMessage(true);
+      producer.send(msg);
+
+      session.close();
+      locator.close();
+      server.stop();
+
+      ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream();
+      XmlDataExporter xmlDataExporter = new XmlDataExporter(xmlOutputStream, getBindingsDir(), getJournalDir(), getPageDir(), getLargeMessagesDir());
+      xmlDataExporter.writeXMLData();
+      System.out.print(new String(xmlOutputStream.toByteArray()));
+
+      clearData();
+      server.start();
+      locator = createInVMNonHALocator();
+      factory = locator.createSessionFactory();
+      session = factory.createSession(false, false, true);
+      ClientSession managementSession = factory.createSession(false, true, true);
+
+      ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray());
+      XmlDataImporter xmlDataImporter = new XmlDataImporter(xmlInputStream, session, managementSession);
+      xmlDataImporter.processXml();
+      ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+      session.start();
+
+      msg = consumer.receive(CONSUMER_TIMEOUT);
+      Assert.assertNotNull(msg);
+
+      session.close();
+      locator.close();
+      server.stop();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list